vmWorker.c 12.0 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19 20 21 22 23
static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
  SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
  dndSendRsp(pWrapper, &rsp);
}

S
Shengliang Guan 已提交
24 25 26
static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SVnodesMgmt *pMgmt = pInfo->ahandle;

S
Shengliang Guan 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
  int32_t code = -1;
  tmsg_t  msgType = pMsg->rpcMsg.msgType;
  dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg);

  switch (msgType) {
    case TDMT_DND_CREATE_VNODE:
      code = vmProcessCreateVnodeReq(pMgmt, pMsg);
      break;
    case TDMT_DND_ALTER_VNODE:
      code = vmProcessAlterVnodeReq(pMgmt, pMsg);
      break;
    case TDMT_DND_DROP_VNODE:
      code = vmProcessDropVnodeReq(pMgmt, pMsg);
      break;
    case TDMT_DND_SYNC_VNODE:
      code = vmProcessSyncVnodeReq(pMgmt, pMsg);
      break;
    case TDMT_DND_COMPACT_VNODE:
      code = vmProcessCompactVnodeReq(pMgmt, pMsg);
      break;
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      dError("msg:%p, not processed in vnode-mgmt queue", pMsg);
  }

  if (msgType & 1u) {
    if (code != 0 && terrno != 0) code = terrno;
S
Shengliang Guan 已提交
54
    vmSendRsp(pMgmt->pWrapper, pMsg, code);
S
Shengliang Guan 已提交
55 56 57 58 59 60 61
  }

  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
  rpcFreeCont(pMsg->rpcMsg.pCont);
  taosFreeQitem(pMsg);
}

S
Shengliang Guan 已提交
62 63 64
static void vmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SVnodeObj *pVnode = pInfo->ahandle;

S
Shengliang Guan 已提交
65 66 67 68
  dTrace("msg:%p, will be processed in vnode-query queue", pMsg);
  int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
  if (code != 0) {
    vmSendRsp(pVnode->pWrapper, pMsg, code);
S
Shengliang Guan 已提交
69 70 71
    dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
    rpcFreeCont(pMsg->rpcMsg.pCont);
    taosFreeQitem(pMsg);
S
Shengliang Guan 已提交
72
  }
S
shm  
Shengliang Guan 已提交
73
}
S
shm  
Shengliang Guan 已提交
74

S
Shengliang Guan 已提交
75 76 77
static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SVnodeObj *pVnode = pInfo->ahandle;

S
Shengliang Guan 已提交
78 79 80 81
  dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg);
  int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
  if (code != 0) {
    vmSendRsp(pVnode->pWrapper, pMsg, code);
S
Shengliang Guan 已提交
82 83 84
    dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
    rpcFreeCont(pMsg->rpcMsg.pCont);
    taosFreeQitem(pMsg);
S
Shengliang Guan 已提交
85
  }
S
Shengliang Guan 已提交
86 87
}

S
Shengliang Guan 已提交
88 89 90
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;

S
shm  
Shengliang Guan 已提交
91
  SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
S
Shengliang Guan 已提交
92 93 94 95
  if (pArray == NULL) {
    dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
    return;
  }
S
shm  
Shengliang Guan 已提交
96 97

  for (int32_t i = 0; i < numOfMsgs; ++i) {
S
shm  
Shengliang Guan 已提交
98
    SNodeMsg *pMsg = NULL;
S
Shengliang Guan 已提交
99 100 101 102 103 104 105
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    dTrace("msg:%p, will be processed in vnode-write queue", pMsg);
    if (taosArrayPush(pArray, &pMsg) == NULL) {
      dTrace("msg:%p, failed to process since %s", pMsg, terrstr());
      vmSendRsp(pVnode->pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY);
    }
S
shm  
Shengliang Guan 已提交
106 107 108 109
  }

  vnodeProcessWMsgs(pVnode->pImpl, pArray);

S
Shengliang Guan 已提交
110 111
  numOfMsgs = taosArrayGetSize(pArray);
  for (int32_t i = 0; i < numOfMsgs; i++) {
S
shm  
Shengliang Guan 已提交
112 113
    SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
    SRpcMsg  *pRpc = &pMsg->rpcMsg;
S
Shengliang Guan 已提交
114 115 116
    SRpcMsg  *pRsp = NULL;

    int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp);
S
shm  
Shengliang Guan 已提交
117
    if (pRsp != NULL) {
S
shm  
Shengliang Guan 已提交
118
      pRsp->ahandle = pRpc->ahandle;
S
shm  
Shengliang Guan 已提交
119
      dndSendRsp(pVnode->pWrapper, pRsp);
S
shm  
Shengliang Guan 已提交
120 121
      free(pRsp);
    } else {
S
Shengliang Guan 已提交
122
      if (code != 0 && terrno != 0) code = terrno;
S
Shengliang Guan 已提交
123
      vmSendRsp(pVnode->pWrapper, pMsg, code);
S
shm  
Shengliang Guan 已提交
124 125 126
    }
  }

S
Shengliang Guan 已提交
127
  for (int32_t i = 0; i < numOfMsgs; i++) {
S
shm  
Shengliang Guan 已提交
128 129 130 131
    SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->rpcMsg.pCont);
    taosFreeQitem(pMsg);
S
shm  
Shengliang Guan 已提交
132 133 134 135 136
  }

  taosArrayDestroy(pArray);
}

S
Shengliang Guan 已提交
137 138 139
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;
  SNodeMsg  *pMsg = NULL;
S
shm  
Shengliang Guan 已提交
140 141 142 143 144 145

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    taosGetQitem(qall, (void **)&pMsg);

    // todo
    SRpcMsg *pRsp = NULL;
S
shm  
Shengliang Guan 已提交
146
    (void)vnodeApplyWMsg(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
S
shm  
Shengliang Guan 已提交
147 148 149
  }
}

S
Shengliang Guan 已提交
150 151 152
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;
  SNodeMsg  *pMsg = NULL;
S
shm  
Shengliang Guan 已提交
153 154 155 156 157 158

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    taosGetQitem(qall, (void **)&pMsg);

    // todo
    SRpcMsg *pRsp = NULL;
S
shm  
Shengliang Guan 已提交
159
    (void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
S
shm  
Shengliang Guan 已提交
160 161 162
  }
}

S
Shengliang Guan 已提交
163
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) {
S
Shengliang Guan 已提交
164 165
  SRpcMsg *pRpc = &pMsg->rpcMsg;
  int32_t  code = -1;
S
shm  
Shengliang Guan 已提交
166

S
Shengliang Guan 已提交
167
  SMsgHead *pHead = pRpc->pCont;
S
shm  
Shengliang Guan 已提交
168 169 170 171 172
  pHead->contLen = htonl(pHead->contLen);
  pHead->vgId = htonl(pHead->vgId);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) {
173
    dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHead->vgId, pMsg, terrstr());
S
Shengliang Guan 已提交
174
    return -1;
S
shm  
Shengliang Guan 已提交
175 176
  }

S
Shengliang Guan 已提交
177
  switch (qtype) {
S
Shengliang Guan 已提交
178
    case QUERY_QUEUE:
S
Shengliang Guan 已提交
179 180 181
      dTrace("msg:%p, will be written into vnode-query queue", pMsg);
      code = taosWriteQitem(pVnode->pQueryQ, pMsg);
      break;
S
Shengliang Guan 已提交
182
    case FETCH_QUEUE:
S
Shengliang Guan 已提交
183 184 185
      dTrace("msg:%p, will be written into vnode-fetch queue", pMsg);
      code = taosWriteQitem(pVnode->pFetchQ, pMsg);
      break;
S
Shengliang Guan 已提交
186
    case WRITE_QUEUE:
S
Shengliang Guan 已提交
187 188
      dTrace("msg:%p, will be written into vnode-write queue", pMsg);
      code = taosWriteQitem(pVnode->pWriteQ, pMsg);
S
Shengliang Guan 已提交
189 190
      break;
    case SYNC_QUEUE:
S
Shengliang Guan 已提交
191 192
      dTrace("msg:%p, will be written into vnode-sync queue", pMsg);
      code = taosWriteQitem(pVnode->pSyncQ, pMsg);
S
Shengliang Guan 已提交
193
      break;
S
Shengliang Guan 已提交
194 195 196 197
    default:
      terrno = TSDB_CODE_INVALID_PARA;
      break;
  }
S
shm  
Shengliang Guan 已提交
198 199 200

  vmReleaseVnode(pMgmt, pVnode);
  return code;
S
shm  
Shengliang Guan 已提交
201 202
}

S
Shengliang Guan 已提交
203
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); }
S
shm  
Shengliang Guan 已提交
204

S
Shengliang Guan 已提交
205
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); }
S
shm  
Shengliang Guan 已提交
206

S
Shengliang Guan 已提交
207
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); }
S
shm  
Shengliang Guan 已提交
208

S
Shengliang Guan 已提交
209
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); }
S
shm  
Shengliang Guan 已提交
210

S
Shengliang Guan 已提交
211
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
S
Shengliang Guan 已提交
212
  SSingleWorker *pWorker = &pMgmt->mgmtWorker;
S
Shengliang Guan 已提交
213
  dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name);
S
Shengliang Guan 已提交
214
  return taosWriteQitem(pWorker->queue, pMsg);
S
shm  
Shengliang Guan 已提交
215 216
}

S
Shengliang Guan 已提交
217
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) {
S
shm  
Shengliang Guan 已提交
218
  SVnodesMgmt *pMgmt = pWrapper->pMgmt;
S
Shengliang Guan 已提交
219 220
  int32_t      code = -1;
  SMsgHead    *pHead = pRpc->pCont;
S
shm  
Shengliang Guan 已提交
221 222 223 224

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) return -1;

S
shm  
Shengliang Guan 已提交
225 226
  SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
  if (pMsg != NULL) {
S
Shengliang Guan 已提交
227
    dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
S
shm  
Shengliang Guan 已提交
228
    pMsg->rpcMsg = *pRpc;
S
Shengliang Guan 已提交
229
    switch (qtype) {
S
Shengliang Guan 已提交
230
      case QUERY_QUEUE:
S
Shengliang Guan 已提交
231 232 233
        dTrace("msg:%p, will be put into vnode-query queue", pMsg);
        code = taosWriteQitem(pVnode->pQueryQ, pMsg);
        break;
S
Shengliang Guan 已提交
234
      case FETCH_QUEUE:
S
Shengliang Guan 已提交
235 236 237
        dTrace("msg:%p, will be put into vnode-fetch queue", pMsg);
        code = taosWriteQitem(pVnode->pFetchQ, pMsg);
        break;
S
Shengliang Guan 已提交
238
      case APPLY_QUEUE:
S
Shengliang Guan 已提交
239 240 241 242 243 244 245
        dTrace("msg:%p, will be put into vnode-apply queue", pMsg);
        code = taosWriteQitem(pVnode->pApplyQ, pMsg);
        break;
      default:
        terrno = TSDB_CODE_INVALID_PARA;
        break;
    }
S
shm  
Shengliang Guan 已提交
246
  }
S
shm  
Shengliang Guan 已提交
247 248 249 250
  vmReleaseVnode(pMgmt, pVnode);
  return code;
}

S
Shengliang Guan 已提交
251
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
S
Shengliang Guan 已提交
252
  return vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE);
S
Shengliang Guan 已提交
253
}
S
shm  
Shengliang Guan 已提交
254

S
Shengliang Guan 已提交
255
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
S
Shengliang Guan 已提交
256
  return vmPutRpcMsgToQueue(pWrapper, pRpc, FETCH_QUEUE);
S
Shengliang Guan 已提交
257
}
S
shm  
Shengliang Guan 已提交
258

S
Shengliang Guan 已提交
259
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
S
Shengliang Guan 已提交
260
  return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE);
S
shm  
Shengliang Guan 已提交
261 262
}

S
Shengliang Guan 已提交
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
  int32_t    size = -1;
  SVnodeObj *pVnode = vmAcquireVnode(pWrapper->pMgmt, vgId);
  if (pVnode != NULL) {
    switch (qtype) {
      case QUERY_QUEUE:
        size = taosQueueSize(pVnode->pQueryQ);
        break;
      case FETCH_QUEUE:
        size = taosQueueSize(pVnode->pFetchQ);
        break;
      case WRITE_QUEUE:
        size = taosQueueSize(pVnode->pWriteQ);
        break;
      case SYNC_QUEUE:
        size = taosQueueSize(pVnode->pSyncQ);
        break;
      case APPLY_QUEUE:
        size = taosQueueSize(pVnode->pApplyQ);
        break;
      default:
        break;
    }
  }
  vmReleaseVnode(pWrapper->pMgmt, pVnode);
  return size;
}

S
shm  
Shengliang Guan 已提交
291 292 293 294
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
  pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
  pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
  pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
S
Shengliang Guan 已提交
295
  pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
S
shm  
Shengliang Guan 已提交
296
  pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
S
shm  
Shengliang Guan 已提交
297 298 299 300 301 302 303

  if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL ||
      pVnode->pQueryQ == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
304
  dDebug("vgId:%d, vnode queue is alloced", pVnode->vgId);
S
shm  
Shengliang Guan 已提交
305 306 307
  return 0;
}

S
shm  
Shengliang Guan 已提交
308
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
S
shm  
Shengliang Guan 已提交
309
  tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
S
Shengliang Guan 已提交
310
  tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
S
shm  
Shengliang Guan 已提交
311 312 313 314 315 316 317 318
  tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
  tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
  tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
  pVnode->pWriteQ = NULL;
  pVnode->pApplyQ = NULL;
  pVnode->pSyncQ = NULL;
  pVnode->pFetchQ = NULL;
  pVnode->pQueryQ = NULL;
S
Shengliang Guan 已提交
319
  dDebug("vgId:%d, vnode queue is freed", pVnode->vgId);
S
shm  
Shengliang Guan 已提交
320 321
}

S
shm  
Shengliang Guan 已提交
322 323 324 325 326 327 328 329 330 331 332 333 334 335
int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
  int32_t maxFetchThreads = 4;
  int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores);
  int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
  int32_t maxQueryThreads = minQueryThreads;
  int32_t maxWriteThreads = TMAX(tsNumOfCores, 1);
  int32_t maxSyncThreads = TMAX(tsNumOfCores / 2, 1);

  SQWorkerPool *pQPool = &pMgmt->queryPool;
  pQPool->name = "vnode-query";
  pQPool->min = minQueryThreads;
  pQPool->max = maxQueryThreads;
  if (tQWorkerInit(pQPool) != 0) return -1;

S
Shengliang Guan 已提交
336
  SQWorkerPool *pFPool = &pMgmt->fetchPool;
S
shm  
Shengliang Guan 已提交
337 338 339
  pFPool->name = "vnode-fetch";
  pFPool->min = minFetchThreads;
  pFPool->max = maxFetchThreads;
S
Shengliang Guan 已提交
340
  if (tQWorkerInit(pFPool) != 0) return -1;
S
shm  
Shengliang Guan 已提交
341 342 343 344 345 346 347 348 349 350 351

  SWWorkerPool *pWPool = &pMgmt->writePool;
  pWPool->name = "vnode-write";
  pWPool->max = maxWriteThreads;
  if (tWWorkerInit(pWPool) != 0) return -1;

  pWPool = &pMgmt->syncPool;
  pWPool->name = "vnode-sync";
  pWPool->max = maxSyncThreads;
  if (tWWorkerInit(pWPool) != 0) return -1;

S
Shengliang Guan 已提交
352
  SSingleWorkerCfg cfg = {
S
Shengliang Guan 已提交
353
      .minNum = 1, .maxNum = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
S
Shengliang Guan 已提交
354
  if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) {
S
Shengliang Guan 已提交
355
    dError("failed to start vnode-mgmt worker since %s", terrstr());
S
shm  
Shengliang Guan 已提交
356 357 358
    return -1;
  }

S
Shengliang Guan 已提交
359
  dDebug("vnode workers are initialized");
S
shm  
Shengliang Guan 已提交
360 361 362 363
  return 0;
}

void vmStopWorker(SVnodesMgmt *pMgmt) {
S
Shengliang Guan 已提交
364
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
S
Shengliang Guan 已提交
365
  tQWorkerCleanup(&pMgmt->fetchPool);
S
shm  
Shengliang Guan 已提交
366 367 368
  tQWorkerCleanup(&pMgmt->queryPool);
  tWWorkerCleanup(&pMgmt->writePool);
  tWWorkerCleanup(&pMgmt->syncPool);
S
Shengliang Guan 已提交
369
  dDebug("vnode workers are closed");
S
shm  
Shengliang Guan 已提交
370
}