提交 e6a08343 编写于 作者: S Shengliang Guan

minor changes

上级 663ca79a
...@@ -32,10 +32,10 @@ static int32_t dndReadDnodes(SDnode *pDnode); ...@@ -32,10 +32,10 @@ static int32_t dndReadDnodes(SDnode *pDnode);
static int32_t dndWriteDnodes(SDnode *pDnode); static int32_t dndWriteDnodes(SDnode *pDnode);
static void *dnodeThreadRoutine(void *param); static void *dnodeThreadRoutine(void *param);
static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg); static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pReq);
static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg); static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp);
static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg); static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pRsp);
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg); static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pRsp);
int32_t dndGetDnodeId(SDnode *pDnode) { int32_t dndGetDnodeId(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
...@@ -80,13 +80,13 @@ void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { ...@@ -80,13 +80,13 @@ void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
} }
void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) { void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) {
tmsg_t msgType = pMsg->msgType; tmsg_t msgType = pReq->msgType;
SEpSet epSet = {0}; SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet); dndGetMnodeEpSet(pDnode, &epSet);
dDebug("RPC %p, msg:%s is redirected, num:%d use:%d", pMsg->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse); dDebug("RPC %p, req:%s is redirected, num:%d use:%d", pReq->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) { for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]); dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]);
if (strcmp(epSet.fqdn[i], pDnode->opt.localFqdn) == 0 && epSet.port[i] == pDnode->opt.serverPort) { if (strcmp(epSet.fqdn[i], pDnode->opt.localFqdn) == 0 && epSet.port[i] == pDnode->opt.serverPort) {
...@@ -96,7 +96,7 @@ void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -96,7 +96,7 @@ void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
epSet.port[i] = htons(epSet.port[i]); epSet.port[i] = htons(epSet.port[i]);
} }
rpcSendRedirectRsp(pMsg->handle, &epSet); rpcSendRedirectRsp(pReq->handle, &epSet);
} }
static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
...@@ -391,7 +391,7 @@ void dndSendStatusReq(SDnode *pDnode) { ...@@ -391,7 +391,7 @@ void dndSendStatusReq(SDnode *pDnode) {
SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527};
pMgmt->statusSent = 1; pMgmt->statusSent = 1;
dTrace("pDnode:%p, send status msg to mnode", pDnode); dTrace("pDnode:%p, send status req to mnode", pDnode);
dndSendReqToMnode(pDnode, &rpcMsg); dndSendReqToMnode(pDnode, &rpcMsg);
} }
...@@ -427,12 +427,12 @@ static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) { ...@@ -427,12 +427,12 @@ static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) {
taosWUnLockLatch(&pMgmt->latch); taosWUnLockLatch(&pMgmt->latch);
} }
static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) { static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pMsg->code != TSDB_CODE_SUCCESS) { if (pRsp->code != TSDB_CODE_SUCCESS) {
pMgmt->statusSent = 0; pMgmt->statusSent = 0;
if (pMsg->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) { if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) {
dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId); dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId);
pMgmt->dropped = 1; pMgmt->dropped = 1;
dndWriteDnodes(pDnode); dndWriteDnodes(pDnode);
...@@ -440,16 +440,16 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -440,16 +440,16 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) {
return; return;
} }
SStatusRsp *pRsp = pMsg->pCont; if (pRsp->pCont != NULL && pRsp->contLen != 0) {
if (pMsg->pCont != NULL && pMsg->contLen != 0) { SStatusRsp *pStatus = pRsp->pCont;
pMgmt->dver = htobe64(pRsp->dver); pMgmt->dver = htobe64(pStatus->dver);
SDnodeCfg *pCfg = &pRsp->dnodeCfg; SDnodeCfg *pCfg = &pStatus->dnodeCfg;
pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->clusterId = htobe64(pCfg->clusterId); pCfg->clusterId = htobe64(pCfg->clusterId);
dndUpdateDnodeCfg(pDnode, pCfg); dndUpdateDnodeCfg(pDnode, pCfg);
SDnodeEps *pDnodeEps = &pRsp->dnodeEps; SDnodeEps *pDnodeEps = &pStatus->dnodeEps;
pDnodeEps->num = htonl(pDnodeEps->num); pDnodeEps->num = htonl(pDnodeEps->num);
for (int32_t i = 0; i < pDnodeEps->num; ++i) { for (int32_t i = 0; i < pDnodeEps->num; ++i) {
pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id); pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id);
...@@ -461,26 +461,27 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -461,26 +461,27 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) {
pMgmt->statusSent = 0; pMgmt->statusSent = 0;
} }
static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg) { assert(1); } static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pReq) { dError("auth rsp is received, but not supported yet"); }
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg) { assert(1); } static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pReq) {
dError("grant rsp is received, but not supported yet");
static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) { }
dError("config msg is received, but not supported yet");
SDCfgDnodeReq *pCfg = pMsg->pCont;
static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
dError("config req is received, but not supported yet");
SDCfgDnodeReq *pCfg = pReq->pCont;
return TSDB_CODE_OPS_NOT_SUPPORT; return TSDB_CODE_OPS_NOT_SUPPORT;
} }
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) { void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup msg is received"); dDebug("startup req is received");
SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg)); SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg));
dndGetStartup(pDnode, pStartup); dndGetStartup(pDnode, pStartup);
dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStartup, .contLen = sizeof(SStartupMsg)}; SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupMsg)};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
...@@ -707,7 +708,7 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -707,7 +708,7 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
default: default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1; code = -1;
dError("RPC %p, dnode req:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType)); dError("RPC %p, dnode msg:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType));
break; break;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册