dndMsg.c 3.7 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "dndInt.h"

S
shm  
Shengliang Guan 已提交
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
/*
 * Hand a new mnode endpoint set to the DNODE management module.
 *
 * The DNODE wrapper is acquired from pDnode; if that succeeds, its pMgmt is
 * passed to dmUpdateMnodeEpSet() together with pEpSet. The acquired pointer
 * (NULL included) is always given back through dndReleaseWrapper().
 */
static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
  SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pDnode, DNODE);

  if (pDnodeWrapper == NULL) {
    /* Nothing to update; the release call is still made, as for the success path. */
    dndReleaseWrapper(pDnodeWrapper);
    return;
  }

  dmUpdateMnodeEpSet(pDnodeWrapper->pMgmt, pEpSet);
  dndReleaseWrapper(pDnodeWrapper);
}

/*
 * Look up the handler registered in pWrapper->msgFps for the type of pRpc.
 *
 * Returns the handler, or NULL after setting terrno to
 * TSDB_CODE_MSG_NOT_PROCESSED when the table slot is empty.
 */
static inline NodeMsgFp dndGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  if (handler != NULL) {
    return handler;
  }

  terrno = TSDB_CODE_MSG_NOT_PROCESSED;
  return NULL;
}

/*
 * Fill pMsg from an incoming RPC message.
 *
 * For message types with the lowest bit set (presumably requests -- confirm
 * against the message type definitions) the connection info is fetched from
 * the RPC handle. The user name from that info (all zeros when no lookup was
 * done) and a full copy of *pRpc are stored into pMsg.
 *
 * Returns 0 on success. Returns -1 with terrno set to
 * TSDB_CODE_MND_NO_USER_FROM_CONN when the connection info lookup fails; pMsg
 * is left unmodified in that case.
 */
static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
  SRpcConnInfo connInfo = {0};

  if (pRpc->msgType & 1U) {
    if (rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
      terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
      dError("failed to build msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle);
      return -1;
    }
  }

  memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
  memcpy(&pMsg->rpcMsg, pRpc, sizeof(*pRpc));
  return 0;
}

/*
 * Entry point for an RPC message addressed to the node behind pWrapper.
 *
 * Steps:
 *   1. If an endpoint set with at least one entry accompanies a
 *      TDMT_MND_STATUS_RSP message, the mnode endpoint set is refreshed.
 *   2. The wrapper is marked, the handler for the message type is looked up,
 *      a queue item is allocated and filled from the RPC message.
 *   3. PROC_SINGLE: the handler is invoked directly.
 *      PROC_PARENT: the message is put on the child process queue.
 *      Any other process type leaves the result at its initial -1.
 *   4. On failure an error is logged, a response carrying terrno is sent for
 *      message types with the lowest bit set, and the queue item plus the RPC
 *      content are freed. On success they are freed only for PROC_PARENT
 *      (for PROC_SINGLE the handler presumably owns the message -- confirm).
 *   5. The wrapper is released on every path.
 */
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
  int32_t   ret = -1;
  SNodeMsg *pNodeMsg = NULL;
  NodeMsgFp handler = NULL;

  if (pEpSet != NULL && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
    dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
  }

  if (dndMarkWrapper(pWrapper) != 0) {
    goto finish;
  }

  handler = dndGetMsgFp(pWrapper, pRpc);
  if (handler == NULL) {
    goto finish;
  }

  pNodeMsg = taosAllocateQitem(sizeof(SNodeMsg));
  if (pNodeMsg == NULL) {
    goto finish;
  }

  if (dndBuildMsg(pNodeMsg, pRpc) != 0) {
    goto finish;
  }

  dTrace("msg:%p, is created, app:%p user:%s", pNodeMsg, pRpc->ahandle, pNodeMsg->user);
  if (pWrapper->procType == PROC_SINGLE) {
    ret = (*handler)(pWrapper->pMgmt, pNodeMsg);
  } else if (pWrapper->procType == PROC_PARENT) {
    ret = taosProcPutToChildQueue(pWrapper->pProc, pNodeMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen);
  }
  /* Other process types are not dispatched here; ret stays -1. */

finish:
  if (ret != 0) {
    dError("msg:%p, failed to process since %s", pNodeMsg, terrstr());
    if (pRpc->msgType & 1U) {
      SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
      dndSendRsp(pWrapper, &rsp);
    }
  }

  /* Free here on any failure, and on success only when running as PROC_PARENT. */
  if (ret != 0 || pWrapper->procType == PROC_PARENT) {
    dTrace("msg:%p, is freed", pNodeMsg);
    taosFreeQitem(pNodeMsg);
    rpcFreeCont(pRpc->pCont);
  }

  dndReleaseWrapper(pWrapper);
}

S
shm  
Shengliang Guan 已提交
93
/*
 * Handle node lifecycle messages for the dnode.
 *
 * Each TDMT_DND_CREATE_<X>NODE message opens the node of type <X> via
 * dndOpenNode(), and each TDMT_DND_DROP_<X>NODE message closes the node of
 * the same type via dndCloseNode(), for X in {M, Q, S, B}.
 *
 * Returns the result of dndOpenNode()/dndCloseNode() for a recognised message
 * type. For any other type, terrno is set to TSDB_CODE_MSG_NOT_PROCESSED and
 * -1 is returned.
 */
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
  switch (pMsg->rpcMsg.msgType) {
    case TDMT_DND_CREATE_MNODE:
      return dndOpenNode(pDnode, MNODE);
    case TDMT_DND_DROP_MNODE:
      return dndCloseNode(pDnode, MNODE);

    case TDMT_DND_CREATE_QNODE:
      return dndOpenNode(pDnode, QNODE);
    case TDMT_DND_DROP_QNODE:
      return dndCloseNode(pDnode, QNODE);

    case TDMT_DND_CREATE_SNODE:
      return dndOpenNode(pDnode, SNODE);
    case TDMT_DND_DROP_SNODE:
      /* Dropping an snode must close the SNODE, matching the create case above. */
      return dndCloseNode(pDnode, SNODE);

    case TDMT_DND_CREATE_BNODE:
      return dndOpenNode(pDnode, BNODE);
    case TDMT_DND_DROP_BNODE:
      return dndCloseNode(pDnode, BNODE);

    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      return -1;
  }
}