vmWorker.c 14.6 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

19 20 21
static inline void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
  SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
                 .ahandle = pMsg->rpcMsg.ahandle,
dengyihao's avatar
dengyihao 已提交
22
                 .refId = pMsg->rpcMsg.refId,
23 24 25
                 .code = code,
                 .pCont = pMsg->pRsp,
                 .contLen = pMsg->rspLen};
S
Shengliang Guan 已提交
26
  tmsgSendRsp(&rsp);
S
Shengliang Guan 已提交
27 28
}

S
Shengliang Guan 已提交
29 30 31
static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SVnodesMgmt *pMgmt = pInfo->ahandle;

S
Shengliang Guan 已提交
32 33
  int32_t code = -1;
  tmsg_t  msgType = pMsg->rpcMsg.msgType;
34
  dTrace("msg:%p, will be processed in vnode-m queue", pMsg);
S
Shengliang Guan 已提交
35 36

  switch (msgType) {
37 38 39 40 41 42
    case TDMT_MON_VM_INFO:
      code = vmProcessGetMonVmInfoReq(pMgmt->pWrapper, pMsg);
      break;
    case TDMT_MON_VM_LOAD:
      code = vmProcessGetVnodeLoadsReq(pMgmt->pWrapper, pMsg);
      break;
S
Shengliang Guan 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55
    case TDMT_DND_CREATE_VNODE:
      code = vmProcessCreateVnodeReq(pMgmt, pMsg);
      break;
    case TDMT_DND_DROP_VNODE:
      code = vmProcessDropVnodeReq(pMgmt, pMsg);
      break;
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      dError("msg:%p, not processed in vnode-mgmt queue", pMsg);
  }

  if (msgType & 1u) {
    if (code != 0 && terrno != 0) code = terrno;
S
Shengliang Guan 已提交
56
    vmSendRsp(pMgmt->pWrapper, pMsg, code);
S
Shengliang Guan 已提交
57 58 59 60 61 62 63
  }

  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
  rpcFreeCont(pMsg->rpcMsg.pCont);
  taosFreeQitem(pMsg);
}

S
Shengliang Guan 已提交
64 65 66
static void vmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SVnodeObj *pVnode = pInfo->ahandle;

S
Shengliang Guan 已提交
67 68 69 70
  dTrace("msg:%p, will be processed in vnode-query queue", pMsg);
  int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
  if (code != 0) {
    vmSendRsp(pVnode->pWrapper, pMsg, code);
S
Shengliang Guan 已提交
71 72 73
    dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
    rpcFreeCont(pMsg->rpcMsg.pCont);
    taosFreeQitem(pMsg);
S
Shengliang Guan 已提交
74
  }
S
shm  
Shengliang Guan 已提交
75
}
S
shm  
Shengliang Guan 已提交
76

S
Shengliang Guan 已提交
77 78 79
static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SVnodeObj *pVnode = pInfo->ahandle;

S
Shengliang Guan 已提交
80
  dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg);
L
Liu Jicong 已提交
81
  int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo);
S
Shengliang Guan 已提交
82 83
  if (code != 0) {
    vmSendRsp(pVnode->pWrapper, pMsg, code);
S
Shengliang Guan 已提交
84 85 86
    dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
    rpcFreeCont(pMsg->rpcMsg.pCont);
    taosFreeQitem(pMsg);
S
Shengliang Guan 已提交
87
  }
S
Shengliang Guan 已提交
88 89
}

S
Shengliang Guan 已提交
90 91
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;
H
Hongze Cheng 已提交
92
  int64_t    version;
S
Shengliang Guan 已提交
93

S
shm  
Shengliang Guan 已提交
94
  SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
S
Shengliang Guan 已提交
95 96 97 98
  if (pArray == NULL) {
    dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
    return;
  }
S
shm  
Shengliang Guan 已提交
99 100

  for (int32_t i = 0; i < numOfMsgs; ++i) {
S
shm  
Shengliang Guan 已提交
101
    SNodeMsg *pMsg = NULL;
S
Shengliang Guan 已提交
102 103 104 105 106 107 108
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    dTrace("msg:%p, will be processed in vnode-write queue", pMsg);
    if (taosArrayPush(pArray, &pMsg) == NULL) {
      dTrace("msg:%p, failed to process since %s", pMsg, terrstr());
      vmSendRsp(pVnode->pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY);
    }
S
shm  
Shengliang Guan 已提交
109 110
  }

H
Hongze Cheng 已提交
111
  vnodePreprocessWriteReqs(pVnode->pImpl, pArray, &version);
S
shm  
Shengliang Guan 已提交
112

S
Shengliang Guan 已提交
113 114
  numOfMsgs = taosArrayGetSize(pArray);
  for (int32_t i = 0; i < numOfMsgs; i++) {
S
shm  
Shengliang Guan 已提交
115
    SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
dengyihao's avatar
dengyihao 已提交
116
    SRpcMsg * pRpc = &pMsg->rpcMsg;
H
Hongze Cheng 已提交
117
    SRpcMsg   rsp;
S
Shengliang Guan 已提交
118

H
Hongze Cheng 已提交
119 120 121 122 123
    rsp.pCont = NULL;
    rsp.contLen = 0;
    rsp.code = 0;
    rsp.handle = pRpc->handle;
    rsp.ahandle = pRpc->ahandle;
dengyihao's avatar
dengyihao 已提交
124
    rsp.refId = pRpc->refId;
H
Hongze Cheng 已提交
125 126 127 128 129

    int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &rsp);
    tmsgSendRsp(&rsp);

#if 0
S
shm  
Shengliang Guan 已提交
130
    if (pRsp != NULL) {
S
shm  
Shengliang Guan 已提交
131
      pRsp->ahandle = pRpc->ahandle;
wafwerar's avatar
wafwerar 已提交
132
      taosMemoryFree(pRsp);
S
shm  
Shengliang Guan 已提交
133
    } else {
S
Shengliang Guan 已提交
134
      if (code != 0 && terrno != 0) code = terrno;
S
Shengliang Guan 已提交
135
      vmSendRsp(pVnode->pWrapper, pMsg, code);
S
shm  
Shengliang Guan 已提交
136
    }
H
Hongze Cheng 已提交
137
#endif
S
shm  
Shengliang Guan 已提交
138 139
  }

S
Shengliang Guan 已提交
140
  for (int32_t i = 0; i < numOfMsgs; i++) {
S
shm  
Shengliang Guan 已提交
141 142 143 144
    SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->rpcMsg.pCont);
    taosFreeQitem(pMsg);
S
shm  
Shengliang Guan 已提交
145 146 147 148 149
  }

  taosArrayDestroy(pArray);
}

S
Shengliang Guan 已提交
150 151
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;
dengyihao's avatar
dengyihao 已提交
152
  SNodeMsg * pMsg = NULL;
S
shm  
Shengliang Guan 已提交
153 154 155 156 157 158

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    taosGetQitem(qall, (void **)&pMsg);

    // todo
    SRpcMsg *pRsp = NULL;
H
Hongze Cheng 已提交
159
    // (void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
S
shm  
Shengliang Guan 已提交
160 161 162
  }
}

S
Shengliang Guan 已提交
163 164
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;
dengyihao's avatar
dengyihao 已提交
165
  SNodeMsg * pMsg = NULL;
S
shm  
Shengliang Guan 已提交
166 167 168 169 170 171

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    taosGetQitem(qall, (void **)&pMsg);

    // todo
    SRpcMsg *pRsp = NULL;
S
shm  
Shengliang Guan 已提交
172
    (void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
S
shm  
Shengliang Guan 已提交
173 174 175
  }
}

L
Liu Jicong 已提交
176 177
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;
dengyihao's avatar
dengyihao 已提交
178
  SNodeMsg * pMsg = NULL;
L
Liu Jicong 已提交
179 180 181 182 183

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    taosGetQitem(qall, (void **)&pMsg);

    dTrace("msg:%p, will be processed in vnode-merge queue", pMsg);
L
Liu Jicong 已提交
184
    int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo);
L
Liu Jicong 已提交
185 186 187 188 189 190 191 192 193
    if (code != 0) {
      vmSendRsp(pVnode->pWrapper, pMsg, code);
      dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
      rpcFreeCont(pMsg->rpcMsg.pCont);
      taosFreeQitem(pMsg);
    }
  }
}

S
Shengliang Guan 已提交
194
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) {
dengyihao's avatar
dengyihao 已提交
195
  SRpcMsg * pRpc = &pMsg->rpcMsg;
S
Shengliang Guan 已提交
196
  SMsgHead *pHead = pRpc->pCont;
D
dapan1121 已提交
197 198
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);
S
shm  
Shengliang Guan 已提交
199 200 201

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) {
202
    dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHead->vgId, pMsg, terrstr());
H
Haojun Liao 已提交
203
    return terrno;
S
shm  
Shengliang Guan 已提交
204 205
  }

S
Shengliang Guan 已提交
206
  int32_t code = 0;
S
Shengliang Guan 已提交
207
  switch (qtype) {
S
Shengliang Guan 已提交
208
    case QUERY_QUEUE:
S
Shengliang Guan 已提交
209
      dTrace("msg:%p, will be written into vnode-query queue", pMsg);
S
Shengliang Guan 已提交
210
      taosWriteQitem(pVnode->pQueryQ, pMsg);
S
Shengliang Guan 已提交
211
      break;
S
Shengliang Guan 已提交
212
    case FETCH_QUEUE:
S
Shengliang Guan 已提交
213
      dTrace("msg:%p, will be written into vnode-fetch queue", pMsg);
S
Shengliang Guan 已提交
214
      taosWriteQitem(pVnode->pFetchQ, pMsg);
S
Shengliang Guan 已提交
215
      break;
S
Shengliang Guan 已提交
216
    case WRITE_QUEUE:
S
Shengliang Guan 已提交
217
      dTrace("msg:%p, will be written into vnode-write queue", pMsg);
S
Shengliang Guan 已提交
218
      taosWriteQitem(pVnode->pWriteQ, pMsg);
S
Shengliang Guan 已提交
219 220
      break;
    case SYNC_QUEUE:
S
Shengliang Guan 已提交
221
      dTrace("msg:%p, will be written into vnode-sync queue", pMsg);
S
Shengliang Guan 已提交
222
      taosWriteQitem(pVnode->pSyncQ, pMsg);
S
Shengliang Guan 已提交
223
      break;
L
fix  
Liu Jicong 已提交
224 225
    case MERGE_QUEUE:
      dTrace("msg:%p, will be written into vnode-merge queue", pMsg);
S
Shengliang Guan 已提交
226
      taosWriteQitem(pVnode->pMergeQ, pMsg);
L
fix  
Liu Jicong 已提交
227
      break;
S
Shengliang Guan 已提交
228
    default:
S
Shengliang Guan 已提交
229
      code = -1;
S
Shengliang Guan 已提交
230 231 232
      terrno = TSDB_CODE_INVALID_PARA;
      break;
  }
S
shm  
Shengliang Guan 已提交
233 234 235

  vmReleaseVnode(pMgmt, pVnode);
  return code;
S
shm  
Shengliang Guan 已提交
236 237
}

S
Shengliang Guan 已提交
238 239 240 241
int32_t vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SVnodesMgmt *pMgmt = pWrapper->pMgmt;
  return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE);
}
S
shm  
Shengliang Guan 已提交
242

S
Shengliang Guan 已提交
243 244 245 246
int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SVnodesMgmt *pMgmt = pWrapper->pMgmt;
  return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE);
}
S
shm  
Shengliang Guan 已提交
247

S
Shengliang Guan 已提交
248 249 250 251
int32_t vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SVnodesMgmt *pMgmt = pWrapper->pMgmt;
  return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE);
}
S
shm  
Shengliang Guan 已提交
252

S
Shengliang Guan 已提交
253 254 255 256
int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SVnodesMgmt *pMgmt = pWrapper->pMgmt;
  return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE);
}
S
shm  
Shengliang Guan 已提交
257

S
Shengliang Guan 已提交
258 259 260 261
int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SVnodesMgmt *pMgmt = pWrapper->pMgmt;
  return vmPutNodeMsgToQueue(pMgmt, pMsg, MERGE_QUEUE);
}
L
Liu Jicong 已提交
262

S
Shengliang Guan 已提交
263
int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
dengyihao's avatar
dengyihao 已提交
264
  SVnodesMgmt *  pMgmt = pWrapper->pMgmt;
S
Shengliang Guan 已提交
265
  SSingleWorker *pWorker = &pMgmt->mgmtWorker;
S
Shengliang Guan 已提交
266
  dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name);
S
Shengliang Guan 已提交
267 268
  taosWriteQitem(pWorker->queue, pMsg);
  return 0;
S
shm  
Shengliang Guan 已提交
269 270
}

271
int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
dengyihao's avatar
dengyihao 已提交
272
  SVnodesMgmt *  pMgmt = pWrapper->pMgmt;
273 274 275 276 277 278 279
  SSingleWorker *pWorker = &pMgmt->monitorWorker;

  dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
  taosWriteQitem(pWorker->queue, pMsg);
  return 0;
}

S
Shengliang Guan 已提交
280
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) {
S
shm  
Shengliang Guan 已提交
281
  SVnodesMgmt *pMgmt = pWrapper->pMgmt;
dengyihao's avatar
dengyihao 已提交
282
  SMsgHead *   pHead = pRpc->pCont;
S
shm  
Shengliang Guan 已提交
283 284 285 286

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) return -1;

S
shm  
Shengliang Guan 已提交
287
  SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
S
Shengliang Guan 已提交
288 289 290 291 292
  int32_t   code = 0;

  if (pMsg == NULL) {
    code = -1;
  } else {
S
Shengliang Guan 已提交
293
    dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
S
shm  
Shengliang Guan 已提交
294
    pMsg->rpcMsg = *pRpc;
S
Shengliang Guan 已提交
295
    switch (qtype) {
S
Shengliang Guan 已提交
296
      case QUERY_QUEUE:
S
Shengliang Guan 已提交
297
        dTrace("msg:%p, will be put into vnode-query queue", pMsg);
S
Shengliang Guan 已提交
298
        taosWriteQitem(pVnode->pQueryQ, pMsg);
S
Shengliang Guan 已提交
299
        break;
S
Shengliang Guan 已提交
300
      case FETCH_QUEUE:
S
Shengliang Guan 已提交
301
        dTrace("msg:%p, will be put into vnode-fetch queue", pMsg);
S
Shengliang Guan 已提交
302
        taosWriteQitem(pVnode->pFetchQ, pMsg);
S
Shengliang Guan 已提交
303
        break;
S
Shengliang Guan 已提交
304
      case APPLY_QUEUE:
S
Shengliang Guan 已提交
305
        dTrace("msg:%p, will be put into vnode-apply queue", pMsg);
S
Shengliang Guan 已提交
306
        taosWriteQitem(pVnode->pApplyQ, pMsg);
S
Shengliang Guan 已提交
307
        break;
L
Liu Jicong 已提交
308 309
      case MERGE_QUEUE:
        dTrace("msg:%p, will be put into vnode-merge queue", pMsg);
S
Shengliang Guan 已提交
310
        taosWriteQitem(pVnode->pMergeQ, pMsg);
L
Liu Jicong 已提交
311
        break;
S
Shengliang Guan 已提交
312
      default:
S
Shengliang Guan 已提交
313
        code = -1;
S
Shengliang Guan 已提交
314 315 316
        terrno = TSDB_CODE_INVALID_PARA;
        break;
    }
S
shm  
Shengliang Guan 已提交
317
  }
S
shm  
Shengliang Guan 已提交
318 319 320 321
  vmReleaseVnode(pMgmt, pVnode);
  return code;
}

S
Shengliang Guan 已提交
322
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
S
Shengliang Guan 已提交
323
  return vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE);
S
Shengliang Guan 已提交
324
}
S
shm  
Shengliang Guan 已提交
325

S
Shengliang Guan 已提交
326
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
S
Shengliang Guan 已提交
327
  return vmPutRpcMsgToQueue(pWrapper, pRpc, FETCH_QUEUE);
S
Shengliang Guan 已提交
328
}
S
shm  
Shengliang Guan 已提交
329

S
Shengliang Guan 已提交
330
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
S
Shengliang Guan 已提交
331
  return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE);
S
shm  
Shengliang Guan 已提交
332 333
}

L
Liu Jicong 已提交
334 335 336 337
int32_t vmPutMsgToMergeQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  return vmPutRpcMsgToQueue(pWrapper, pRpc, MERGE_QUEUE);
}

S
Shengliang Guan 已提交
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357
int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
  int32_t    size = -1;
  SVnodeObj *pVnode = vmAcquireVnode(pWrapper->pMgmt, vgId);
  if (pVnode != NULL) {
    switch (qtype) {
      case QUERY_QUEUE:
        size = taosQueueSize(pVnode->pQueryQ);
        break;
      case FETCH_QUEUE:
        size = taosQueueSize(pVnode->pFetchQ);
        break;
      case WRITE_QUEUE:
        size = taosQueueSize(pVnode->pWriteQ);
        break;
      case SYNC_QUEUE:
        size = taosQueueSize(pVnode->pSyncQ);
        break;
      case APPLY_QUEUE:
        size = taosQueueSize(pVnode->pApplyQ);
        break;
L
Liu Jicong 已提交
358 359 360
      case MERGE_QUEUE:
        size = taosQueueSize(pVnode->pMergeQ);
        break;
S
Shengliang Guan 已提交
361 362 363 364 365 366 367 368
      default:
        break;
    }
  }
  vmReleaseVnode(pWrapper->pMgmt, pVnode);
  return size;
}

S
shm  
Shengliang Guan 已提交
369 370 371
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
  pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
  pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
L
Liu Jicong 已提交
372
  pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue);
S
shm  
Shengliang Guan 已提交
373
  pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
S
Shengliang Guan 已提交
374
  pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
S
shm  
Shengliang Guan 已提交
375
  pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
S
shm  
Shengliang Guan 已提交
376 377

  if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL ||
L
fix  
Liu Jicong 已提交
378
      pVnode->pQueryQ == NULL || pVnode->pMergeQ == NULL) {
S
shm  
Shengliang Guan 已提交
379 380 381 382
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
383
  dDebug("vgId:%d, vnode queue is alloced", pVnode->vgId);
S
shm  
Shengliang Guan 已提交
384 385 386
  return 0;
}

S
shm  
Shengliang Guan 已提交
387
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
S
shm  
Shengliang Guan 已提交
388
  tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
S
Shengliang Guan 已提交
389
  tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
S
shm  
Shengliang Guan 已提交
390 391
  tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
  tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
L
Liu Jicong 已提交
392
  tWWorkerFreeQueue(&pMgmt->mergePool, pVnode->pMergeQ);
S
shm  
Shengliang Guan 已提交
393 394 395 396 397 398
  tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
  pVnode->pWriteQ = NULL;
  pVnode->pApplyQ = NULL;
  pVnode->pSyncQ = NULL;
  pVnode->pFetchQ = NULL;
  pVnode->pQueryQ = NULL;
L
Liu Jicong 已提交
399
  pVnode->pMergeQ = NULL;
S
Shengliang Guan 已提交
400
  dDebug("vgId:%d, vnode queue is freed", pVnode->vgId);
S
shm  
Shengliang Guan 已提交
401 402
}

S
shm  
Shengliang Guan 已提交
403 404 405
int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
  SQWorkerPool *pQPool = &pMgmt->queryPool;
  pQPool->name = "vnode-query";
S
Shengliang Guan 已提交
406 407
  pQPool->min = tsNumOfVnodeQueryThreads;
  pQPool->max = tsNumOfVnodeQueryThreads;
S
shm  
Shengliang Guan 已提交
408 409
  if (tQWorkerInit(pQPool) != 0) return -1;

S
Shengliang Guan 已提交
410
  SQWorkerPool *pFPool = &pMgmt->fetchPool;
S
shm  
Shengliang Guan 已提交
411
  pFPool->name = "vnode-fetch";
S
Shengliang Guan 已提交
412 413
  pFPool->min = tsNumOfVnodeFetchThreads;
  pFPool->max = tsNumOfVnodeFetchThreads;
S
Shengliang Guan 已提交
414
  if (tQWorkerInit(pFPool) != 0) return -1;
S
shm  
Shengliang Guan 已提交
415 416 417

  SWWorkerPool *pWPool = &pMgmt->writePool;
  pWPool->name = "vnode-write";
S
Shengliang Guan 已提交
418
  pWPool->max = tsNumOfVnodeWriteThreads;
S
shm  
Shengliang Guan 已提交
419 420 421 422
  if (tWWorkerInit(pWPool) != 0) return -1;

  pWPool = &pMgmt->syncPool;
  pWPool->name = "vnode-sync";
S
Shengliang Guan 已提交
423
  pWPool->max = tsNumOfVnodeSyncThreads;
S
shm  
Shengliang Guan 已提交
424 425
  if (tWWorkerInit(pWPool) != 0) return -1;

L
fix  
Liu Jicong 已提交
426 427
  pWPool = &pMgmt->mergePool;
  pWPool->name = "vnode-merge";
S
Shengliang Guan 已提交
428
  pWPool->max = tsNumOfVnodeMergeThreads;
L
fix  
Liu Jicong 已提交
429 430
  if (tWWorkerInit(pWPool) != 0) return -1;

L
Liu Jicong 已提交
431
  SSingleWorkerCfg cfg = {.min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
S
Shengliang Guan 已提交
432
  if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) {
S
Shengliang Guan 已提交
433
    dError("failed to start vnode-mgmt worker since %s", terrstr());
S
shm  
Shengliang Guan 已提交
434 435 436
    return -1;
  }

437
  if (tsMultiProcess) {
438
    SSingleWorkerCfg mCfg = {
439
        .min = 1, .max = 1, .name = "vnode-monitor", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
440
    if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
441 442 443 444 445
      dError("failed to start mnode vnode-monitor worker since %s", terrstr());
      return -1;
    }
  }

S
Shengliang Guan 已提交
446
  dDebug("vnode workers are initialized");
S
shm  
Shengliang Guan 已提交
447 448 449 450
  return 0;
}

void vmStopWorker(SVnodesMgmt *pMgmt) {
451
  tSingleWorkerCleanup(&pMgmt->monitorWorker);
S
Shengliang Guan 已提交
452
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
S
Shengliang Guan 已提交
453
  tQWorkerCleanup(&pMgmt->fetchPool);
S
shm  
Shengliang Guan 已提交
454 455 456
  tQWorkerCleanup(&pMgmt->queryPool);
  tWWorkerCleanup(&pMgmt->writePool);
  tWWorkerCleanup(&pMgmt->syncPool);
L
fix  
Liu Jicong 已提交
457
  tWWorkerCleanup(&pMgmt->mergePool);
S
Shengliang Guan 已提交
458
  dDebug("vnode workers are closed");
S
shm  
Shengliang Guan 已提交
459
}