vmWorker.c 14.0 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19 20 21 22 23
// Sends a body-less reply for pMsg that carries only the result code.
// The reply reuses the request's rpc handle and ahandle; every other
// SRpcMsg field is left zero-initialized.
static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
  const SRpcMsg *pReq = &pMsg->rpcMsg;

  SRpcMsg reply = {0};
  reply.handle = pReq->handle;
  reply.ahandle = pReq->ahandle;
  reply.code = code;

  dndSendRsp(pWrapper, &reply);
}

S
Shengliang Guan 已提交
24 25 26
// Worker callback for the vnode-mgmt queue. Routes one management request to the
// handler matching its message type, replies when the type calls for it, and
// finally releases the message content and the queue item.
static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SVnodesMgmt *pMgmt = pInfo->ahandle;
  tmsg_t       msgType = pMsg->rpcMsg.msgType;
  int32_t      code = -1;  // stays -1 when no handler matches the type

  dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg);

  if (msgType == TDMT_DND_CREATE_VNODE) {
    code = vmProcessCreateVnodeReq(pMgmt, pMsg);
  } else if (msgType == TDMT_DND_ALTER_VNODE) {
    code = vmProcessAlterVnodeReq(pMgmt, pMsg);
  } else if (msgType == TDMT_DND_DROP_VNODE) {
    code = vmProcessDropVnodeReq(pMgmt, pMsg);
  } else if (msgType == TDMT_DND_SYNC_VNODE) {
    code = vmProcessSyncVnodeReq(pMgmt, pMsg);
  } else if (msgType == TDMT_DND_COMPACT_VNODE) {
    code = vmProcessCompactVnodeReq(pMgmt, pMsg);
  } else {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    dError("msg:%p, not processed in vnode-mgmt queue", pMsg);
  }

  // Only message types with the lowest bit set are answered
  // (presumably these are the request types - confirm against the tmsg_t definition).
  if ((msgType & 1u) != 0) {
    // Prefer the detailed error stored in terrno over the generic failure code.
    if (code != 0 && terrno != 0) {
      code = terrno;
    }
    vmSendRsp(pMgmt->pWrapper, pMsg, code);
  }

  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
  rpcFreeCont(pMsg->rpcMsg.pCont);
  taosFreeQitem(pMsg);
}

S
Shengliang Guan 已提交
62 63 64
// Worker callback for a vnode's query queue. Hands the rpc message to
// vnodeProcessQueryMsg; only when that call fails does this function send an
// error reply and release the message.
static void vmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SVnodeObj *pVnode = pInfo->ahandle;

  dTrace("msg:%p, will be processed in vnode-query queue", pMsg);

  const int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
  if (code == 0) {
    // Success: nothing is replied or freed here.
    // NOTE(review): presumably the vnode layer owns the message from now on - confirm.
    return;
  }

  vmSendRsp(pVnode->pWrapper, pMsg, code);
  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
  rpcFreeCont(pMsg->rpcMsg.pCont);
  taosFreeQitem(pMsg);
}
S
shm  
Shengliang Guan 已提交
74

S
Shengliang Guan 已提交
75 76 77
// Worker callback for a vnode's fetch queue. Hands the rpc message to
// vnodeProcessFetchMsg; only when that call fails does this function send an
// error reply and release the message.
static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SVnodeObj *pVnode = pInfo->ahandle;

  dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg);

  const int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
  if (code == 0) {
    // Success: nothing is replied or freed here.
    // NOTE(review): presumably the vnode layer owns the message from now on - confirm.
    return;
  }

  vmSendRsp(pVnode->pWrapper, pMsg, code);
  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
  rpcFreeCont(pMsg->rpcMsg.pCont);
  taosFreeQitem(pMsg);
}

S
Shengliang Guan 已提交
88 89 90
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;

S
shm  
Shengliang Guan 已提交
91
  SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
S
Shengliang Guan 已提交
92 93 94 95
  if (pArray == NULL) {
    dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
    return;
  }
S
shm  
Shengliang Guan 已提交
96 97

  for (int32_t i = 0; i < numOfMsgs; ++i) {
S
shm  
Shengliang Guan 已提交
98
    SNodeMsg *pMsg = NULL;
S
Shengliang Guan 已提交
99 100 101 102 103 104 105
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    dTrace("msg:%p, will be processed in vnode-write queue", pMsg);
    if (taosArrayPush(pArray, &pMsg) == NULL) {
      dTrace("msg:%p, failed to process since %s", pMsg, terrstr());
      vmSendRsp(pVnode->pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY);
    }
S
shm  
Shengliang Guan 已提交
106 107 108 109
  }

  vnodeProcessWMsgs(pVnode->pImpl, pArray);

S
Shengliang Guan 已提交
110 111
  numOfMsgs = taosArrayGetSize(pArray);
  for (int32_t i = 0; i < numOfMsgs; i++) {
S
shm  
Shengliang Guan 已提交
112 113
    SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
    SRpcMsg  *pRpc = &pMsg->rpcMsg;
S
Shengliang Guan 已提交
114 115 116
    SRpcMsg  *pRsp = NULL;

    int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp);
S
shm  
Shengliang Guan 已提交
117
    if (pRsp != NULL) {
S
shm  
Shengliang Guan 已提交
118
      pRsp->ahandle = pRpc->ahandle;
S
shm  
Shengliang Guan 已提交
119
      dndSendRsp(pVnode->pWrapper, pRsp);
wafwerar's avatar
wafwerar 已提交
120
      taosMemoryFree(pRsp);
S
shm  
Shengliang Guan 已提交
121
    } else {
S
Shengliang Guan 已提交
122
      if (code != 0 && terrno != 0) code = terrno;
S
Shengliang Guan 已提交
123
      vmSendRsp(pVnode->pWrapper, pMsg, code);
S
shm  
Shengliang Guan 已提交
124 125 126
    }
  }

S
Shengliang Guan 已提交
127
  for (int32_t i = 0; i < numOfMsgs; i++) {
S
shm  
Shengliang Guan 已提交
128 129 130 131
    SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->rpcMsg.pCont);
    taosFreeQitem(pMsg);
S
shm  
Shengliang Guan 已提交
132 133 134 135 136
  }

  taosArrayDestroy(pArray);
}

S
Shengliang Guan 已提交
137 138 139
// Batch worker callback for a vnode's apply queue: applies each queued message
// through vnodeApplyWMsg and discards the result.
//
// NOTE(review): neither the message nor a response returned through pRsp is
// released or sent here (the original is marked "todo"); ownership needs to be
// settled before this path carries real traffic.
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    // Fetch into a fresh pointer and skip empty fetches, as the write queue does;
    // otherwise a failed fetch would dereference NULL or re-apply the previous message.
    SNodeMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    // todo
    SRpcMsg *pRsp = NULL;
    (void)vnodeApplyWMsg(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
  }
}

S
Shengliang Guan 已提交
150 151 152
// Batch worker callback for a vnode's sync queue: passes each queued message to
// vnodeProcessSyncReq and discards the result.
//
// NOTE(review): neither the message nor a response returned through pRsp is
// released or sent here (the original is marked "todo"); ownership needs to be
// settled before this path carries real traffic.
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    // Fetch into a fresh pointer and skip empty fetches, as the write queue does;
    // otherwise a failed fetch would dereference NULL or reprocess the previous message.
    SNodeMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    // todo
    SRpcMsg *pRsp = NULL;
    (void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
  }
}

L
Liu Jicong 已提交
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
// Batch worker callback for a vnode's merge queue. Each message is handed to
// vnodeProcessFetchMsg; when that call fails, an error reply is sent and the
// message is released. On success nothing is replied or freed here.
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    // Use a per-iteration pointer and skip empty fetches. With a pointer shared
    // across iterations, a failed fetch would reuse the previous message, which
    // the error path below may already have freed.
    SNodeMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    dTrace("msg:%p, will be processed in vnode-merge queue", pMsg);
    int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
    if (code != 0) {
      vmSendRsp(pVnode->pWrapper, pMsg, code);
      dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
      rpcFreeCont(pMsg->rpcMsg.pCont);
      taosFreeQitem(pMsg);
    }
  }
}

S
Shengliang Guan 已提交
181
// Routes an incoming node message to one of the target vnode's queues.
//
// The message head's contLen and vgId are converted from network to host byte
// order in place, then the vnode identified by vgId is looked up. Supported
// queue types are QUERY, FETCH, WRITE, SYNC and MERGE.
//
// Returns 0 once the message is written to a queue; -1 when the vnode cannot be
// acquired or when qtype is not supported (terrno = TSDB_CODE_INVALID_PARA).
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) {
  SMsgHead *pHead = pMsg->rpcMsg.pCont;

  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHead->vgId, pMsg, terrstr());
    return -1;
  }

  int32_t ret = 0;

  if (qtype == QUERY_QUEUE) {
    dTrace("msg:%p, will be written into vnode-query queue", pMsg);
    taosWriteQitem(pVnode->pQueryQ, pMsg);
  } else if (qtype == FETCH_QUEUE) {
    dTrace("msg:%p, will be written into vnode-fetch queue", pMsg);
    taosWriteQitem(pVnode->pFetchQ, pMsg);
  } else if (qtype == WRITE_QUEUE) {
    dTrace("msg:%p, will be written into vnode-write queue", pMsg);
    taosWriteQitem(pVnode->pWriteQ, pMsg);
  } else if (qtype == SYNC_QUEUE) {
    dTrace("msg:%p, will be written into vnode-sync queue", pMsg);
    taosWriteQitem(pVnode->pSyncQ, pMsg);
  } else if (qtype == MERGE_QUEUE) {
    dTrace("msg:%p, will be written into vnode-merge queue", pMsg);
    taosWriteQitem(pVnode->pMergeQ, pMsg);
  } else {
    // Any other queue type is rejected; the message is left untouched.
    ret = -1;
    terrno = TSDB_CODE_INVALID_PARA;
  }

  vmReleaseVnode(pMgmt, pVnode);
  return ret;
}

S
Shengliang Guan 已提交
225 226 227 228
// Queues pMsg on the sync queue of the vnode addressed by its message head.
// Returns the result of vmPutNodeMsgToQueue (0 on success, -1 on failure).
int32_t vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  return vmPutNodeMsgToQueue(pWrapper->pMgmt, pMsg, SYNC_QUEUE);
}
S
shm  
Shengliang Guan 已提交
229

S
Shengliang Guan 已提交
230 231 232 233
// Queues pMsg on the write queue of the vnode addressed by its message head.
// Returns the result of vmPutNodeMsgToQueue (0 on success, -1 on failure).
int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  return vmPutNodeMsgToQueue(pWrapper->pMgmt, pMsg, WRITE_QUEUE);
}
S
shm  
Shengliang Guan 已提交
234

S
Shengliang Guan 已提交
235 236 237 238
// Queues pMsg on the query queue of the vnode addressed by its message head.
// Returns the result of vmPutNodeMsgToQueue (0 on success, -1 on failure).
int32_t vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  return vmPutNodeMsgToQueue(pWrapper->pMgmt, pMsg, QUERY_QUEUE);
}
S
shm  
Shengliang Guan 已提交
239

S
Shengliang Guan 已提交
240 241 242 243
// Queues pMsg on the fetch queue of the vnode addressed by its message head.
// Returns the result of vmPutNodeMsgToQueue (0 on success, -1 on failure).
int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  return vmPutNodeMsgToQueue(pWrapper->pMgmt, pMsg, FETCH_QUEUE);
}
S
shm  
Shengliang Guan 已提交
244

S
Shengliang Guan 已提交
245 246 247 248
// Queues pMsg on the merge queue of the vnode addressed by its message head.
// Returns the result of vmPutNodeMsgToQueue (0 on success, -1 on failure).
int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  return vmPutNodeMsgToQueue(pWrapper->pMgmt, pMsg, MERGE_QUEUE);
}
L
Liu Jicong 已提交
249

S
Shengliang Guan 已提交
250 251
// Hands a management message to the single vnode-mgmt worker by writing it to
// that worker's queue. Always returns 0; the result of taosWriteQitem is not checked.
int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SVnodesMgmt *pMgmt = pWrapper->pMgmt;

  dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pMgmt->mgmtWorker.name);
  taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);

  return 0;
}

S
Shengliang Guan 已提交
258
// Wraps a raw rpc message in a newly allocated SNodeMsg queue item and writes it
// to one of the target vnode's queues (QUERY, FETCH, APPLY or MERGE).
//
// The vnode is looked up via the vgId stored in the message head, which is read
// as-is: no byte-order conversion happens here.
// NOTE(review): presumably callers pass heads already in host order - confirm.
//
// Returns 0 once the item is queued. Returns -1 when the vnode cannot be
// acquired, when the queue item cannot be allocated, or when qtype is not
// supported (terrno = TSDB_CODE_INVALID_PARA). On failure the content referenced
// by pRpc is not freed by this function.
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) {
  SVnodesMgmt *pMgmt = pWrapper->pMgmt;
  SMsgHead    *pHead = pRpc->pCont;

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) return -1;

  SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
  int32_t   code = 0;

  if (pMsg == NULL) {
    code = -1;
  } else {
    dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
    pMsg->rpcMsg = *pRpc;
    switch (qtype) {
      case QUERY_QUEUE:
        dTrace("msg:%p, will be put into vnode-query queue", pMsg);
        taosWriteQitem(pVnode->pQueryQ, pMsg);
        break;
      case FETCH_QUEUE:
        dTrace("msg:%p, will be put into vnode-fetch queue", pMsg);
        taosWriteQitem(pVnode->pFetchQ, pMsg);
        break;
      case APPLY_QUEUE:
        dTrace("msg:%p, will be put into vnode-apply queue", pMsg);
        taosWriteQitem(pVnode->pApplyQ, pMsg);
        break;
      case MERGE_QUEUE:
        dTrace("msg:%p, will be put into vnode-merge queue", pMsg);
        taosWriteQitem(pVnode->pMergeQ, pMsg);
        break;
      default:
        // The item was never handed to a queue, so no worker will ever free it;
        // release the wrapper here. Only the wrapper is freed, not pRpc->pCont.
        taosFreeQitem(pMsg);
        code = -1;
        terrno = TSDB_CODE_INVALID_PARA;
        break;
    }
  }

  vmReleaseVnode(pMgmt, pVnode);
  return code;
}

S
Shengliang Guan 已提交
300
// Wraps pRpc in a queue item and puts it on the target vnode's query queue.
// Returns the result of vmPutRpcMsgToQueue (0 on success, -1 on failure).
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  const int32_t code = vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE);
  return code;
}
S
shm  
Shengliang Guan 已提交
303

S
Shengliang Guan 已提交
304
// Wraps pRpc in a queue item and puts it on the target vnode's fetch queue.
// Returns the result of vmPutRpcMsgToQueue (0 on success, -1 on failure).
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  const int32_t code = vmPutRpcMsgToQueue(pWrapper, pRpc, FETCH_QUEUE);
  return code;
}
S
shm  
Shengliang Guan 已提交
307

S
Shengliang Guan 已提交
308
// Wraps pRpc in a queue item and puts it on the target vnode's apply queue.
// Returns the result of vmPutRpcMsgToQueue (0 on success, -1 on failure).
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  const int32_t code = vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE);
  return code;
}

L
Liu Jicong 已提交
312 313 314 315
// Wraps pRpc in a queue item and puts it on the target vnode's merge queue.
// Returns the result of vmPutRpcMsgToQueue (0 on success, -1 on failure).
int32_t vmPutMsgToMergeQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  const int32_t code = vmPutRpcMsgToQueue(pWrapper, pRpc, MERGE_QUEUE);
  return code;
}

S
Shengliang Guan 已提交
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335
// Reports the number of items currently held in one queue of vnode vgId.
//
// Returns the value of taosQueueSize for the selected queue, or -1 when the
// vnode cannot be acquired or qtype does not name one of the six vnode queues.
int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
  SVnodeObj *pVnode = vmAcquireVnode(pWrapper->pMgmt, vgId);
  // Nothing was acquired, so there is nothing to release either.
  if (pVnode == NULL) return -1;

  int32_t size = -1;
  switch (qtype) {
    case QUERY_QUEUE:
      size = taosQueueSize(pVnode->pQueryQ);
      break;
    case FETCH_QUEUE:
      size = taosQueueSize(pVnode->pFetchQ);
      break;
    case WRITE_QUEUE:
      size = taosQueueSize(pVnode->pWriteQ);
      break;
    case SYNC_QUEUE:
      size = taosQueueSize(pVnode->pSyncQ);
      break;
    case APPLY_QUEUE:
      size = taosQueueSize(pVnode->pApplyQ);
      break;
    case MERGE_QUEUE:
      size = taosQueueSize(pVnode->pMergeQ);
      break;
    default:
      break;
  }

  vmReleaseVnode(pWrapper->pMgmt, pVnode);
  return size;
}

S
shm  
Shengliang Guan 已提交
347 348 349
// Creates the six per-vnode queues and binds each to its worker pool and
// processing callback: write and apply on the write pool, merge on the merge
// pool, sync on the sync pool, fetch and query on their own pools.
//
// Returns 0 when every queue was created. If any allocation returned NULL,
// sets terrno to TSDB_CODE_OUT_OF_MEMORY and returns -1; queues that were
// created before the failure are left in place for the caller to deal with.
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
  // batch-style (FItems) queues
  pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
  pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
  pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue);
  pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);

  // single-item (FItem) queues
  pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
  pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);

  const int allCreated = pVnode->pApplyQ != NULL && pVnode->pWriteQ != NULL && pVnode->pSyncQ != NULL &&
                         pVnode->pFetchQ != NULL && pVnode->pQueryQ != NULL && pVnode->pMergeQ != NULL;
  if (!allCreated) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  dDebug("vgId:%d, vnode queue is alloced", pVnode->vgId);
  return 0;
}

S
shm  
Shengliang Guan 已提交
365
// Returns all six per-vnode queues to their worker pools and then clears the
// queue pointers stored in pVnode.
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
  // single-item queues
  tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
  tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);

  // batch queues; write and apply share the write pool
  tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
  tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
  tWWorkerFreeQueue(&pMgmt->mergePool, pVnode->pMergeQ);
  tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);

  // drop the now-stale pointers
  pVnode->pWriteQ = pVnode->pApplyQ = NULL;
  pVnode->pSyncQ = NULL;
  pVnode->pFetchQ = pVnode->pQueryQ = NULL;
  pVnode->pMergeQ = NULL;

  dDebug("vgId:%d, vnode queue is freed", pVnode->vgId);
}

S
shm  
Shengliang Guan 已提交
381 382 383 384 385 386 387
// Initializes every worker pool used by the vnode management node, in this
// order: query, fetch, write, sync, merge, and finally the single vnode-mgmt worker.
//
// Thread counts:
//   query  min = max = max(tsNumOfCores * tsRatioOfQueryCores, 1)
//   fetch  min = min(4, tsNumOfCores), max = 4
//   write  max = max(tsNumOfCores, 1)
//   sync   max = max(tsNumOfCores / 2, 1)
//   merge  max = 1
//   mgmt   exactly one thread
//
// Returns 0 on success and -1 as soon as one initialization fails; pools that
// were already initialized are not cleaned up here.
int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
  const int32_t fetchMax = 4;
  const int32_t fetchMin = TMIN(fetchMax, tsNumOfCores);
  const int32_t queryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
  const int32_t writeMax = TMAX(tsNumOfCores, 1);
  const int32_t syncMax = TMAX(tsNumOfCores / 2, 1);
  const int32_t mergeMax = 1;

  pMgmt->queryPool.name = "vnode-query";
  pMgmt->queryPool.min = queryThreads;
  pMgmt->queryPool.max = queryThreads;
  if (tQWorkerInit(&pMgmt->queryPool) != 0) return -1;

  pMgmt->fetchPool.name = "vnode-fetch";
  pMgmt->fetchPool.min = fetchMin;
  pMgmt->fetchPool.max = fetchMax;
  if (tQWorkerInit(&pMgmt->fetchPool) != 0) return -1;

  pMgmt->writePool.name = "vnode-write";
  pMgmt->writePool.max = writeMax;
  if (tWWorkerInit(&pMgmt->writePool) != 0) return -1;

  pMgmt->syncPool.name = "vnode-sync";
  pMgmt->syncPool.max = syncMax;
  if (tWWorkerInit(&pMgmt->syncPool) != 0) return -1;

  pMgmt->mergePool.name = "vnode-merge";
  pMgmt->mergePool.max = mergeMax;
  if (tWWorkerInit(&pMgmt->mergePool) != 0) return -1;

  // The management worker gets pMgmt as its callback parameter.
  SSingleWorkerCfg mgmtCfg = {
      .min = 1,
      .max = 1,
      .name = "vnode-mgmt",
      .fp = (FItem)vmProcessMgmtQueue,
      .param = pMgmt,
  };
  if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) {
    dError("failed to start vnode-mgmt worker since %s", terrstr());
    return -1;
  }

  dDebug("vnode workers are initialized");
  return 0;
}

// Shuts down everything vmStartWorker set up: the single vnode-mgmt worker
// first, then the fetch, query, write, sync and merge pools.
void vmStopWorker(SVnodesMgmt *pMgmt) {
  // single management worker
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);

  // single-item worker pools
  tQWorkerCleanup(&pMgmt->fetchPool);
  tQWorkerCleanup(&pMgmt->queryPool);

  // batch worker pools
  tWWorkerCleanup(&pMgmt->writePool);
  tWWorkerCleanup(&pMgmt->syncPool);
  tWWorkerCleanup(&pMgmt->mergePool);

  dDebug("vnode workers are closed");
}