提交 36fe62fb 编写于 作者: S Shengliang Guan

refactor: make more object global

上级 ddf55f25
...@@ -22,9 +22,10 @@ ...@@ -22,9 +22,10 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SRpcMsg SRpcMsg; typedef struct SRpcMsg SRpcMsg;
typedef struct SEpSet SEpSet; typedef struct SEpSet SEpSet;
typedef struct SMgmtWrapper SMgmtWrapper; typedef struct SMgmtWrapper SMgmtWrapper;
typedef struct SRpcHandleInfo SRpcHandleInfo;
typedef enum { typedef enum {
QUERY_QUEUE, QUERY_QUEUE,
...@@ -37,19 +38,17 @@ typedef enum { ...@@ -37,19 +38,17 @@ typedef enum {
QUEUE_MAX, QUEUE_MAX,
} EQueueType; } EQueueType;
typedef int32_t (*PutToQueueFp)(void *pMgmt, SRpcMsg* pReq); typedef int32_t (*PutToQueueFp)(void* pMgmt, SRpcMsg* pMsg);
typedef int32_t (*GetQueueSizeFp)(void *pMgmt, int32_t vgId, EQueueType qtype); typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg);
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef void (*SendRspFp)(const SRpcMsg* pMsg);
typedef void (*SendRspFp)(const SRpcMsg* pRsp); typedef void (*SendRedirectRspFp)(const SRpcMsg* pMsg, const SEpSet* pNewEpSet);
typedef void (*SendRedirectRspFp)(const SRpcMsg* pRsp, const SEpSet* pNewEpSet); typedef void (*RegisterBrokenLinkArgFp)(SRpcMsg* pMsg);
typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg); typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type);
typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type);
typedef void (*ReportStartup)(const char* name, const char* desc); typedef void (*ReportStartup)(const char* name, const char* desc);
typedef struct { typedef struct {
SMgmtWrapper* pWrapper; void* mgmt;
void* pMgmt;
void* clientRpc; void* clientRpc;
PutToQueueFp queueFps[QUEUE_MAX]; PutToQueueFp queueFps[QUEUE_MAX];
GetQueueSizeFp qsizeFp; GetQueueSizeFp qsizeFp;
...@@ -62,14 +61,13 @@ typedef struct { ...@@ -62,14 +61,13 @@ typedef struct {
} SMsgCb; } SMsgCb;
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb); void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb);
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg);
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg);
void tmsgSendRsp(SRpcMsg* pRsp); void tmsgSendRsp(const SRpcMsg* pMsg);
void tmsgSendMnodeRecv(SRpcMsg* pReq, SRpcMsg* pRsp); void tmsgSendRedirectRsp(const SRpcMsg* pMsg, const SEpSet* pNewEpSet);
void tmsgSendRedirectRsp(SRpcMsg* pRsp, const SEpSet* pNewEpSet); void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg);
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type);
void tmsgReleaseHandle(void* handle, int8_t type);
void tmsgReportStartup(const char* name, const char* desc); void tmsgReportStartup(const char* name, const char* desc);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -36,7 +36,7 @@ typedef struct { ...@@ -36,7 +36,7 @@ typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
} SRpcConnInfo; } SRpcConnInfo;
typedef struct { typedef struct SRpcHandleInfo {
// rpc info // rpc info
void *handle; // rpc handle returned to app void *handle; // rpc handle returned to app
int64_t refId; // refid, used by server int64_t refId; // refid, used by server
......
...@@ -22,51 +22,38 @@ static SMsgCb tsDefaultMsgCb; ...@@ -22,51 +22,38 @@ static SMsgCb tsDefaultMsgCb;
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; } void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; }
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
// cannot be empty, but not checked for faster detect
PutToQueueFp fp = pMsgCb->queueFps[qtype]; PutToQueueFp fp = pMsgCb->queueFps[qtype];
return (*fp)(pMsgCb->pMgmt, pReq); return (*fp)(pMsgCb->mgmt, pReq);
} }
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
// cannot be empty, but not checked for faster detect
GetQueueSizeFp fp = pMsgCb->qsizeFp; GetQueueSizeFp fp = pMsgCb->qsizeFp;
return (*fp)(pMsgCb->pMgmt, vgId, qtype); return (*fp)(pMsgCb->mgmt, vgId, qtype);
} }
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) { int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pReq) {
// cannot be empty, but not checked for faster detect SendReqFp fp = tsDefaultMsgCb.sendReqFp;
SendReqFp fp = pMsgCb->sendReqFp; return (*fp)(epSet, pReq);
return (*fp)(pMsgCb->pWrapper, epSet, pReq);
} }
void tmsgSendRsp(SRpcMsg* pMsg) { void tmsgSendRsp(const SRpcMsg* pMsg) {
// cannot be empty, but not checked for faster detect
SendRspFp fp = tsDefaultMsgCb.sendRspFp; SendRspFp fp = tsDefaultMsgCb.sendRspFp;
return (*fp)(pMsg); return (*fp)(pMsg);
} }
void tmsgSendRedirectRsp(SRpcMsg* pRsp, const SEpSet* pNewEpSet) { void tmsgSendRedirectRsp(const SRpcMsg* pMsg, const SEpSet* pNewEpSet) {
// cannot be empty, but not checked for faster detect
SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp; SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp;
(*fp)(pRsp, pNewEpSet); (*fp)(pMsg, pNewEpSet);
} }
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) {
RegisterBrokenLinkArgFp fp = pMsgCb->registerBrokenLinkArgFp; RegisterBrokenLinkArgFp fp = tsDefaultMsgCb.registerBrokenLinkArgFp;
if (fp != NULL) { (*fp)(pMsg);
(*fp)(pMsgCb->pWrapper, pMsg);
} else {
terrno = TSDB_CODE_INVALID_PTR;
}
} }
void tmsgReleaseHandle(void* handle, int8_t type) { void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) {
ReleaseHandleFp fp = tsDefaultMsgCb.releaseHandleFp; ReleaseHandleFp fp = tsDefaultMsgCb.releaseHandleFp;
if (fp != NULL) { (*fp)(pHandle, type);
(*fp)(tsDefaultMsgCb.pWrapper, handle, type);
} else {
terrno = TSDB_CODE_INVALID_PTR;
}
} }
void tmsgReportStartup(const char* name, const char* desc) { void tmsgReportStartup(const char* name, const char* desc) {
......
...@@ -43,7 +43,7 @@ int32_t bmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -43,7 +43,7 @@ int32_t bmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
pMgmt->name = pInput->name; pMgmt->name = pInput->name;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.pMgmt = pMgmt; pMgmt->msgCb.mgmt = pMgmt;
SBnodeOpt option = {0}; SBnodeOpt option = {0};
bmInitOption(pMgmt, &option); bmInitOption(pMgmt, &option);
......
...@@ -136,7 +136,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -136,7 +136,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue; pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue;
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
pMgmt->msgCb.pMgmt = pMgmt; pMgmt->msgCb.mgmt = pMgmt;
bool deployed = false; bool deployed = false;
if (mmReadFile(pMgmt, &deployed) != 0) { if (mmReadFile(pMgmt, &deployed) != 0) {
......
...@@ -46,7 +46,7 @@ static int32_t qmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -46,7 +46,7 @@ static int32_t qmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qmPutRpcMsgToQueryQueue; pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qmPutRpcMsgToQueryQueue;
pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)qmPutRpcMsgToFetchQueue; pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)qmPutRpcMsgToFetchQueue;
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)qmGetQueueSize; pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)qmGetQueueSize;
pMgmt->msgCb.pMgmt = pMgmt; pMgmt->msgCb.mgmt = pMgmt;
SQnodeOpt option = {0}; SQnodeOpt option = {0};
qmInitOption(pMgmt, &option); qmInitOption(pMgmt, &option);
......
...@@ -44,7 +44,7 @@ int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -44,7 +44,7 @@ int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
pMgmt->name = pInput->name; pMgmt->name = pInput->name;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.pMgmt = pMgmt; pMgmt->msgCb.mgmt = pMgmt;
SSnodeOpt option = {0}; SSnodeOpt option = {0};
smInitOption(pMgmt, &option); smInitOption(pMgmt, &option);
......
...@@ -258,7 +258,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -258,7 +258,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)vmPutRpcMsgToFetchQueue; pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)vmPutRpcMsgToFetchQueue;
pMgmt->msgCb.queueFps[MERGE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToMergeQueue; pMgmt->msgCb.queueFps[MERGE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToMergeQueue;
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize; pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
pMgmt->msgCb.pMgmt = pMgmt; pMgmt->msgCb.mgmt = pMgmt;
taosInitRWLatch(&pMgmt->latch); taosInitRWLatch(&pMgmt->latch);
SDiskCfg dCfg = {0}; SDiskCfg dCfg = {0};
......
...@@ -65,7 +65,6 @@ typedef struct { ...@@ -65,7 +65,6 @@ typedef struct {
} SProc; } SProc;
typedef struct SMgmtWrapper { typedef struct SMgmtWrapper {
struct SDnode *pDnode;
SMgmtFunc func; SMgmtFunc func;
void *pMgmt; void *pMgmt;
const char *name; const char *name;
...@@ -124,8 +123,12 @@ typedef struct SDnode { ...@@ -124,8 +123,12 @@ typedef struct SDnode {
SMgmtWrapper wrappers[NODE_END]; SMgmtWrapper wrappers[NODE_END];
} SDnode; } SDnode;
// dmEmv.c // dmEnv.c
void dmReportStartup(const char *pName, const char *pDesc); SDnode *dmInstance();
bool dmNotRunning();
void dmReportStartup(const char *pName, const char *pDesc);
void *dmGetClientRpc();
void dmGetMnodeEpSetGlobal(SEpSet *pEpSet);
// dmMgmt.c // dmMgmt.c
int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype); int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype);
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmMgmt.h" #include "dmMgmt.h"
static SDnode global = {0}; SDnode global = {0};
static int32_t dmCheckRepeatInit(SDnode *pDnode) { static int32_t dmCheckRepeatInit(SDnode *pDnode) {
if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
...@@ -166,10 +166,12 @@ static bool dmIsNodeRequired(EDndNodeType ntype) { ...@@ -166,10 +166,12 @@ static bool dmIsNodeRequired(EDndNodeType ntype) {
} }
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
SDnode *pDnode = dmInstance();
SMgmtInputOpt opt = { SMgmtInputOpt opt = {
.path = pWrapper->path, .path = pWrapper->path,
.name = pWrapper->name, .name = pWrapper->name,
.pData = &pWrapper->pDnode->data, .pData = &pDnode->data,
.processCreateNodeFp = dmProcessCreateNodeReq, .processCreateNodeFp = dmProcessCreateNodeReq,
.processDropNodeFp = dmProcessDropNodeReq, .processDropNodeFp = dmProcessDropNodeReq,
.isNodeRequiredFp = dmIsNodeRequired, .isNodeRequiredFp = dmIsNodeRequired,
...@@ -185,3 +187,11 @@ void dmReportStartup(const char *pName, const char *pDesc) { ...@@ -185,3 +187,11 @@ void dmReportStartup(const char *pName, const char *pDesc) {
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
dDebug("step:%s, %s", pStartup->name, pStartup->desc); dDebug("step:%s, %s", pStartup->name, pStartup->desc);
} }
SDnode *dmInstance() { return &global; }
bool dmNotRunning() { return global.status != DND_STAT_RUNNING; }
void *dmGetClientRpc() { return global.trans.clientRpc; }
void dmGetMnodeEpSetGlobal(SEpSet *pEpSet) { dmGetMnodeEpSet(&global.data, pEpSet); }
\ No newline at end of file
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmMgmt.h" #include "dmMgmt.h"
static bool dmRequireNode(SMgmtWrapper *pWrapper) { static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
bool required = false; bool required = false;
...@@ -25,7 +25,7 @@ static bool dmRequireNode(SMgmtWrapper *pWrapper) { ...@@ -25,7 +25,7 @@ static bool dmRequireNode(SMgmtWrapper *pWrapper) {
dDebug("node:%s, does not require startup", pWrapper->name); dDebug("node:%s, does not require startup", pWrapper->name);
} }
if (pWrapper->ntype == DNODE && pWrapper->pDnode->rtype != DNODE && pWrapper->pDnode->rtype != NODE_END) { if (pWrapper->ntype == DNODE && pDnode->rtype != DNODE && pDnode->rtype != NODE_END) {
required = false; required = false;
dDebug("node:%s, does not require startup in child process", pWrapper->name); dDebug("node:%s, does not require startup in child process", pWrapper->name);
} }
...@@ -150,7 +150,7 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) { ...@@ -150,7 +150,7 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) {
goto _OVER; goto _OVER;
} }
pWrapper->required = dmRequireNode(pWrapper); pWrapper->required = dmRequireNode(pDnode, pWrapper);
if (ntype != DNODE && dmReadShmFile(pWrapper->path, pWrapper->name, pDnode->rtype, &pWrapper->proc.shm) != 0) { if (ntype != DNODE && dmReadShmFile(pWrapper->path, pWrapper->name, pDnode->rtype, &pWrapper->proc.shm) != 0) {
dError("node:%s, failed to read shm file since %s", pWrapper->name, terrstr()); dError("node:%s, failed to read shm file since %s", pWrapper->name, terrstr());
......
...@@ -64,6 +64,8 @@ static int32_t dmNewProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) { ...@@ -64,6 +64,8 @@ static int32_t dmNewProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) {
} }
int32_t dmOpenNode(SMgmtWrapper *pWrapper) { int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
SDnode *pDnode = dmInstance();
if (taosMkDir(pWrapper->path) != 0) { if (taosMkDir(pWrapper->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr()); dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
...@@ -101,7 +103,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { ...@@ -101,7 +103,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
dError("node:%s, failed to init proc since %s", pWrapper->name, terrstr()); dError("node:%s, failed to init proc since %s", pWrapper->name, terrstr());
return -1; return -1;
} }
if (pWrapper->pDnode->rtype == NODE_END) { if (pDnode->rtype == NODE_END) {
dInfo("node:%s, should be started manually in child process", pWrapper->name); dInfo("node:%s, should be started manually in child process", pWrapper->name);
} else { } else {
if (dmNewProc(pWrapper, pWrapper->ntype) != 0) { if (dmNewProc(pWrapper, pWrapper->ntype) != 0) {
......
...@@ -194,9 +194,9 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { ...@@ -194,9 +194,9 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
return 0; return 0;
} }
static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) { static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) {
SEpSet epSet = {0}; SEpSet epSet = {0};
dmGetMnodeEpSet(&pDnode->data, &epSet); dmGetMnodeEpSetGlobal(&epSet);
dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse); dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) { for (int32_t i = 0; i < epSet.numOfEps; ++i) {
...@@ -221,15 +221,8 @@ static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) { ...@@ -221,15 +221,8 @@ static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) {
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
} }
static inline void dmSendRpcRsp(SDnode *pDnode, const SRpcMsg *pRsp) { static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_NODE_REDIRECT) { SDnode *pDnode = dmInstance();
dmSendRpcRedirectRsp(pDnode, pRsp);
} else {
rpcSendResponse(pRsp);
}
}
static inline void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
if (pDnode->status != DND_STAT_RUNNING) { if (pDnode->status != DND_STAT_RUNNING) {
pRsp->code = TSDB_CODE_NODE_OFFLINE; pRsp->code = TSDB_CODE_NODE_OFFLINE;
rpcFreeCont(pReq->pCont); rpcFreeCont(pReq->pCont);
...@@ -239,18 +232,18 @@ static inline void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRp ...@@ -239,18 +232,18 @@ static inline void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRp
} }
} }
static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = dmInstance();
if (pDnode->status != DND_STAT_RUNNING || pDnode->trans.clientRpc == NULL) { if (pDnode->status != DND_STAT_RUNNING) {
rpcFreeCont(pReq->pCont); rpcFreeCont(pReq->pCont);
pReq->pCont = NULL; pReq->pCont = NULL;
terrno = TSDB_CODE_NODE_OFFLINE; terrno = TSDB_CODE_NODE_OFFLINE;
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->info.handle); dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->info.handle);
return -1; return -1;
} else {
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL);
return 0;
} }
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL);
return 0;
} }
static inline void dmSendRsp(const SRpcMsg *pMsg) { static inline void dmSendRsp(const SRpcMsg *pMsg) {
...@@ -258,7 +251,11 @@ static inline void dmSendRsp(const SRpcMsg *pMsg) { ...@@ -258,7 +251,11 @@ static inline void dmSendRsp(const SRpcMsg *pMsg) {
if (InChildProc(pWrapper->proc.ptype)) { if (InChildProc(pWrapper->proc.ptype)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_RSP); dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_RSP);
} else { } else {
dmSendRpcRsp(pWrapper->pDnode, pMsg); if (pMsg->code == TSDB_CODE_NODE_REDIRECT) {
dmSendRpcRedirectRsp(pMsg);
} else {
rpcSendResponse(pMsg);
}
} }
} }
...@@ -280,7 +277,9 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe ...@@ -280,7 +277,9 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe
} }
} }
static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper->proc.ptype)) { if (InChildProc(pWrapper->proc.ptype)) {
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST); dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST);
} else { } else {
...@@ -288,12 +287,14 @@ static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg ...@@ -288,12 +287,14 @@ static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg
} }
} }
static inline void dmReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) { static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
SMgmtWrapper *pWrapper = pHandle->wrapper;
if (InChildProc(pWrapper->proc.ptype)) { if (InChildProc(pWrapper->proc.ptype)) {
SRpcMsg msg = {.info.handle = handle, .code = type}; SRpcMsg msg = {.info = *pHandle, .code = type};
dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE); dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE);
} else { } else {
rpcReleaseHandle(handle, type); rpcReleaseHandle(pHandle->handle, type);
} }
} }
...@@ -385,7 +386,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s ...@@ -385,7 +386,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
SEpSet epSet = {0}; SEpSet epSet = {0};
dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt); dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
dmGetMnodeEpSet(&pDnode->data, &epSet); dmGetMnodeEpSet(&pDnode->data, &epSet);
dmSendRecv(pDnode, &epSet, &rpcMsg, &rpcRsp); dmSendRecv(&epSet, &rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
terrno = rpcRsp.code; terrno = rpcRsp.code;
...@@ -441,15 +442,15 @@ void dmCleanupServer(SDnode *pDnode) { ...@@ -441,15 +442,15 @@ void dmCleanupServer(SDnode *pDnode) {
} }
SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) { SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
SMsgCb msgCb = { SDnode *pDnode = dmInstance();
.pWrapper = pWrapper, SMsgCb msgCb = {
.clientRpc = pWrapper->pDnode->trans.clientRpc, .clientRpc = dmInstance()->trans.clientRpc,
.sendReqFp = dmSendReq, .sendReqFp = dmSendReq,
.sendRspFp = dmSendRsp, .sendRspFp = dmSendRsp,
.sendRedirectRspFp = dmSendRedirectRsp, .sendRedirectRspFp = dmSendRedirectRsp,
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg, .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
.releaseHandleFp = dmReleaseHandle, .releaseHandleFp = dmReleaseHandle,
.reportStartupFp = dmReportStartup, .reportStartupFp = dmReportStartup,
}; };
return msgCb; return msgCb;
} }
...@@ -625,7 +625,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { ...@@ -625,7 +625,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info}; SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info};
mInfo("dnode:%d, app:%p config:%s req send to dnode", cfgReq.dnodeId, rpcMsg.info.ahandle, cfgReq.config); mInfo("dnode:%d, app:%p config:%s req send to dnode", cfgReq.dnodeId, rpcMsg.info.ahandle, cfgReq.config);
tmsgSendReq(&pMnode->msgCb, &epSet, &rpcMsg); tmsgSendReq(&epSet, &rpcMsg);
return 0; return 0;
} }
......
...@@ -985,7 +985,7 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr ...@@ -985,7 +985,7 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
} }
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
if (tmsgSendReq(&pMnode->msgCb, &pAction->epSet, &rpcMsg) == 0) { if (tmsgSendReq(&pAction->epSet, &rpcMsg) == 0) {
mDebug("trans:%d, action:%d is sent", pTrans->id, action); mDebug("trans:%d, action:%d is sent", pTrans->id, action);
pAction->msgSent = 1; pAction->msgSent = 1;
pAction->msgReceived = 0; pAction->msgReceived = 0;
......
...@@ -47,7 +47,7 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -47,7 +47,7 @@ class MndTestTrans2 : public ::testing::Test {
static void InitMnode() { static void InitMnode() {
static SMsgCb msgCb = {0}; static SMsgCb msgCb = {0};
msgCb.reportStartupFp = reportStartup; msgCb.reportStartupFp = reportStartup;
msgCb.pWrapper = (SMgmtWrapper *)(&msgCb); // hack msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
tmsgSetDefaultMsgCb(&msgCb); tmsgSetDefaultMsgCb(&msgCb);
SMnodeOpt opt = {0}; SMnodeOpt opt = {0};
......
...@@ -73,7 +73,7 @@ int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) { ...@@ -73,7 +73,7 @@ int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) {
SMsgCb *pMsgCb = rpcHandle; SMsgCb *pMsgCb = rpcHandle;
if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) { if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) {
pMsg->info.noResp = 1; pMsg->info.noResp = 1;
tmsgSendReq(rpcHandle, pEpSet, pMsg); tmsgSendReq(pEpSet, pMsg);
} else { } else {
vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE); vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE);
} }
......
...@@ -412,7 +412,7 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { ...@@ -412,7 +412,7 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
} }
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
tmsgReleaseHandle(ctx->ctrlConnInfo.handle, TAOS_CONN_SERVER); tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER);
ctx->ctrlConnInfo.handle = NULL; ctx->ctrlConnInfo.handle = NULL;
ctx->ctrlConnInfo.refId = -1; ctx->ctrlConnInfo.refId = -1;
...@@ -1278,7 +1278,7 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re ...@@ -1278,7 +1278,7 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
QW_LOCK(QW_WRITE, &sch->hbConnLock); QW_LOCK(QW_WRITE, &sch->hbConnLock);
if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) { if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
sch->hbConnInfo.handle = NULL; sch->hbConnInfo.handle = NULL;
sch->hbConnInfo.ahandle = NULL; sch->hbConnInfo.ahandle = NULL;
...@@ -1310,7 +1310,7 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { ...@@ -1310,7 +1310,7 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
QW_LOCK(QW_WRITE, &sch->hbConnLock); QW_LOCK(QW_WRITE, &sch->hbConnLock);
if (sch->hbConnInfo.handle) { if (sch->hbConnInfo.handle) {
tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
} }
memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo)); memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
...@@ -1330,7 +1330,7 @@ _return: ...@@ -1330,7 +1330,7 @@ _return:
qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
if (code) { if (code) {
tmsgReleaseHandle(qwMsg->connInfo.handle, TAOS_CONN_SERVER); tmsgReleaseHandle(&qwMsg->connInfo, TAOS_CONN_SERVER);
} }
QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
...@@ -1498,7 +1498,7 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { ...@@ -1498,7 +1498,7 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
} }
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) { int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
if (NULL == qWorkerMgmt || pMsgCb->pWrapper == NULL) { if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) {
qError("invalid param to init qworker"); qError("invalid param to init qworker");
QW_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
......
...@@ -294,7 +294,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { ...@@ -294,7 +294,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
.info = *pConn, .info = *pConn,
}; };
tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg); tmsgRegisterBrokenLinkArg(&pMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -328,7 +328,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo * ...@@ -328,7 +328,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *
.info = *pConn, .info = *pConn,
}; };
tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg); tmsgRegisterBrokenLinkArg(&pMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -959,7 +959,7 @@ TEST(seqTest, normalCase) { ...@@ -959,7 +959,7 @@ TEST(seqTest, normalCase) {
stubSetGetDataBlock(); stubSetGetDataBlock();
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; msgCb.mgmt = (void *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
...@@ -1001,7 +1001,7 @@ TEST(seqTest, cancelFirst) { ...@@ -1001,7 +1001,7 @@ TEST(seqTest, cancelFirst) {
stubSetRpcSendResponse(); stubSetRpcSendResponse();
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; msgCb.mgmt = (void *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
...@@ -1050,7 +1050,7 @@ TEST(seqTest, randCase) { ...@@ -1050,7 +1050,7 @@ TEST(seqTest, randCase) {
taosSeedRand(taosGetTimestampSec()); taosSeedRand(taosGetTimestampSec());
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; msgCb.mgmt = (void *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
...@@ -1124,7 +1124,7 @@ TEST(seqTest, multithreadRand) { ...@@ -1124,7 +1124,7 @@ TEST(seqTest, multithreadRand) {
taosSeedRand(taosGetTimestampSec()); taosSeedRand(taosGetTimestampSec());
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; msgCb.mgmt = (void *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
...@@ -1188,7 +1188,7 @@ TEST(rcTest, shortExecshortDelay) { ...@@ -1188,7 +1188,7 @@ TEST(rcTest, shortExecshortDelay) {
qwtTestQuitThreadNum = 0; qwtTestQuitThreadNum = 0;
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; msgCb.mgmt = (void *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
...@@ -1272,7 +1272,7 @@ TEST(rcTest, longExecshortDelay) { ...@@ -1272,7 +1272,7 @@ TEST(rcTest, longExecshortDelay) {
qwtTestQuitThreadNum = 0; qwtTestQuitThreadNum = 0;
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; msgCb.mgmt = (void *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
...@@ -1358,7 +1358,7 @@ TEST(rcTest, shortExeclongDelay) { ...@@ -1358,7 +1358,7 @@ TEST(rcTest, shortExeclongDelay) {
qwtTestQuitThreadNum = 0; qwtTestQuitThreadNum = 0;
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; msgCb.mgmt = (void *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
...@@ -1442,7 +1442,7 @@ TEST(rcTest, dropTest) { ...@@ -1442,7 +1442,7 @@ TEST(rcTest, dropTest) {
taosSeedRand(taosGetTimestampSec()); taosSeedRand(taosGetTimestampSec());
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer; msgCb.mgmt = (void *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
......
...@@ -111,7 +111,7 @@ static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashOb ...@@ -111,7 +111,7 @@ static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashOb
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg); tmsgSendReq(pEpSet, &dispatchMsg);
} }
return 0; return 0;
} }
...@@ -371,7 +371,7 @@ int32_t streamTaskProcessInputReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDat ...@@ -371,7 +371,7 @@ int32_t streamTaskProcessInputReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDat
return -1; return -1;
} }
tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg); tmsgSendReq(pEpSet, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
...@@ -571,7 +571,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in ...@@ -571,7 +571,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
return -1; return -1;
} }
tmsgSendReq(pMsgCb, pEpSet, &dispatchMsg); tmsgSendReq(pEpSet, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册