dndMsg.c 5.8 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "dndInt.h"

S
shm  
Shengliang Guan 已提交
19 20 21 22
/*
 * Hand pEpSet to dmUpdateMnodeEpSet() using the management object of the
 * DNODE wrapper. Silently does nothing when that wrapper cannot be acquired.
 */
static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
  SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pDnode, DNODE);
  if (pDnodeWrapper == NULL) {
    return;
  }

  dmUpdateMnodeEpSet(pDnodeWrapper->pMgmt, pEpSet);
  dndReleaseWrapper(pDnodeWrapper);
}

/*
 * Look up the handler registered in pWrapper->msgFps for the type of pRpc.
 * Returns the handler, or NULL with terrno set to TSDB_CODE_MSG_NOT_PROCESSED
 * when the slot is empty.
 */
static inline NodeMsgFp dndGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  if (handler != NULL) {
    return handler;
  }

  terrno = TSDB_CODE_MSG_NOT_PROCESSED;
  return NULL;
}

/*
 * Fill pMsg from the incoming RPC message: user name, client address and a
 * copy of the SRpcMsg itself.
 *
 * Connection info is only queried when the low bit of msgType is set
 * (NOTE(review): presumably this bit marks request messages - confirm);
 * otherwise the user/ip/port fields are taken from a zeroed SRpcConnInfo.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_MND_NO_USER_FROM_CONN
 * when rpcGetConnInfo() fails.
 */
static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
  SRpcConnInfo connInfo = {0};

  if (pRpc->msgType & 1U) {
    if (rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
      terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
      dError("failed to build msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle);
      return -1;
    }
  }

  pMsg->clientIp = connInfo.clientIp;
  pMsg->clientPort = connInfo.clientPort;
  memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
  memcpy(&pMsg->rpcMsg, pRpc, sizeof(*pRpc));

  return 0;
}

/*
 * Dispatch one incoming RPC message to the node behind pWrapper.
 *
 * Steps:
 *   1. If the message is TDMT_MND_STATUS_RSP and carries a non-empty endpoint
 *      set, forward that set via dndUpdateMnodeEpSet().
 *   2. Mark the wrapper, look up the handler, allocate an SNodeMsg queue item
 *      and fill it from pRpc.
 *   3. PROC_SINGLE: call the handler directly.
 *      PROC_PARENT: push the message and its content to the child queue.
 *      Any other process type is treated as a failure.
 *
 * Cleanup:
 *   - success + PROC_PARENT: the queue item and pRpc->pCont are freed here.
 *   - success otherwise: nothing is freed here.
 *   - failure: an error response carrying terrno is sent when the low bit of
 *     msgType is set, then the queue item and pRpc->pCont are freed.
 *
 * The wrapper is released only if dndMarkWrapper() succeeded, so a failed
 * mark no longer drops a reference that was never taken.
 */
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
  int32_t   code = -1;
  SNodeMsg *pMsg = NULL;
  NodeMsgFp msgFp = NULL;
  bool      needRelease = false;

  if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
    dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
  }

  if (dndMarkWrapper(pWrapper) != 0) goto _OVER;
  needRelease = true; /* from here on the mark must be balanced by a release */

  if ((msgFp = dndGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
  if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
  if (dndBuildMsg(pMsg, pRpc) != 0) goto _OVER;

  if (pWrapper->procType == PROC_SINGLE) {
    dTrace("msg:%p, is created, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user);
    code = (*msgFp)(pWrapper, pMsg);
  } else if (pWrapper->procType == PROC_PARENT) {
    dTrace("msg:%p, is created and put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle,
           pRpc->ahandle, pMsg->user);
    code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, PROC_REQ);
  } else {
    dTrace("msg:%p, should not processed in child process, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle,
           pMsg->user);
    /* Nothing above set terrno on this path; give the error response a defined code. */
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    /* NOTE(review): ASSERT(1) can never fire; ASSERT(0) may have been intended - confirm. */
    ASSERT(1);
  }

_OVER:
  if (code == 0) {
    if (pWrapper->procType == PROC_PARENT) {
      dTrace("msg:%p, is freed in parent process", pMsg);
      taosFreeQitem(pMsg);
      rpcFreeCont(pRpc->pCont);
    }
  } else {
    dError("msg:%p, failed to process since 0x%04x:%s", pMsg, code & 0XFFFF, terrstr());
    if (pRpc->msgType & 1U) {
      SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
      dndSendRsp(pWrapper, &rsp);
    }
    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pRpc->pCont);
  }

  if (needRelease) {
    dndReleaseWrapper(pWrapper);
  }
}

S
shm  
Shengliang Guan 已提交
99 100 101 102 103
/*
 * Handle a "create node" request for the node type ntype.
 *
 * Fails with TSDB_CODE_NODE_ALREADY_DEPLOYED when the wrapper for ntype can
 * already be acquired. Otherwise creates the wrapper's directory, invokes the
 * wrapper's createMsgFp callback and, on success, flags the wrapper as
 * deployed.
 *
 * Returns 0 on success, -1 for the pre-checks (terrno set), or the non-zero
 * code returned by createMsgFp.
 */
static int32_t dndProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) {
  SMgmtWrapper *pExisting = dndAcquireWrapper(pDnode, ntype);
  if (pExisting != NULL) {
    dndReleaseWrapper(pExisting);
    terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
    dError("failed to create node since %s", terrstr());
    return -1;
  }

  SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];

  if (taosMkDir(pWrapper->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
    return -1;
  }

  int32_t code = (*pWrapper->fp.createMsgFp)(pWrapper, pMsg);
  if (code == 0) {
    dDebug("node:%s, has been opened", pWrapper->name);
    pWrapper->deployed = true;
  } else {
    dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
  }

  return code;
}

/*
 * Handle a "drop node" request for the node type ntype.
 *
 * Fails with TSDB_CODE_NODE_NOT_DEPLOYED when the wrapper cannot be acquired.
 * Otherwise, while holding the wrapper's write latch, clears the deployed
 * flag, invokes the wrapper's dropMsgFp callback and restores the flag if the
 * callback reports failure. The wrapper is released before returning.
 *
 * Returns 0 on success, -1 when the node is not deployed, or the non-zero
 * code returned by dropMsgFp.
 */
static int32_t dndProcessDropNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) {
  SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
  if (pWrapper == NULL) {
    terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
    dError("failed to drop node since %s", terrstr());
    return -1;
  }

  taosWLockLatch(&pWrapper->latch);

  /* Flag is cleared before the callback runs and re-evaluated afterwards. */
  pWrapper->deployed = false;
  int32_t code = (*pWrapper->fp.dropMsgFp)(pWrapper, pMsg);
  pWrapper->deployed = (code != 0);

  if (code == 0) {
    dDebug("node:%s, has been dropped", pWrapper->name);
  } else {
    dError("node:%s, failed to drop since %s", pWrapper->name, terrstr());
  }

  taosWUnLockLatch(&pWrapper->latch);
  dndReleaseWrapper(pWrapper);

  return code;
}

S
shm  
Shengliang Guan 已提交
152
/*
 * Route a create/drop request for an mnode, qnode, snode or bnode to the
 * matching handler, based on pMsg->rpcMsg.msgType.
 *
 * Returns the handler's result, or -1 with terrno set to
 * TSDB_CODE_MSG_NOT_PROCESSED for any other message type.
 */
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
  ENodeType ntype;
  bool      isCreate;

  /* Classify the message first; the actual dispatch happens once below. */
  switch (pMsg->rpcMsg.msgType) {
    case TDMT_DND_CREATE_MNODE:
      ntype = MNODE;
      isCreate = true;
      break;
    case TDMT_DND_DROP_MNODE:
      ntype = MNODE;
      isCreate = false;
      break;
    case TDMT_DND_CREATE_QNODE:
      ntype = QNODE;
      isCreate = true;
      break;
    case TDMT_DND_DROP_QNODE:
      ntype = QNODE;
      isCreate = false;
      break;
    case TDMT_DND_CREATE_SNODE:
      ntype = SNODE;
      isCreate = true;
      break;
    case TDMT_DND_DROP_SNODE:
      ntype = SNODE;
      isCreate = false;
      break;
    case TDMT_DND_CREATE_BNODE:
      ntype = BNODE;
      isCreate = true;
      break;
    case TDMT_DND_DROP_BNODE:
      ntype = BNODE;
      isCreate = false;
      break;
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      return -1;
  }

  if (isCreate) {
    return dndProcessCreateNodeMsg(pDnode, ntype, pMsg);
  }
  return dndProcessDropNodeMsg(pDnode, ntype, pMsg);
}