vmHandle.c 13.8 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
S
Shengliang Guan 已提交
20 21 22
  pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
  if (pInfo->pVloads == NULL) return;

23
  taosThreadRwlockRdlock(&pMgmt->lock);
S
Shengliang Guan 已提交
24 25 26 27 28 29 30 31 32 33 34 35 36

  void *pIter = taosHashIterate(pMgmt->hash, NULL);
  while (pIter) {
    SVnodeObj **ppVnode = pIter;
    if (ppVnode == NULL || *ppVnode == NULL) continue;

    SVnodeObj *pVnode = *ppVnode;
    SVnodeLoad vload = {0};
    vnodeGetLoad(pVnode->pImpl, &vload);
    taosArrayPush(pInfo->pVloads, &vload);
    pIter = taosHashIterate(pMgmt->hash, pIter);
  }

37
  taosThreadRwlockUnlock(&pMgmt->lock);
S
Shengliang Guan 已提交
38 39
}

S
Shengliang Guan 已提交
40
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
S
Shengliang Guan 已提交
41
  SMonVloadInfo vloads = {0};
S
Shengliang 已提交
42
  vmGetVnodeLoads(pMgmt, &vloads);
S
Shengliang Guan 已提交
43 44 45

  SArray *pVloads = vloads.pVloads;
  if (pVloads == NULL) return;
S
Shengliang Guan 已提交
46 47 48 49 50 51 52 53 54

  int32_t totalVnodes = 0;
  int32_t masterNum = 0;
  int64_t numOfSelectReqs = 0;
  int64_t numOfInsertReqs = 0;
  int64_t numOfInsertSuccessReqs = 0;
  int64_t numOfBatchInsertReqs = 0;
  int64_t numOfBatchInsertSuccessReqs = 0;

S
Shengliang Guan 已提交
55 56
  for (int32_t i = 0; i < taosArrayGetSize(pVloads); ++i) {
    SVnodeLoad *pLoad = taosArrayGet(pVloads, i);
S
Shengliang Guan 已提交
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
    numOfSelectReqs += pLoad->numOfSelectReqs;
    numOfInsertReqs += pLoad->numOfInsertReqs;
    numOfInsertSuccessReqs += pLoad->numOfInsertSuccessReqs;
    numOfBatchInsertReqs += pLoad->numOfBatchInsertReqs;
    numOfBatchInsertSuccessReqs += pLoad->numOfBatchInsertSuccessReqs;
    if (pLoad->syncState == TAOS_SYNC_STATE_LEADER) masterNum++;
    totalVnodes++;
  }

  pInfo->vstat.totalVnodes = totalVnodes;
  pInfo->vstat.masterNum = masterNum;
  pInfo->vstat.numOfSelectReqs = numOfSelectReqs - pMgmt->state.numOfSelectReqs;
  pInfo->vstat.numOfInsertReqs = numOfInsertReqs - pMgmt->state.numOfInsertReqs;
  pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs - pMgmt->state.numOfInsertSuccessReqs;
  pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs - pMgmt->state.numOfBatchInsertReqs;
  pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs - pMgmt->state.numOfBatchInsertSuccessReqs;
S
Shengliang Guan 已提交
73 74 75 76 77 78 79
  pMgmt->state.totalVnodes = totalVnodes;
  pMgmt->state.masterNum = masterNum;
  pMgmt->state.numOfSelectReqs = numOfSelectReqs;
  pMgmt->state.numOfInsertReqs = numOfInsertReqs;
  pMgmt->state.numOfInsertSuccessReqs = numOfInsertSuccessReqs;
  pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs;
  pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
S
Shengliang Guan 已提交
80

81
  tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs);
S
Shengliang Guan 已提交
82
  taosArrayDestroy(pVloads);
83 84
}

S
Shengliang Guan 已提交
85
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
86
  SMonVmInfo vmInfo = {0};
S
Shengliang 已提交
87
  vmGetMonitorInfo(pMgmt, &vmInfo);
S
Shengliang Guan 已提交
88
  dmGetMonitorSystemInfo(&vmInfo.sys);
89
  monGetLogs(&vmInfo.log);
90 91 92 93 94 95 96 97 98 99 100 101 102 103

  int32_t rspLen = tSerializeSMonVmInfo(NULL, 0, &vmInfo);
  if (rspLen < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo);
S
Shengliang Guan 已提交
104 105
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;
106 107 108 109
  tFreeSMonVmInfo(&vmInfo);
  return 0;
}

S
Shengliang Guan 已提交
110
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
111
  SMonVloadInfo vloads = {0};
S
Shengliang 已提交
112
  vmGetVnodeLoads(pMgmt, &vloads);
113 114 115 116 117 118 119 120 121 122 123 124 125 126

  int32_t rspLen = tSerializeSMonVloadInfo(NULL, 0, &vloads);
  if (rspLen < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  void *pRsp = rpcMallocCont(rspLen);
  if (pRsp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  tSerializeSMonVloadInfo(pRsp, rspLen, &vloads);
S
Shengliang Guan 已提交
127 128
  pMsg->info.rsp = pRsp;
  pMsg->info.rspLen = rspLen;
129 130 131 132
  tFreeSMonVloadInfo(&vloads);
  return 0;
}

S
shm  
Shengliang Guan 已提交
133
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
H
Hongze Cheng 已提交
134 135
  memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg));

S
shm  
Shengliang Guan 已提交
136
  pCfg->vgId = pCreate->vgId;
S
Shengliang Guan 已提交
137 138
  tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname));
  pCfg->dbId = pCreate->dbUid;
H
Hongze Cheng 已提交
139 140
  pCfg->szPage = pCreate->pageSize * 1024;
  pCfg->szCache = pCreate->pages;
H
Hongze Cheng 已提交
141
  pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
S
shm  
Shengliang Guan 已提交
142
  pCfg->isWeak = true;
C
Cary Xu 已提交
143
  pCfg->isTsma = pCreate->isTsma;
C
Cary Xu 已提交
144
  pCfg->tsdbCfg.compression = pCreate->compression;
H
Hongze Cheng 已提交
145
  pCfg->tsdbCfg.precision = pCreate->precision;
146 147 148 149
  pCfg->tsdbCfg.days = pCreate->daysPerFile;
  pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
  pCfg->tsdbCfg.keep1 = pCreate->daysToKeep1;
  pCfg->tsdbCfg.keep2 = pCreate->daysToKeep2;
H
Hongze Cheng 已提交
150 151
  pCfg->tsdbCfg.minRows = pCreate->minRows;
  pCfg->tsdbCfg.maxRows = pCreate->maxRows;
C
Cary Xu 已提交
152 153 154
  for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) {
    memcpy(&pCfg->tsdbCfg.retentions[i], taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
  }
S
shm  
Shengliang Guan 已提交
155 156 157 158
  pCfg->walCfg.vgId = pCreate->vgId;
  pCfg->hashBegin = pCreate->hashBegin;
  pCfg->hashEnd = pCreate->hashEnd;
  pCfg->hashMethod = pCreate->hashMethod;
M
Minghao Li 已提交
159 160 161

  pCfg->syncCfg.myIndex = pCreate->selfIndex;
  pCfg->syncCfg.replicaNum = pCreate->replica;
S
Shengliang Guan 已提交
162
  memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
M
Minghao Li 已提交
163
  for (int i = 0; i < pCreate->replica; ++i) {
S
Shengliang Guan 已提交
164 165 166
    pCfg->syncCfg.nodeInfo[i].nodePort = pCreate->replicas[i].port;
    snprintf(pCfg->syncCfg.nodeInfo[i].nodeFqdn, sizeof(pCfg->syncCfg.nodeInfo[i].nodeFqdn), "%s",
             pCreate->replicas[i].fqdn);
M
Minghao Li 已提交
167
  }
S
shm  
Shengliang Guan 已提交
168
}
S
shm  
Shengliang Guan 已提交
169

S
Shengliang 已提交
170
static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
S
shm  
Shengliang Guan 已提交
171 172
  pCfg->vgId = pCreate->vgId;
  pCfg->vgVersion = pCreate->vgVersion;
S
Shengliang Guan 已提交
173 174
  pCfg->dropped = 0;
  snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
S
shm  
Shengliang Guan 已提交
175 176
}

S
Shengliang Guan 已提交
177
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
S
shm  
Shengliang Guan 已提交
178
  SCreateVnodeReq createReq = {0};
S
Shengliang Guan 已提交
179 180
  int32_t         code = -1;
  char            path[TSDB_FILENAME_LEN] = {0};
H
Hongze Cheng 已提交
181

S
Shengliang Guan 已提交
182
  if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
S
shm  
Shengliang Guan 已提交
183 184 185 186
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

S
Shengliang Guan 已提交
187
  dDebug("vgId:%d, create vnode req is received, tsma:%d", createReq.vgId, createReq.isTsma);
S
shm  
Shengliang Guan 已提交
188 189 190 191 192 193 194 195 196 197

  SVnodeCfg vnodeCfg = {0};
  vmGenerateVnodeCfg(&createReq, &vnodeCfg);

  SWrapperCfg wrapperCfg = {0};
  vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, createReq.vgId);
  if (pVnode != NULL) {
    dDebug("vgId:%d, already exist", createReq.vgId);
S
Shengliang Guan 已提交
198
    tFreeSCreateVnodeReq(&createReq);
S
shm  
Shengliang Guan 已提交
199
    vmReleaseVnode(pMgmt, pVnode);
200
    terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
S
shm  
Shengliang Guan 已提交
201 202 203
    return -1;
  }

H
Hongze Cheng 已提交
204 205 206 207 208 209 210
  snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
  if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
    tFreeSCreateVnodeReq(&createReq);
    dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
    return -1;
  }

S
Shengliang 已提交
211
  SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
S
shm  
Shengliang Guan 已提交
212
  if (pImpl == NULL) {
C
Cary Xu 已提交
213
    dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr());
214
    code = terrno;
S
Shengliang Guan 已提交
215
    goto _OVER;
S
shm  
Shengliang Guan 已提交
216 217
  }

S
Shengliang Guan 已提交
218
  code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
S
shm  
Shengliang Guan 已提交
219 220
  if (code != 0) {
    dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr());
S
Shengliang Guan 已提交
221
    goto _OVER;
S
shm  
Shengliang Guan 已提交
222 223
  }

L
Li Minghao 已提交
224 225 226
  code = vnodeStart(pImpl);
  if (code != 0) {
    dError("vgId:%d, failed to start sync since %s", createReq.vgId, terrstr());
S
Shengliang Guan 已提交
227
    goto _OVER;
L
Li Minghao 已提交
228 229
  }

S
Shengliang Guan 已提交
230 231 232 233
  code = vmWriteVnodeListToFile(pMgmt);
  if (code != 0) goto _OVER;

_OVER:
S
shm  
Shengliang Guan 已提交
234 235
  if (code != 0) {
    vnodeClose(pImpl);
H
refact  
Hongze Cheng 已提交
236
    vnodeDestroy(path, pMgmt->pTfs);
S
shm  
Shengliang Guan 已提交
237 238
  }

S
Shengliang Guan 已提交
239 240 241
  tFreeSCreateVnodeReq(&createReq);
  terrno = code;
  return code;
S
shm  
Shengliang Guan 已提交
242 243
}

S
Shengliang Guan 已提交
244
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
S
shm  
Shengliang Guan 已提交
245
  SDropVnodeReq dropReq = {0};
S
Shengliang Guan 已提交
246
  if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
S
shm  
Shengliang Guan 已提交
247 248 249 250 251 252 253 254 255 256
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  int32_t vgId = dropReq.vgId;
  dDebug("vgId:%d, drop vnode req is received", vgId);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
  if (pVnode == NULL) {
    dDebug("vgId:%d, failed to drop since %s", vgId, terrstr());
257
    terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
S
shm  
Shengliang Guan 已提交
258 259 260 261
    return -1;
  }

  pVnode->dropped = 1;
S
Shengliang Guan 已提交
262
  if (vmWriteVnodeListToFile(pMgmt) != 0) {
S
shm  
Shengliang Guan 已提交
263 264 265 266 267 268
    pVnode->dropped = 0;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  vmCloseVnode(pMgmt, pVnode);
S
Shengliang Guan 已提交
269
  vmWriteVnodeListToFile(pMgmt);
S
shm  
Shengliang Guan 已提交
270 271 272 273

  return 0;
}

S
Shengliang Guan 已提交
274
SArray *vmGetMsgHandles() {
S
Shengliang 已提交
275
  int32_t code = -1;
S
Shengliang Guan 已提交
276
  SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
S
Shengliang 已提交
277 278 279 280
  if (pArray == NULL) goto _OVER;

  if (dmSetMgmtHandle(pArray, TDMT_MON_VM_INFO, vmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_MON_VM_LOAD, vmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
281

S
shm  
Shengliang Guan 已提交
282
  // Requests handled by VNODE
S
Shengliang 已提交
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, vmPutNodeMsgToQueryQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, vmPutNodeMsgToQueryQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONSUME, vmPutNodeMsgToQueryQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_QUERY, vmPutNodeMsgToQueryQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONNECT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_DISCONNECT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
  // if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_SET_CUR, vmPutNodeMsgToWriteQueue, 0)== NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_SMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
L
Liu Jicong 已提交
308
  if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
S
Shengliang 已提交
309 310 311 312
  if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
L
Liu Jicong 已提交
313 314 315 316
  if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;

S
Shengliang 已提交
317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340
  if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;

  if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
  if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, vmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;

  code = 0;

_OVER:
  if (code != 0) {
    taosArrayDestroy(pArray);
    return NULL;
  } else {
    return pArray;
  }
S
shm  
Shengliang Guan 已提交
341
}