提交 a8b890bc 编写于 作者: S Shengliang Guan

refact(cluster): node mgmt

上级 4df0c48f
......@@ -22,13 +22,13 @@
extern "C" {
#endif
typedef enum { PROC_REQ = 1, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType;
typedef enum { PROC_FUNC_REQ = 1, PROC_FUNC_RSP, PROC_FUNC_REGIST, PROC_FUNC_RELEASE } EProcFuncType;
typedef struct SProcObj SProcObj;
typedef void *(*ProcMallocFp)(int32_t contLen);
typedef void *(*ProcFreeFp)(void *pCont);
typedef void (*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
ProcFuncType ftype);
EProcFuncType ftype);
typedef struct {
ProcConsumeFp childConsumeFp;
......@@ -53,11 +53,11 @@ int32_t taosProcRun(SProcObj *pProc);
void taosProcStop(SProcObj *pProc);
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, ProcFuncType ftype);
void *handle, EProcFuncType ftype);
void taosProcRemoveHandle(SProcObj *pProc, void *handle);
void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle));
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype);
EProcFuncType ftype);
#ifdef __cplusplus
}
......
......@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "dndNode.h"
#include "dndImp.h"
#include "tconfig.h"
static struct {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_DNODE_INT_H_
#define _TD_DND_DNODE_INT_H_
#include "dndNode.h"
#ifdef __cplusplus
extern "C" {
#endif
// dmFile.c
int32_t dmReadFile(SDnodeData *pMgmt);
int32_t dmWriteFile(SDnodeData *pMgmt);
void dmUpdateDnodeEps(SDnodeData *pMgmt, SArray *pDnodeEps);
// dmHandle.c
void dmInitMsgHandle(SMgmtWrapper *pWrapper);
void dmSendStatusReq(SDnodeData *pMgmt);
int32_t dmProcessConfigReq(SDnodeData *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessStatusRsp(SDnodeData *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessAuthRsp(SDnodeData *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeData *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessCDnodeReq(SDnode *pDnode, SNodeMsg *pMsg);
// dmMonitor.c
void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo);
void dmSendMonitorReport(SDnode *pDnode);
// dmWorker.c
int32_t dmStartThread(SDnodeData *pMgmt);
int32_t dmStartWorker(SDnodeData *pMgmt);
void dmStopWorker(SDnodeData *pMgmt);
int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t dmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_DNODE_INT_H_*/
\ No newline at end of file
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_NODE_H_
#define _TD_DND_NODE_H_
#ifndef _TD_DND_IMP_H_
#define _TD_DND_IMP_H_
#include "dndInt.h"
......@@ -30,6 +30,7 @@ int32_t dndInitTrans(SDnode *pDnode);
void dndCleanupTrans(SDnode *pDnode);
SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper);
int32_t dndInitMsgHandle(SDnode *pDnode);
void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pReq);
void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
// mgmt
......@@ -40,19 +41,40 @@ void smSetMgmtFp(SMgmtWrapper *pWrapper);
void vmSetMgmtFp(SMgmtWrapper *pWrapper);
void mmSetMgmtFp(SMgmtWrapper *pMgmt);
void dmGetMnodeEpSet(SDnodeData *pMgmt, SEpSet *pEpSet);
void dmUpdateMnodeEpSet(SDnodeData *pMgmt, SEpSet *pEpSet);
void dmSendRedirectRsp(SDnodeData *pMgmt, const SRpcMsg *pMsg);
void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo);
void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo);
void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo);
void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *vmInfo);
void qmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonQmInfo *qmInfo);
void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo);
void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo);
// dmFile.c
int32_t dmReadFile(SDnodeData *pMgmt);
int32_t dmWriteFile(SDnodeData *pMgmt);
void dmUpdateDnodeEps(SDnodeData *pMgmt, SArray *pDnodeEps);
// dmHandle.c
void dmInitMsgHandle(SMgmtWrapper *pWrapper);
void dmSendStatusReq(SDnodeData *pMgmt);
int32_t dmProcessConfigReq(SDnodeData *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessStatusRsp(SDnodeData *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessAuthRsp(SDnodeData *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeData *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessCDnodeReq(SDnode *pDnode, SNodeMsg *pMsg);
// dmMonitor.c
void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo);
void dmSendMonitorReport(SDnode *pDnode);
// dmWorker.c
int32_t dmStartThread(SDnodeData *pMgmt);
int32_t dmStartWorker(SDnodeData *pMgmt);
void dmStopWorker(SDnodeData *pMgmt);
int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t dmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_NODE_H_*/
\ No newline at end of file
#endif /*_TD_DND_IMP_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dmInt.h"
void dmGetMnodeEpSet(SDnodeData *pMgmt, SEpSet *pEpSet) {
taosRLockLatch(&pMgmt->latch);
*pEpSet = pMgmt->mnodeEpSet;
taosRUnLockLatch(&pMgmt->latch);
}
void dmUpdateMnodeEpSet(SDnodeData *pMgmt, SEpSet *pEpSet) {
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
taosWLockLatch(&pMgmt->latch);
pMgmt->mnodeEpSet = *pEpSet;
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
}
taosWUnLockLatch(&pMgmt->latch);
}
void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
SDnodeData *pMgmt = pWrapper->pMgmt;
taosRLockLatch(&pMgmt->latch);
SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t));
if (pDnodeEp != NULL) {
if (pPort != NULL) {
*pPort = pDnodeEp->ep.port;
}
if (pFqdn != NULL) {
tstrncpy(pFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
}
if (pEp != NULL) {
snprintf(pEp, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
}
}
taosRUnLockLatch(&pMgmt->latch);
}
void dmSendRedirectRsp(SDnodeData *pMgmt, const SRpcMsg *pReq) {
SDnode *pDnode = pMgmt->pDnode;
SEpSet epSet = {0};
dmGetMnodeEpSet(pMgmt, &epSet);
dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->handle, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, pDnode->data.localFqdn) == 0 && epSet.eps[i].port == pDnode->data.serverPort) {
epSet.inUse = (i + 1) % epSet.numOfEps;
}
epSet.eps[i].port = htons(epSet.eps[i].port);
}
rpcSendRedirectRsp(pReq->handle, &epSet);
}
static int32_t dmStart(SMgmtWrapper *pWrapper) {
dDebug("dnode-mgmt start to run");
return dmStartThread(pWrapper->pMgmt);
}
static int32_t dmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode;
SDnodeData *pMgmt = taosMemoryCalloc(1, sizeof(SDnodeData));
dInfo("dnode-mgmt start to init");
pDnode->data.dnodeId = 0;
pDnode->data.dropped = 0;
pDnode->data.clusterId = 0;
pMgmt->path = pWrapper->path;
pMgmt->pDnode = pDnode;
taosInitRWLatch(&pMgmt->latch);
pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMgmt->dnodeHash == NULL) {
dError("failed to init dnode hash");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dmReadFile(pMgmt) != 0) {
dError("failed to read file since %s", terrstr());
return -1;
}
if (pDnode->data.dropped) {
dError("dnode will not start since its already dropped");
return -1;
}
if (dmStartWorker(pMgmt) != 0) {
return -1;
}
if (dndInitTrans(pDnode) != 0) {
dError("failed to init transport since %s", terrstr());
return -1;
}
pWrapper->pMgmt = pMgmt;
pMgmt->msgCb = dndCreateMsgcb(pWrapper);
dInfo("dnode-mgmt is initialized");
return 0;
}
static void dmCleanup(SMgmtWrapper *pWrapper) {
SDnodeData *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("dnode-mgmt start to clean up");
SDnode *pDnode = pMgmt->pDnode;
dmStopWorker(pMgmt);
taosWLockLatch(&pMgmt->latch);
if (pMgmt->dnodeEps != NULL) {
taosArrayDestroy(pMgmt->dnodeEps);
pMgmt->dnodeEps = NULL;
}
if (pMgmt->dnodeHash != NULL) {
taosHashCleanup(pMgmt->dnodeHash);
pMgmt->dnodeHash = NULL;
}
taosWUnLockLatch(&pMgmt->latch);
taosMemoryFree(pMgmt);
pWrapper->pMgmt = NULL;
dndCleanupTrans(pDnode);
dInfo("dnode-mgmt is cleaned up");
}
static int32_t dmRequire(SMgmtWrapper *pWrapper, bool *required) {
*required = true;
return 0;
}
void dmSetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = dmInit;
mgmtFp.closeFp = dmCleanup;
mgmtFp.startFp = dmStart;
mgmtFp.requiredFp = dmRequire;
dmInitMsgHandle(pWrapper);
pWrapper->name = "dnode";
pWrapper->fp = mgmtFp;
}
......@@ -14,12 +14,32 @@
*/
#define _DEFAULT_SOURCE
#include "dmInt.h"
#include "dndImp.h"
static void dmPrintDnodes(SDnodeData *pMgmt);
static bool dmIsEpChanged(SDnodeData *pMgmt, int32_t dnodeId, const char *ep);
static void dmResetDnodes(SDnodeData *pMgmt, SArray *dnodeEps);
void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
SDnodeData *pMgmt = pWrapper->pMgmt;
taosRLockLatch(&pMgmt->latch);
SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t));
if (pDnodeEp != NULL) {
if (pPort != NULL) {
*pPort = pDnodeEp->ep.port;
}
if (pFqdn != NULL) {
tstrncpy(pFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
}
if (pEp != NULL) {
snprintf(pEp, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
}
}
taosRUnLockLatch(&pMgmt->latch);
}
int32_t dmReadFile(SDnodeData *pMgmt) {
int32_t code = TSDB_CODE_INVALID_JSON_FORMAT;
int32_t len = 0;
......
......@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "dndNode.h"
#include "dndImp.h"
static bool dndRequireNode(SMgmtWrapper *pWrapper) {
bool required = false;
......
......@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "dmInt.h"
#include "dndImp.h"
void dmSendStatusReq(SDnodeData *pMgmt) {
SDnode *pDnode = pMgmt->pDnode;
......@@ -57,9 +57,7 @@ void dmSendStatusReq(SDnodeData *pMgmt) {
pMgmt->statusSent = 1;
dTrace("send req:%s to mnode, app:%p", TMSG_INFO(rpcMsg.msgType), rpcMsg.ahandle);
SEpSet epSet = {0};
dmGetMnodeEpSet(pMgmt, &epSet);
tmsgSendReq(&pMgmt->msgCb, &epSet, &rpcMsg);
dndSendMsgToMnode(pDnode, &rpcMsg);
}
static void dmUpdateDnodeCfg(SDnodeData *pMgmt, SDnodeCfg *pCfg) {
......
......@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "dmInt.h"
#include "dndImp.h"
static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
pInfo->protocol = 1;
......
......@@ -14,7 +14,104 @@
*/
#define _DEFAULT_SOURCE
#include "dndNode.h"
#include "dndImp.h"
static int32_t dmStart(SMgmtWrapper *pWrapper) {
dDebug("dnode-mgmt start to run");
return dmStartThread(pWrapper->pMgmt);
}
static int32_t dmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode;
SDnodeData *pMgmt = taosMemoryCalloc(1, sizeof(SDnodeData));
dInfo("dnode-mgmt start to init");
pDnode->data.dnodeId = 0;
pDnode->data.dropped = 0;
pDnode->data.clusterId = 0;
pMgmt->path = pWrapper->path;
pMgmt->pDnode = pDnode;
taosInitRWLatch(&pMgmt->latch);
pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMgmt->dnodeHash == NULL) {
dError("failed to init dnode hash");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dmReadFile(pMgmt) != 0) {
dError("failed to read file since %s", terrstr());
return -1;
}
if (pDnode->data.dropped) {
dError("dnode will not start since its already dropped");
return -1;
}
if (dmStartWorker(pMgmt) != 0) {
return -1;
}
if (dndInitTrans(pDnode) != 0) {
dError("failed to init transport since %s", terrstr());
return -1;
}
pWrapper->pMgmt = pMgmt;
pMgmt->msgCb = dndCreateMsgcb(pWrapper);
dInfo("dnode-mgmt is initialized");
return 0;
}
static void dmCleanup(SMgmtWrapper *pWrapper) {
SDnodeData *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("dnode-mgmt start to clean up");
SDnode *pDnode = pMgmt->pDnode;
dmStopWorker(pMgmt);
taosWLockLatch(&pMgmt->latch);
if (pMgmt->dnodeEps != NULL) {
taosArrayDestroy(pMgmt->dnodeEps);
pMgmt->dnodeEps = NULL;
}
if (pMgmt->dnodeHash != NULL) {
taosHashCleanup(pMgmt->dnodeHash);
pMgmt->dnodeHash = NULL;
}
taosWUnLockLatch(&pMgmt->latch);
taosMemoryFree(pMgmt);
pWrapper->pMgmt = NULL;
dndCleanupTrans(pDnode);
dInfo("dnode-mgmt is cleaned up");
}
static int32_t dmRequire(SMgmtWrapper *pWrapper, bool *required) {
*required = true;
return 0;
}
void dmSetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = dmInit;
mgmtFp.closeFp = dmCleanup;
mgmtFp.startFp = dmStart;
mgmtFp.requiredFp = dmRequire;
dmInitMsgHandle(pWrapper);
pWrapper->name = "dnode";
pWrapper->fp = mgmtFp;
}
static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
pDnode->data.supportVnodes = pOption->numOfSupportVnodes;
......
......@@ -14,19 +14,28 @@
*/
#define _DEFAULT_SOURCE
#include "dndNode.h"
#include "dndImp.h"
#define INTERNAL_USER "_dnd"
#define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd"
static int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq);
static void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);
static void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type);
static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[NODE_BEGIN];
dmUpdateMnodeEpSet(pWrapper->pMgmt, pEpSet);
static void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
taosRLockLatch(&pDnode->data.latch);
*pEpSet = pDnode->data.mnodeEpSet;
taosRUnLockLatch(&pDnode->data.latch);
}
static void dndSetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
taosWLockLatch(&pDnode->data.latch);
pDnode->data.mnodeEpSet = *pEpSet;
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
}
taosWUnLockLatch(&pDnode->data.latch);
}
static inline NodeMsgFp dndGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
......@@ -60,7 +69,7 @@ static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpS
uint16_t msgType = pRpc->msgType;
if (pEpSet && pEpSet->numOfEps > 0 && msgType == TDMT_MND_STATUS_RSP) {
dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
dndSetMnodeEpSet(pWrapper->pDnode, pEpSet);
}
if (dndMarkWrapper(pWrapper) != 0) goto _OVER;
......@@ -74,7 +83,7 @@ static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpS
} else if (pWrapper->procType == DND_PROC_PARENT) {
dTrace("msg:%p, is created and put into child queue, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle,
PROC_REQ);
PROC_FUNC_REQ);
} else {
dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
ASSERT(1);
......@@ -162,168 +171,10 @@ static void dndProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndProcessRpcMsg(pWrapper, pMsg, pEpSet);
}
static int32_t dndInitClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND";
rpcInit.numOfThreads = 1;
rpcInit.cfp = (RpcCfp)dndProcessMsg;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = INTERNAL_USER;
rpcInit.ckey = INTERNAL_CKEY;
rpcInit.spi = 1;
rpcInit.parent = pDnode;
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
rpcInit.secret = pass;
pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) {
dError("failed to init dnode rpc client");
return -1;
}
dDebug("dnode rpc client is initialized");
return 0;
}
static void dndCleanupClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
if (pTrans->clientRpc) {
rpcClose(pTrans->clientRpc);
pTrans->clientRpc = NULL;
dDebug("dnode rpc client is closed");
}
}
static inline void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) {
SEpSet epSet = {0};
SMgmtWrapper *pWrapper = &pDnode->wrappers[NODE_BEGIN];
dmGetMnodeEpSet(pWrapper->pMgmt, &epSet);
rpcSendRecv(pDnode->trans.clientRpc, &epSet, pReq, pRsp);
}
static inline int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret,
char *ckey) {
int32_t code = 0;
char pass[TSDB_PASSWORD_LEN + 1] = {0};
if (strcmp(user, INTERNAL_USER) == 0) {
taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
} else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
taosEncryptPass_c((uint8_t *)(TSDB_NETTEST_USER), strlen(TSDB_NETTEST_USER), pass);
} else {
code = -1;
}
if (code == 0) {
memcpy(secret, pass, TSDB_PASSWORD_LEN);
*spi = 1;
*encrypt = 0;
*ckey = 0;
}
return code;
}
static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (dndGetHideUserAuth(pDnode, user, spi, encrypt, secret, ckey) == 0) {
dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
return 0;
}
SAuthReq authReq = {0};
tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
void *pReq = rpcMallocCont(contLen);
tSerializeSAuthReq(pReq, contLen, &authReq);
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
SRpcMsg rpcRsp = {0};
dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) {
terrno = rpcRsp.code;
dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
} else {
SAuthRsp authRsp = {0};
tDeserializeSAuthReq(rpcRsp.pCont, rpcRsp.contLen, &authRsp);
memcpy(secret, authRsp.secret, TSDB_PASSWORD_LEN);
memcpy(ckey, authRsp.ckey, TSDB_PASSWORD_LEN);
*spi = authRsp.spi;
*encrypt = authRsp.encrypt;
dTrace("user:%s, success to get user auth from other mnodes, spi:%d encrypt:%d", user, authRsp.spi,
authRsp.encrypt);
}
rpcFreeCont(rpcRsp.pCont);
return rpcRsp.code;
}
static int32_t dndInitServer(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = pDnode->data.serverPort;
rpcInit.label = "DND";
rpcInit.numOfThreads = tsNumOfRpcThreads;
rpcInit.cfp = (RpcCfp)dndProcessMsg;
rpcInit.sessions = tsMaxShellConns;
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.afp = (RpcAfp)dndRetrieveUserAuthInfo;
rpcInit.parent = pDnode;
pTrans->serverRpc = rpcOpen(&rpcInit);
if (pTrans->serverRpc == NULL) {
dError("failed to init dnode rpc server");
return -1;
}
dDebug("dnode rpc server is initialized");
return 0;
}
static void dndCleanupServer(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
if (pTrans->serverRpc) {
rpcClose(pTrans->serverRpc);
pTrans->serverRpc = NULL;
dDebug("dnode rpc server is closed");
}
}
int32_t dndInitTrans(SDnode *pDnode) {
if (dndInitServer(pDnode) != 0) return -1;
if (dndInitClient(pDnode) != 0) return -1;
SMsgCb msgCb = {
.sendReqFp = dndSendReq,
.sendRspFp = dndSendRsp,
.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg,
.releaseHandleFp = dndReleaseHandle,
};
pDnode->data.msgCb = msgCb;
return 0;
}
void dndCleanupTrans(SDnode *pDnode) {
dndCleanupServer(pDnode);
dndCleanupClient(pDnode);
}
int32_t dndInitMsgHandle(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
for (EDndNodeType n = 0; n < NODE_END; ++n) {
for (EDndNodeType n = NODE_BEGIN + 1; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
......@@ -357,25 +208,58 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
return 0;
}
static int32_t dndSendRpcReq(SDnodeTrans *pTrans, const SEpSet *pEpSet, SRpcMsg *pReq) {
if (pTrans->clientRpc == NULL) {
static inline int32_t dndSendRpcReq(SDnode *pDnode, const SEpSet *pEpSet, SRpcMsg *pReq) {
if (pDnode->trans.clientRpc == NULL) {
terrno = TSDB_CODE_NODE_OFFLINE;
return -1;
}
rpcSendRequest(pTrans->clientRpc, pEpSet, pReq, NULL);
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL);
return 0;
}
static void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
static void dndSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) {
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->handle, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, pDnode->data.localFqdn) == 0 && epSet.eps[i].port == pDnode->data.serverPort) {
epSet.inUse = (i + 1) % epSet.numOfEps;
}
epSet.eps[i].port = htons(epSet.eps[i].port);
}
rpcSendRedirectRsp(pReq->handle, &epSet);
}
static inline void dndSendRpcRsp(SDnode *pDnode, const SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_NODE_REDIRECT) {
dmSendRedirectRsp(pWrapper->pMgmt, pRsp);
dndSendRpcRedirectRsp(pDnode, pRsp);
} else {
rpcSendResponse(pRsp);
}
}
static int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp);
}
void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pReq) {
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
dndSendRpcReq(pDnode, &epSet, pReq);
}
static inline void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) {
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
rpcSendRecv(pDnode->trans.clientRpc, &epSet, pReq, pRsp);
}
static inline int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
if (dndGetStatus(pWrapper->pDnode) != DND_STAT_RUNNING) {
terrno = TSDB_CODE_NODE_OFFLINE;
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle);
......@@ -383,7 +267,7 @@ static int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg
}
if (pWrapper->procType != DND_PROC_CHILD) {
return dndSendRpcReq(&pWrapper->pDnode->trans, pEpSet, pReq);
return dndSendRpcReq(pWrapper->pDnode, pEpSet, pReq);
} else {
char *pHead = taosMemoryMalloc(sizeof(SRpcMsg) + sizeof(SEpSet));
if (pHead == NULL) {
......@@ -394,39 +278,39 @@ static int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg
memcpy(pHead, pReq, sizeof(SRpcMsg));
memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet));
taosProcPutToParentQ(pWrapper->procObj, pHead, sizeof(SRpcMsg) + sizeof(SEpSet), pReq->pCont, pReq->contLen,
PROC_REQ);
PROC_FUNC_REQ);
taosMemoryFree(pHead);
return 0;
}
}
static void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
static inline void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
if (pWrapper->procType != DND_PROC_CHILD) {
dndSendRpcRsp(pWrapper, pRsp);
dndSendRpcRsp(pWrapper->pDnode, pRsp);
} else {
taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP);
taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP);
}
}
static void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
static inline void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
if (pWrapper->procType != DND_PROC_CHILD) {
rpcRegisterBrokenLinkArg(pMsg);
} else {
taosProcPutToParentQ(pWrapper->procObj, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGIST);
taosProcPutToParentQ(pWrapper->procObj, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_FUNC_REGIST);
}
}
static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) {
static inline void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) {
if (pWrapper->procType != DND_PROC_CHILD) {
rpcReleaseHandle(handle, type);
} else {
SRpcMsg msg = {.handle = handle, .code = type};
taosProcPutToParentQ(pWrapper->procObj, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE);
taosProcPutToParentQ(pWrapper->procObj, &msg, sizeof(SRpcMsg), NULL, 0, PROC_FUNC_RELEASE);
}
}
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
ProcFuncType ftype) {
EProcFuncType ftype) {
SRpcMsg *pRpc = &pMsg->rpcMsg;
pRpc->pCont = pCont;
dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle);
......@@ -448,26 +332,26 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t
}
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
ProcFuncType ftype) {
EProcFuncType ftype) {
pMsg->pCont = pCont;
dTrace("msg:%p, get from parent queue, ftype:%d handle:%p code:0x%04x mtype:%d, app:%p", pMsg, ftype, pMsg->handle,
pMsg->code & 0xFFFF, pMsg->msgType, pMsg->ahandle);
switch (ftype) {
case PROC_REGIST:
case PROC_FUNC_REGIST:
rpcRegisterBrokenLinkArg(pMsg);
break;
case PROC_RELEASE:
case PROC_FUNC_RELEASE:
taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
rpcFreeCont(pCont);
break;
case PROC_REQ:
dndSendRpcReq(&pWrapper->pDnode->trans, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
case PROC_FUNC_REQ:
dndSendRpcReq(pWrapper->pDnode, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
break;
case PROC_RSP:
case PROC_FUNC_RSP:
taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
dndSendRpcRsp(pWrapper, pMsg);
dndSendRpcRsp(pWrapper->pDnode, pMsg);
break;
default:
break;
......@@ -492,6 +376,152 @@ SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) {
return cfg;
}
void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp);
}
\ No newline at end of file
static int32_t dndInitClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
SRpcInit rpcInit = {0};
rpcInit.label = "DND";
rpcInit.numOfThreads = 1;
rpcInit.cfp = (RpcCfp)dndProcessMsg;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = INTERNAL_USER;
rpcInit.ckey = INTERNAL_CKEY;
rpcInit.spi = 1;
rpcInit.parent = pDnode;
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
rpcInit.secret = pass;
pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) {
dError("failed to init dnode rpc client");
return -1;
}
dDebug("dnode rpc client is initialized");
return 0;
}
static void dndCleanupClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
if (pTrans->clientRpc) {
rpcClose(pTrans->clientRpc);
pTrans->clientRpc = NULL;
dDebug("dnode rpc client is closed");
}
}
static inline int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret,
char *ckey) {
int32_t code = 0;
char pass[TSDB_PASSWORD_LEN + 1] = {0};
if (strcmp(user, INTERNAL_USER) == 0) {
taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
} else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
taosEncryptPass_c((uint8_t *)(TSDB_NETTEST_USER), strlen(TSDB_NETTEST_USER), pass);
} else {
code = -1;
}
if (code == 0) {
memcpy(secret, pass, TSDB_PASSWORD_LEN);
*spi = 1;
*encrypt = 0;
*ckey = 0;
}
return code;
}
static inline int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret,
char *ckey) {
if (dndGetHideUserAuth(pDnode, user, spi, encrypt, secret, ckey) == 0) {
dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
return 0;
}
SAuthReq authReq = {0};
tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
void *pReq = rpcMallocCont(contLen);
tSerializeSAuthReq(pReq, contLen, &authReq);
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
SRpcMsg rpcRsp = {0};
dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) {
terrno = rpcRsp.code;
dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
} else {
SAuthRsp authRsp = {0};
tDeserializeSAuthReq(rpcRsp.pCont, rpcRsp.contLen, &authRsp);
memcpy(secret, authRsp.secret, TSDB_PASSWORD_LEN);
memcpy(ckey, authRsp.ckey, TSDB_PASSWORD_LEN);
*spi = authRsp.spi;
*encrypt = authRsp.encrypt;
dTrace("user:%s, success to get user auth from other mnodes, spi:%d encrypt:%d", user, authRsp.spi,
authRsp.encrypt);
}
rpcFreeCont(rpcRsp.pCont);
return rpcRsp.code;
}
static int32_t dndInitServer(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
SRpcInit rpcInit = {0};
rpcInit.localPort = pDnode->data.serverPort;
rpcInit.label = "DND";
rpcInit.numOfThreads = tsNumOfRpcThreads;
rpcInit.cfp = (RpcCfp)dndProcessMsg;
rpcInit.sessions = tsMaxShellConns;
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.afp = (RpcAfp)dndRetrieveUserAuthInfo;
rpcInit.parent = pDnode;
pTrans->serverRpc = rpcOpen(&rpcInit);
if (pTrans->serverRpc == NULL) {
dError("failed to init dnode rpc server");
return -1;
}
dDebug("dnode rpc server is initialized");
return 0;
}
static void dndCleanupServer(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
if (pTrans->serverRpc) {
rpcClose(pTrans->serverRpc);
pTrans->serverRpc = NULL;
dDebug("dnode rpc server is closed");
}
}
int32_t dndInitTrans(SDnode *pDnode) {
if (dndInitServer(pDnode) != 0) return -1;
if (dndInitClient(pDnode) != 0) return -1;
SMsgCb msgCb = {
.sendReqFp = dndSendReq,
.sendRspFp = dndSendRsp,
.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg,
.releaseHandleFp = dndReleaseHandle,
};
pDnode->data.msgCb = msgCb;
return 0;
}
void dndCleanupTrans(SDnode *pDnode) {
dndCleanupServer(pDnode);
dndCleanupClient(pDnode);
}
......@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "dmInt.h"
#include "dndImp.h"
static void *dmThreadRoutine(void *param) {
SDnodeData *pMgmt = param;
......
......@@ -154,7 +154,7 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) {
}
static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen,
const char *pBody, int32_t rawBodyLen, int64_t handle, ProcFuncType ftype) {
const char *pBody, int32_t rawBodyLen, int64_t handle, EProcFuncType ftype) {
if (rawHeadLen == 0 || pHead == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
......@@ -171,7 +171,7 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char
return -1;
}
if (handle != 0 && ftype == PROC_REQ) {
if (handle != 0 && ftype == PROC_FUNC_REQ) {
if (taosHashPut(pProc->hash, &handle, sizeof(int64_t), &handle, sizeof(int64_t)) != 0) {
taosThreadMutexUnlock(&pQueue->mutex);
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -232,7 +232,7 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char
}
static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHeadLen, void **ppBody, int32_t *pBodyLen,
ProcFuncType *pFuncType, ProcMallocFp mallocHeadFp, ProcFreeFp freeHeadFp,
EProcFuncType *pFuncType, ProcMallocFp mallocHeadFp, ProcFreeFp freeHeadFp,
ProcMallocFp mallocBodyFp, ProcFreeFp freeBodyFp) {
tsem_wait(&pQueue->sem);
......@@ -309,7 +309,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
*ppBody = pBody;
*pHeadLen = rawHeadLen;
*pBodyLen = rawBodyLen;
*pFuncType = (ProcFuncType)ftype;
*pFuncType = (EProcFuncType)ftype;
uTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype,
pQueue->items, rawHeadLen, pHead, rawBodyLen, pBody);
......@@ -364,7 +364,7 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
static void taosProcThreadLoop(SProcObj *pProc) {
void *pHead, *pBody;
int16_t headLen;
ProcFuncType ftype;
EProcFuncType ftype;
int32_t bodyLen;
SProcQueue *pQueue;
ProcConsumeFp consumeFp;
......@@ -454,8 +454,8 @@ void taosProcCleanup(SProcObj *pProc) {
}
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, ProcFuncType ftype) {
if (ftype != PROC_REQ) {
void *handle, EProcFuncType ftype) {
if (ftype != PROC_FUNC_REQ) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
......@@ -482,7 +482,7 @@ void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) {
}
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype) {
EProcFuncType ftype) {
int32_t retry = 0;
while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) {
uWarn("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry);
......
......@@ -89,7 +89,7 @@ TEST_F(UtilTesProc, 00_Init_Cleanup) {
taosDropShm(&shm);
}
void ConsumeChild1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
void ConsumeChild1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, EProcFuncType ftype) {
STestMsg msg;
memcpy(&msg, pHead, headLen);
char body[2000] = {0};
......@@ -120,20 +120,20 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) {
SProcObj *cproc = taosProcInit(&cfg);
ASSERT_NE(cproc, nullptr);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_RSP), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_REGIST), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_RELEASE), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, PROC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, PROC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, shm.size, 0, PROC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_RSP), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_REGIST), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_RELEASE), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, shm.size, 0, PROC_FUNC_REQ), 0);
for (int32_t j = 0; j < 1000; j++) {
int32_t i = 0;
for (i = 0; i < 20; ++i) {
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_REQ), 0);
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_FUNC_REQ), 0);
}
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_FUNC_REQ), 0);
cfg.isChild = true;
cfg.name = "1235_p";
......@@ -147,7 +147,7 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) {
taosDropShm(&shm);
}
void ConsumeParent1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
void ConsumeParent1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, EProcFuncType ftype) {
STestMsg msg;
memcpy(&msg, pHead, headLen);
char body[2000] = {0};
......@@ -186,7 +186,7 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) {
for (int32_t j = 0; j < 1000; j++) {
int32_t i = 0;
for (i = 0; i < 20; ++i) {
taosProcPutToParentQ(pproc, &head, sizeof(STestMsg), body, i, PROC_REQ);
taosProcPutToParentQ(pproc, &head, sizeof(STestMsg), body, i, PROC_FUNC_REQ);
}
taosProcRun(cproc);
......@@ -198,7 +198,7 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) {
taosDropShm(&shm);
}
void ConsumeChild3(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
void ConsumeChild3(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, EProcFuncType ftype) {
STestMsg msg;
memcpy(&msg, pHead, headLen);
char body[2000] = {0};
......@@ -236,7 +236,7 @@ TEST_F(UtilTesProc, 03_Handle) {
int32_t i = 0;
for (i = 0; i < 20; ++i) {
head.handle = (void *)((int64_t)i);
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), PROC_REQ), 0);
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), PROC_FUNC_REQ), 0);
}
cfg.isChild = true;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册