vmWorker.c 11.3 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19
static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
S
Shengliang Guan 已提交
20
  if (pMsg->info.handle == NULL) return;
S
Shengliang Guan 已提交
21 22
  SRpcMsg rsp = {
      .code = code,
S
Shengliang Guan 已提交
23 24
      .pCont = pMsg->info.rsp,
      .contLen = pMsg->info.rspLen,
S
Shengliang Guan 已提交
25
      .info = pMsg->info,
S
Shengliang Guan 已提交
26
  };
S
Shengliang Guan 已提交
27
  tmsgSendRsp(&rsp);
S
Shengliang Guan 已提交
28 29
}

S
Shengliang Guan 已提交
30
static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
S
Shengliang 已提交
31
  SVnodeMgmt *pMgmt = pInfo->ahandle;
S
Shengliang Guan 已提交
32
  int32_t     code = -1;
S
Shengliang Guan 已提交
33

dengyihao's avatar
dengyihao 已提交
34 35
  STraceId *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, get from vnode-mgmt queue", pMsg);
S
Shengliang Guan 已提交
36
  switch (pMsg->msgType) {
37
    case TDMT_MON_VM_INFO:
S
Shengliang 已提交
38
      code = vmProcessGetMonitorInfoReq(pMgmt, pMsg);
39 40
      break;
    case TDMT_MON_VM_LOAD:
S
Shengliang 已提交
41
      code = vmProcessGetLoadsReq(pMgmt, pMsg);
42
      break;
S
Shengliang Guan 已提交
43 44 45 46 47 48 49 50
    case TDMT_DND_CREATE_VNODE:
      code = vmProcessCreateVnodeReq(pMgmt, pMsg);
      break;
    case TDMT_DND_DROP_VNODE:
      code = vmProcessDropVnodeReq(pMgmt, pMsg);
      break;
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
S
Shengliang Guan 已提交
51
      dError("msg:%p, not processed in vnode-mgmt queue", pMsg);
S
Shengliang Guan 已提交
52 53
  }

S
Shengliang Guan 已提交
54
  if (IsReq(pMsg)) {
55 56
    if (code != 0) {
      if (terrno != 0) code = terrno;
S
Shengliang Guan 已提交
57
      dError("msg:%p, failed to process since %s", pMsg, terrstr());
S
Shengliang Guan 已提交
58
    }
S
Shengliang Guan 已提交
59
    vmSendRsp(pMsg, code);
S
Shengliang Guan 已提交
60 61
  }

S
Shengliang Guan 已提交
62
  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
S
Shengliang Guan 已提交
63
  rpcFreeCont(pMsg->pCont);
S
Shengliang Guan 已提交
64 65 66
  taosFreeQitem(pMsg);
}

S
Shengliang Guan 已提交
67
static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
68 69
  SVnodeObj *pVnode = pInfo->ahandle;

S
Shengliang Guan 已提交
70
  dTrace("vgId:%d, msg:%p get from vnode-query queue", pVnode->vgId, pMsg);
S
Shengliang Guan 已提交
71
  int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg);
S
Shengliang Guan 已提交
72
  if (code != 0) {
S
Shengliang Guan 已提交
73
    if (terrno != 0) code = terrno;
S
Shengliang Guan 已提交
74
    dError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, terrstr());
S
Shengliang Guan 已提交
75
    vmSendRsp(pMsg, code);
S
Shengliang Guan 已提交
76
  }
S
Shengliang Guan 已提交
77 78

  dTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
dengyihao's avatar
dengyihao 已提交
79 80
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
S
shm  
Shengliang Guan 已提交
81
}
S
shm  
Shengliang Guan 已提交
82

S
Shengliang Guan 已提交
83
static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
84 85
  SVnodeObj *pVnode = pInfo->ahandle;

S
Shengliang Guan 已提交
86
  dTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg);
S
Shengliang Guan 已提交
87
  int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
S
Shengliang Guan 已提交
88
  if (code != 0) {
S
Shengliang Guan 已提交
89
    if (terrno != 0) code = terrno;
S
Shengliang Guan 已提交
90
    dError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr());
S
Shengliang Guan 已提交
91
    vmSendRsp(pMsg, code);
S
Shengliang Guan 已提交
92
  }
S
Shengliang Guan 已提交
93 94

  dTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
dengyihao's avatar
dengyihao 已提交
95 96
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
S
Shengliang Guan 已提交
97 98
}

S
Shengliang Guan 已提交
99 100
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;
dengyihao's avatar
dengyihao 已提交
101
  SRpcMsg *  pMsg = NULL;
S
shm  
Shengliang Guan 已提交
102 103

  for (int32_t i = 0; i < numOfMsgs; ++i) {
S
Shengliang Guan 已提交
104 105
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    dTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg);
S
shm  
Shengliang Guan 已提交
106

S
Shengliang Guan 已提交
107 108
    int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL);
    if (code != 0) {
S
Shengliang Guan 已提交
109
      if (terrno != 0) code = terrno;
S
Shengliang Guan 已提交
110
      dError("vgId:%d, msg:%p failed to sync since %s", pVnode->vgId, pMsg, terrstr());
S
Shengliang Guan 已提交
111
      vmSendRsp(pMsg, code);
112
    }
M
Minghao Li 已提交
113

S
Shengliang Guan 已提交
114
    dTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
S
Shengliang Guan 已提交
115
    rpcFreeCont(pMsg->pCont);
M
Minghao Li 已提交
116
    taosFreeQitem(pMsg);
S
shm  
Shengliang Guan 已提交
117 118 119
  }
}

L
Liu Jicong 已提交
120 121
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;
dengyihao's avatar
dengyihao 已提交
122
  SRpcMsg *  pMsg = NULL;
L
Liu Jicong 已提交
123 124

  for (int32_t i = 0; i < numOfMsgs; ++i) {
S
Shengliang Guan 已提交
125 126
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    dTrace("vgId:%d, msg:%p get from vnode-merge queue", pVnode->vgId, pMsg);
L
Liu Jicong 已提交
127

S
Shengliang Guan 已提交
128
    int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
L
Liu Jicong 已提交
129
    if (code != 0) {
S
Shengliang Guan 已提交
130
      if (terrno != 0) code = terrno;
S
Shengliang Guan 已提交
131
      dError("vgId:%d, msg:%p failed to merge since %s", pVnode->vgId, pMsg, terrstr());
S
Shengliang Guan 已提交
132
      vmSendRsp(pMsg, code);
L
Liu Jicong 已提交
133
    }
S
Shengliang Guan 已提交
134

dengyihao's avatar
dengyihao 已提交
135 136 137
    dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
L
Liu Jicong 已提交
138 139 140
  }
}

S
Shengliang Guan 已提交
141
static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
S
Shengliang Guan 已提交
142
  SMsgHead *pHead = pMsg->pCont;
S
Shengliang Guan 已提交
143 144
  int32_t   code = 0;

D
dapan1121 已提交
145 146
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);
S
shm  
Shengliang Guan 已提交
147 148 149

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) {
S
Shengliang Guan 已提交
150
    dError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s", pHead->vgId, pMsg, terrstr(),
S
Shengliang Guan 已提交
151
           TMSG_INFO(pMsg->msgType));
S
Shengliang Guan 已提交
152
    return terrno != 0 ? terrno : -1;
S
shm  
Shengliang Guan 已提交
153 154
  }

S
Shengliang Guan 已提交
155
  switch (qtype) {
S
Shengliang Guan 已提交
156
    case QUERY_QUEUE:
D
dapan1121 已提交
157
      vnodePreprocessQueryMsg(pVnode->pImpl, pMsg);
S
Shengliang Guan 已提交
158
      dTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg);
S
Shengliang Guan 已提交
159
      taosWriteQitem(pVnode->pQueryQ, pMsg);
S
Shengliang Guan 已提交
160
      break;
S
Shengliang Guan 已提交
161
    case FETCH_QUEUE:
S
Shengliang Guan 已提交
162
      dTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);
S
Shengliang Guan 已提交
163
      taosWriteQitem(pVnode->pFetchQ, pMsg);
S
Shengliang Guan 已提交
164
      break;
S
Shengliang Guan 已提交
165
    case WRITE_QUEUE:
S
Shengliang Guan 已提交
166
      dTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
S
Shengliang Guan 已提交
167
      taosWriteQitem(pVnode->pWriteQ, pMsg);
S
Shengliang Guan 已提交
168 169
      break;
    case SYNC_QUEUE:
S
Shengliang Guan 已提交
170
      dTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
S
Shengliang Guan 已提交
171
      taosWriteQitem(pVnode->pSyncQ, pMsg);
S
Shengliang Guan 已提交
172
      break;
S
Shengliang Guan 已提交
173
    case APPLY_QUEUE:
S
Shengliang Guan 已提交
174
      dTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg);
S
Shengliang Guan 已提交
175 176
      taosWriteQitem(pVnode->pApplyQ, pMsg);
      break;
S
Shengliang Guan 已提交
177
    default:
S
Shengliang Guan 已提交
178
      code = -1;
S
Shengliang Guan 已提交
179 180 181
      terrno = TSDB_CODE_INVALID_PARA;
      break;
  }
S
shm  
Shengliang Guan 已提交
182 183 184

  vmReleaseVnode(pMgmt, pVnode);
  return code;
S
shm  
Shengliang Guan 已提交
185 186
}

S
Shengliang Guan 已提交
187
int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); }
S
shm  
Shengliang Guan 已提交
188

S
Shengliang Guan 已提交
189
int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); }
S
shm  
Shengliang Guan 已提交
190

S
Shengliang Guan 已提交
191
int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); }
S
shm  
Shengliang Guan 已提交
192

S
Shengliang Guan 已提交
193
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); }
S
shm  
Shengliang Guan 已提交
194

S
Shengliang Guan 已提交
195
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
196
  dTrace("msg:%p, put into vnode-mgmt queue", pMsg);
S
Shengliang Guan 已提交
197
  taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);
S
Shengliang Guan 已提交
198
  return 0;
S
shm  
Shengliang Guan 已提交
199 200
}

S
Shengliang Guan 已提交
201
int32_t vmPutMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
202
  dTrace("msg:%p, put into vnode-monitor queue", pMsg);
S
Shengliang Guan 已提交
203
  taosWriteQitem(pMgmt->monitorWorker.queue, pMsg);
204 205 206
  return 0;
}

S
Shengliang Guan 已提交
207
int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
S
Shengliang Guan 已提交
208
  SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
S
Shengliang Guan 已提交
209
  if (pMsg == NULL) return -1;
S
Shengliang Guan 已提交
210

S
Shengliang Guan 已提交
211
  SMsgHead *pHead = pRpc->pCont;
212
  dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType));
S
Shengliang Guan 已提交
213

S
Shengliang Guan 已提交
214 215 216 217
  pHead->contLen = htonl(pHead->contLen);
  pHead->vgId = htonl(pHead->vgId);
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  return vmPutMsgToQueue(pMgmt, pMsg, qtype);
S
shm  
Shengliang Guan 已提交
218 219
}

S
Shengliang 已提交
220
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
S
Shengliang Guan 已提交
221
  int32_t    size = -1;
S
Shengliang 已提交
222
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
S
Shengliang Guan 已提交
223 224 225
  if (pVnode != NULL) {
    switch (qtype) {
      case WRITE_QUEUE:
226
        size = taosQueueItemSize(pVnode->pWriteQ);
S
Shengliang Guan 已提交
227 228
        break;
      case SYNC_QUEUE:
229
        size = taosQueueItemSize(pVnode->pSyncQ);
S
Shengliang Guan 已提交
230 231
        break;
      case APPLY_QUEUE:
232
        size = taosQueueItemSize(pVnode->pApplyQ);
S
Shengliang Guan 已提交
233
        break;
S
Shengliang Guan 已提交
234
      case QUERY_QUEUE:
235
        size = taosQueueItemSize(pVnode->pQueryQ);
S
Shengliang Guan 已提交
236 237
        break;
      case FETCH_QUEUE:
238
        size = taosQueueItemSize(pVnode->pFetchQ);
S
Shengliang Guan 已提交
239
        break;
S
Shengliang Guan 已提交
240 241 242 243
      default:
        break;
    }
  }
S
Shengliang 已提交
244
  vmReleaseVnode(pMgmt, pVnode);
S
Shengliang Guan 已提交
245 246 247
  return size;
}

S
Shengliang 已提交
248
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
249
  pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeMsg);
S
shm  
Shengliang Guan 已提交
250
  pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
S
Shengliang Guan 已提交
251
  pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyMsg);
S
shm  
Shengliang Guan 已提交
252
  pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
S
Shengliang Guan 已提交
253 254
  pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
  pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue);
S
shm  
Shengliang Guan 已提交
255

S
Shengliang Guan 已提交
256 257
  if (pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pApplyQ == NULL || pVnode->pQueryQ == NULL ||
      pVnode->pFetchQ == NULL || pVnode->pMergeQ == NULL) {
S
shm  
Shengliang Guan 已提交
258 259 260 261
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
262
  dDebug("vgId:%d, queue is alloced", pVnode->vgId);
S
shm  
Shengliang Guan 已提交
263 264 265
  return 0;
}

S
Shengliang 已提交
266
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
S
shm  
Shengliang Guan 已提交
267
  tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
S
Shengliang Guan 已提交
268
  tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ);
269
  tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
S
Shengliang Guan 已提交
270 271
  tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
  tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
L
Liu Jicong 已提交
272
  tWWorkerFreeQueue(&pMgmt->mergePool, pVnode->pMergeQ);
S
shm  
Shengliang Guan 已提交
273 274
  pVnode->pWriteQ = NULL;
  pVnode->pSyncQ = NULL;
S
Shengliang Guan 已提交
275
  pVnode->pApplyQ = NULL;
S
shm  
Shengliang Guan 已提交
276
  pVnode->pQueryQ = NULL;
S
Shengliang Guan 已提交
277
  pVnode->pFetchQ = NULL;
L
Liu Jicong 已提交
278
  pVnode->pMergeQ = NULL;
S
Shengliang Guan 已提交
279
  dDebug("vgId:%d, queue is freed", pVnode->vgId);
S
shm  
Shengliang Guan 已提交
280 281
}

S
Shengliang 已提交
282
int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
S
shm  
Shengliang Guan 已提交
283 284
  SQWorkerPool *pQPool = &pMgmt->queryPool;
  pQPool->name = "vnode-query";
S
Shengliang Guan 已提交
285 286
  pQPool->min = tsNumOfVnodeQueryThreads;
  pQPool->max = tsNumOfVnodeQueryThreads;
S
shm  
Shengliang Guan 已提交
287 288
  if (tQWorkerInit(pQPool) != 0) return -1;

S
Shengliang Guan 已提交
289
  SQWorkerPool *pFPool = &pMgmt->fetchPool;
S
shm  
Shengliang Guan 已提交
290
  pFPool->name = "vnode-fetch";
S
Shengliang Guan 已提交
291 292
  pFPool->min = tsNumOfVnodeFetchThreads;
  pFPool->max = tsNumOfVnodeFetchThreads;
S
Shengliang Guan 已提交
293
  if (tQWorkerInit(pFPool) != 0) return -1;
S
shm  
Shengliang Guan 已提交
294 295 296

  SWWorkerPool *pWPool = &pMgmt->writePool;
  pWPool->name = "vnode-write";
S
Shengliang Guan 已提交
297
  pWPool->max = tsNumOfVnodeWriteThreads;
S
shm  
Shengliang Guan 已提交
298 299
  if (tWWorkerInit(pWPool) != 0) return -1;

S
Shengliang Guan 已提交
300 301 302 303 304
  SWWorkerPool *pAPool = &pMgmt->applyPool;
  pAPool->name = "vnode-apply";
  pAPool->max = tsNumOfVnodeWriteThreads;
  if (tWWorkerInit(pAPool) != 0) return -1;

S
Shengliang Guan 已提交
305 306 307 308 309 310 311 312 313 314
  SWWorkerPool *pSPool = &pMgmt->syncPool;
  pSPool->name = "vnode-sync";
  pSPool->max = tsNumOfVnodeSyncThreads;
  if (tWWorkerInit(pSPool) != 0) return -1;

  SWWorkerPool *pMPool = &pMgmt->mergePool;
  pMPool->name = "vnode-merge";
  pMPool->max = tsNumOfVnodeMergeThreads;
  if (tWWorkerInit(pMPool) != 0) return -1;

S
Shengliang Guan 已提交
315
  SSingleWorkerCfg mgmtCfg = {
S
Shengliang Guan 已提交
316 317 318
      .min = 1,
      .max = 1,
      .name = "vnode-mgmt",
S
Shengliang Guan 已提交
319
      .fp = (FItem)vmProcessMgmtQueue,
S
Shengliang Guan 已提交
320 321
      .param = pMgmt,
  };
S
Shengliang Guan 已提交
322
  if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) return -1;
S
shm  
Shengliang Guan 已提交
323

S
Shengliang Guan 已提交
324
  SSingleWorkerCfg monitorCfg = {
S
Shengliang Guan 已提交
325 326 327
      .min = 1,
      .max = 1,
      .name = "vnode-monitor",
S
Shengliang Guan 已提交
328
      .fp = (FItem)vmProcessMgmtQueue,
S
Shengliang Guan 已提交
329 330
      .param = pMgmt,
  };
S
Shengliang Guan 已提交
331
  if (tSingleWorkerInit(&pMgmt->monitorWorker, &monitorCfg) != 0) return -1;
332

S
Shengliang Guan 已提交
333
  dDebug("vnode workers are initialized");
S
shm  
Shengliang Guan 已提交
334 335 336
  return 0;
}

S
Shengliang 已提交
337
void vmStopWorker(SVnodeMgmt *pMgmt) {
338
  tSingleWorkerCleanup(&pMgmt->monitorWorker);
S
Shengliang Guan 已提交
339
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
S
shm  
Shengliang Guan 已提交
340
  tWWorkerCleanup(&pMgmt->writePool);
S
Shengliang Guan 已提交
341
  tWWorkerCleanup(&pMgmt->applyPool);
S
shm  
Shengliang Guan 已提交
342
  tWWorkerCleanup(&pMgmt->syncPool);
S
Shengliang Guan 已提交
343 344
  tQWorkerCleanup(&pMgmt->queryPool);
  tQWorkerCleanup(&pMgmt->fetchPool);
L
fix  
Liu Jicong 已提交
345
  tWWorkerCleanup(&pMgmt->mergePool);
S
Shengliang Guan 已提交
346
  dDebug("vnode workers are closed");
S
shm  
Shengliang Guan 已提交
347
}