dndMsg.c 5.2 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "dndInt.h"

S
shm  
Shengliang Guan 已提交
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
  SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
  if (pWrapper != NULL) {
    dmUpdateMnodeEpSet(pWrapper->pMgmt, pEpSet);
  }
  dndReleaseWrapper(pWrapper);
}

static inline NodeMsgFp dndGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  if (msgFp == NULL) {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
  }

  return msgFp;
}

static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
S
shm  
Shengliang Guan 已提交
37 38 39 40 41 42 43 44
  SRpcConnInfo connInfo = {0};
  if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
    terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
    dError("failed to build msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle);
    return -1;
  }

  memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
S
shm  
Shengliang Guan 已提交
45
  memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg));
S
shm  
Shengliang Guan 已提交
46 47 48 49 50 51

  return 0;
}

void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
  if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
S
shm  
Shengliang Guan 已提交
52
    dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
S
shm  
Shengliang Guan 已提交
53 54 55 56
  }

  int32_t   code = -1;
  SNodeMsg *pMsg = NULL;
S
shm  
Shengliang Guan 已提交
57
  NodeMsgFp msgFp = NULL;
S
shm  
Shengliang Guan 已提交
58

S
shm  
Shengliang Guan 已提交
59 60 61 62
  if (dndMarkWrapper(pWrapper) != 0) goto _OVER;
  if ((msgFp = dndGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
  if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
  if (dndBuildMsg(pMsg, pRpc) != 0) goto _OVER;
S
shm  
Shengliang Guan 已提交
63

S
shm  
Shengliang Guan 已提交
64
  dTrace("msg:%p, is created, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user);
S
shm  
Shengliang Guan 已提交
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
  if (pWrapper->procType == PROC_SINGLE) {
    code = (*msgFp)(pWrapper->pMgmt, pMsg);
  } else if (pWrapper->procType == PROC_PARENT) {
    code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen);
  } else {
  }

_OVER:
  if (code == 0) {
    if (pWrapper->procType == PROC_PARENT) {
      dTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
      rpcFreeCont(pRpc->pCont);
    }
  } else {
    dError("msg:%p, failed to process since %s", pMsg, terrstr());
S
shm  
Shengliang Guan 已提交
81
    if (pRpc->msgType & 1U) {
S
shm  
Shengliang Guan 已提交
82 83 84 85 86 87 88 89
      SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
      dndSendRsp(pWrapper, &rsp);
    }
    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pRpc->pCont);
  }

S
shm  
Shengliang Guan 已提交
90
  dndReleaseWrapper(pWrapper);
S
shm  
Shengliang Guan 已提交
91 92
}

S
shm  
Shengliang Guan 已提交
93 94 95 96 97 98 99 100 101
static int32_t dndProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) {
  SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
  if (pWrapper != NULL) {
    dndReleaseWrapper(pWrapper);
    terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
    return -1;
  }

  pWrapper = &pDnode->wrappers[ntype];
S
shm  
Shengliang Guan 已提交
102 103 104 105 106 107 108

  if (taosMkDir(pWrapper->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
    return -1;
  }

S
shm  
Shengliang Guan 已提交
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
  int32_t code = (*pWrapper->fp.createMsgFp)(pWrapper, pMsg);
  if (code != 0) {
    dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
  } else {
    dDebug("node:%s, has been opened", pWrapper->name);
    pWrapper->deployed = true;
  }

  return code;
}

static int32_t dndProcessDropNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) {
  SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
  if (pWrapper == NULL) {
    terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
    return -1;
  }

S
shm  
Shengliang Guan 已提交
127 128 129 130 131 132 133 134 135 136 137 138 139
  taosWLockLatch(&pWrapper->latch);
  pWrapper->deployed = false;

  int32_t code = (*pWrapper->fp.dropMsgFp)(pWrapper, pMsg);
  if (code != 0) {
    pWrapper->deployed = true;
    dError("node:%s, failed to drop since %s", pWrapper->name, terrstr());
  } else {
    pWrapper->deployed = false;
    dDebug("node:%s, has been dropped", pWrapper->name);
  }

  taosWUnLockLatch(&pWrapper->latch);
S
shm  
Shengliang Guan 已提交
140
  dndReleaseWrapper(pWrapper);
S
shm  
Shengliang Guan 已提交
141
  return code;
S
shm  
Shengliang Guan 已提交
142 143
}

S
shm  
Shengliang Guan 已提交
144
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
S
shm  
Shengliang Guan 已提交
145
  switch (pMsg->rpcMsg.msgType) {
S
shm  
Shengliang Guan 已提交
146
    case TDMT_DND_CREATE_MNODE:
S
shm  
Shengliang Guan 已提交
147
      return dndProcessCreateNodeMsg(pDnode, MNODE, pMsg);
S
shm  
Shengliang Guan 已提交
148
    case TDMT_DND_DROP_MNODE:
S
shm  
Shengliang Guan 已提交
149
      return dndProcessDropNodeMsg(pDnode, MNODE, pMsg);
S
shm  
Shengliang Guan 已提交
150
    case TDMT_DND_CREATE_QNODE:
S
shm  
Shengliang Guan 已提交
151
      return dndProcessCreateNodeMsg(pDnode, QNODE, pMsg);
S
shm  
Shengliang Guan 已提交
152
    case TDMT_DND_DROP_QNODE:
S
shm  
Shengliang Guan 已提交
153
      return dndProcessDropNodeMsg(pDnode, QNODE, pMsg);
S
shm  
Shengliang Guan 已提交
154
    case TDMT_DND_CREATE_SNODE:
S
shm  
Shengliang Guan 已提交
155
      return dndProcessCreateNodeMsg(pDnode, SNODE, pMsg);
S
shm  
Shengliang Guan 已提交
156
    case TDMT_DND_DROP_SNODE:
S
shm  
Shengliang Guan 已提交
157
      return dndProcessDropNodeMsg(pDnode, SNODE, pMsg);
S
shm  
Shengliang Guan 已提交
158
    case TDMT_DND_CREATE_BNODE:
S
shm  
Shengliang Guan 已提交
159
      return dndProcessCreateNodeMsg(pDnode, BNODE, pMsg);
S
shm  
Shengliang Guan 已提交
160
    case TDMT_DND_DROP_BNODE:
S
shm  
Shengliang Guan 已提交
161
      return dndProcessDropNodeMsg(pDnode, BNODE, pMsg);
S
shm  
Shengliang Guan 已提交
162

S
shm  
Shengliang Guan 已提交
163 164 165 166 167
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      return -1;
  }
}