dndMsg.c 5.7 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "dndInt.h"

S
shm  
Shengliang Guan 已提交
19 20 21 22
// Hands a new mnode endpoint set to the DNODE management node.
// Does nothing when the DNODE wrapper cannot be acquired.
static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
  SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pDnode, DNODE);
  if (pDnodeWrapper == NULL) return;

  dmUpdateMnodeEpSet(pDnodeWrapper->pMgmt, pEpSet);

  // Balance the successful acquire above.
  dndReleaseWrapper(pDnodeWrapper);
}

// Looks up the handler registered in the wrapper for the message type of pRpc.
// Returns the handler, or NULL with terrno set to TSDB_CODE_MSG_NOT_PROCESSED
// when no handler is registered for that type.
static inline NodeMsgFp dndGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  NodeMsgFp handler = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
  if (handler != NULL) return handler;

  terrno = TSDB_CODE_MSG_NOT_PROCESSED;
  return NULL;
}

// Fills pMsg from the raw RPC message: copies the connection's user name and
// the whole SRpcMsg structure.
// Connection info is only queried for message types with the lowest bit set
// (presumably requests - confirm against the TDMT numbering); for all other
// types the user field is filled from a zero-initialized SRpcConnInfo.
// Returns 0 on success, -1 with terrno set when the connection info lookup fails.
static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
  SRpcConnInfo connInfo = {0};

  if (pRpc->msgType & 1U) {
    if (rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
      terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
      dError("failed to build msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle);
      return -1;
    }
  }

  memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
  memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg));
  return 0;
}

// Entry point for an RPC message addressed to the node behind pWrapper.
//
// Steps:
//   1. If the message is a TDMT_MND_STATUS_RSP that carries a non-empty
//      endpoint set, the mnode endpoint set of the dnode is refreshed first.
//   2. The wrapper is marked, the handler for the message type is looked up,
//      and an SNodeMsg queue item is allocated and filled from pRpc.
//   3. Depending on the wrapper's process type the message is either handled
//      in place (PROC_SINGLE) or put into the child process queue (PROC_PARENT).
//
// On any failure an error is logged, a response carrying terrno is sent back
// for message types with the lowest bit set, and both the queue item and the
// RPC content are freed here.
//
// pWrapper: wrapper of the target node.
// pRpc:     incoming RPC message; pRpc->pCont is freed here on failure and,
//           in PROC_PARENT mode, also after a successful put to the child queue.
// pEpSet:   optional endpoint set delivered with the message, may be NULL.
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
  int32_t   code = -1;
  SNodeMsg *pMsg = NULL;
  NodeMsgFp msgFp = NULL;
  bool      needRelease = false;

  if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
    dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
  }

  if (dndMarkWrapper(pWrapper) != 0) goto _OVER;
  // Only a successful mark may be balanced by a release at the end.
  // NOTE(review): assumes dndMarkWrapper takes a reference only when it
  // returns 0 - confirm against its implementation.
  needRelease = true;

  if ((msgFp = dndGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
  if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
  if (dndBuildMsg(pMsg, pRpc) != 0) goto _OVER;

  if (pWrapper->procType == PROC_SINGLE) {
    dTrace("msg:%p, is created, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user);
    code = (*msgFp)(pWrapper->pMgmt, pMsg);
  } else if (pWrapper->procType == PROC_PARENT) {
    dTrace("msg:%p, is created and will put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle,
           pRpc->ahandle, pMsg->user);
    code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen);
  } else {
    dTrace("msg:%p, should not processed in child process, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle,
           pMsg->user);
    // NOTE(review): ASSERT(1) can never fire; ASSERT(0) was probably intended.
    // Left unchanged so this path keeps falling through to the error handling
    // below (code is still -1) instead of aborting.
    ASSERT(1);
  }

_OVER:
  if (code == 0) {
    // In PROC_SINGLE mode nothing is freed here after a successful handler
    // call (presumably the handler owns the message from then on).
    if (pWrapper->procType == PROC_PARENT) {
      dTrace("msg:%p, is freed in parent process", pMsg);
      taosFreeQitem(pMsg);
      rpcFreeCont(pRpc->pCont);
    }
  } else {
    dError("msg:%p, failed to process since 0x%04x:%s", pMsg, code & 0XFFFF, terrstr());
    if (pRpc->msgType & 1U) {
      SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
      dndSendRsp(pWrapper, &rsp);
    }
    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pRpc->pCont);
  }

  // Previously the wrapper was released even when marking it had failed,
  // which released a reference that had never been taken.
  if (needRelease) {
    dndReleaseWrapper(pWrapper);
  }
}

S
shm  
Shengliang Guan 已提交
97 98 99 100 101
// Handles a "create node" request for the node type ntype.
// Fails with TSDB_CODE_NODE_ALREADY_DEPLOYED when a wrapper of that type can
// already be acquired. Otherwise the node directory is created and the
// type-specific create callback is invoked; the wrapper is flagged as deployed
// only when the callback succeeds.
// Returns 0 on success, -1 for the checks done here, or the callback's code.
static int32_t dndProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) {
  SMgmtWrapper *pExisting = dndAcquireWrapper(pDnode, ntype);
  if (pExisting != NULL) {
    dndReleaseWrapper(pExisting);
    terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
    dError("failed to create node since %s", terrstr());
    return -1;
  }

  // The acquire failed, so the wrapper slot is accessed directly.
  SMgmtWrapper *pNode = &pDnode->wrappers[ntype];

  if (taosMkDir(pNode->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    dError("failed to create dir:%s since %s", pNode->path, terrstr());
    return -1;
  }

  int32_t code = (*pNode->fp.createMsgFp)(pNode, pMsg);
  if (code == 0) {
    dDebug("node:%s, has been opened", pNode->name);
    pNode->deployed = true;
  } else {
    dError("node:%s, failed to open since %s", pNode->name, terrstr());
  }

  return code;
}

// Handles a "drop node" request for the node type ntype.
// Fails with TSDB_CODE_NODE_NOT_DEPLOYED when no wrapper of that type can be
// acquired. Otherwise the type-specific drop callback runs under the wrapper's
// write latch with the deployed flag cleared; the flag is set again when the
// callback fails.
// Returns 0 on success, -1 when the node is not deployed, or the callback's code.
static int32_t dndProcessDropNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) {
  SMgmtWrapper *pNode = dndAcquireWrapper(pDnode, ntype);
  if (pNode == NULL) {
    terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
    dError("failed to drop node since %s", terrstr());
    return -1;
  }

  taosWLockLatch(&pNode->latch);

  // Clear the flag before the callback runs.
  pNode->deployed = false;

  int32_t code = (*pNode->fp.dropMsgFp)(pNode, pMsg);
  if (code == 0) {
    pNode->deployed = false;
    dDebug("node:%s, has been dropped", pNode->name);
  } else {
    // Drop failed: put the flag back.
    pNode->deployed = true;
    dError("node:%s, failed to drop since %s", pNode->name, terrstr());
  }

  taosWUnLockLatch(&pNode->latch);

  // Balance the acquire at the top.
  dndReleaseWrapper(pNode);
  return code;
}

S
shm  
Shengliang Guan 已提交
150
// Dispatches a create/drop request for an mnode, qnode, snode or bnode to the
// matching create or drop routine.
// Returns the routine's result, or -1 with terrno set to
// TSDB_CODE_MSG_NOT_PROCESSED for any other message type.
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
  int32_t (*handler)(SDnode *, ENodeType, SNodeMsg *) = NULL;
  ENodeType ntype;

  // Select the node type and the operation from the message type.
  switch (pMsg->rpcMsg.msgType) {
    case TDMT_DND_CREATE_MNODE:
      ntype = MNODE;
      handler = dndProcessCreateNodeMsg;
      break;
    case TDMT_DND_DROP_MNODE:
      ntype = MNODE;
      handler = dndProcessDropNodeMsg;
      break;
    case TDMT_DND_CREATE_QNODE:
      ntype = QNODE;
      handler = dndProcessCreateNodeMsg;
      break;
    case TDMT_DND_DROP_QNODE:
      ntype = QNODE;
      handler = dndProcessDropNodeMsg;
      break;
    case TDMT_DND_CREATE_SNODE:
      ntype = SNODE;
      handler = dndProcessCreateNodeMsg;
      break;
    case TDMT_DND_DROP_SNODE:
      ntype = SNODE;
      handler = dndProcessDropNodeMsg;
      break;
    case TDMT_DND_CREATE_BNODE:
      ntype = BNODE;
      handler = dndProcessCreateNodeMsg;
      break;
    case TDMT_DND_DROP_BNODE:
      ntype = BNODE;
      handler = dndProcessDropNodeMsg;
      break;
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      return -1;
  }

  return handler(pDnode, ntype, pMsg);
}