vmWorker.c 14.9 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19
/*
 * Send a response for pMsg carrying the given result code.
 * The reply body and its length are taken from pMsg->info.rsp and
 * pMsg->info.rspLen. Nothing is sent when the message has no rpc handle.
 */
static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
  if (pMsg->info.handle != NULL) {
    SRpcMsg reply = {
        .info = pMsg->info,
        .code = code,
        .contLen = pMsg->info.rspLen,
        .pCont = pMsg->info.rsp,
    };
    tmsgSendRsp(&reply);
  }
}

S
Shengliang Guan 已提交
30
/*
 * Worker callback for the vnode-mgmt queue.
 * Dispatches one message to the handler matching its msgType, sends a
 * response when the message is a request, then frees the message content
 * and the queue item.
 */
static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SVnodeMgmt     *pMgmt = pInfo->ahandle;
  const STraceId *trace = &pMsg->info.traceId;  // not read directly here; presumably consumed by the dG* log macros
  int32_t         ret = -1;

  dGTrace("msg:%p, get from vnode-mgmt queue", pMsg);

  switch (pMsg->msgType) {
    case TDMT_DND_CREATE_VNODE:
      ret = vmProcessCreateVnodeReq(pMgmt, pMsg);
      break;
    case TDMT_DND_DROP_VNODE:
      ret = vmProcessDropVnodeReq(pMgmt, pMsg);
      break;
    case TDMT_VND_ALTER_REPLICA:
      ret = vmProcessAlterVnodeReplicaReq(pMgmt, pMsg);
      break;
    case TDMT_VND_DISABLE_WRITE:
      ret = vmProcessDisableVnodeWriteReq(pMgmt, pMsg);
      break;
    case TDMT_VND_ALTER_HASHRANGE:
      ret = vmProcessAlterHashRangeReq(pMgmt, pMsg);
      break;
    case TDMT_DND_ALTER_VNODE_TYPE:
      ret = vmProcessAlterVnodeTypeReq(pMgmt, pMsg);
      break;
    default:
      // ret keeps its initial -1, so the request path below reports terrno
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      dGError("msg:%p, not processed in vnode-mgmt queue", pMsg);
      break;
  }

  if (IsReq(pMsg)) {
    if (ret != 0) {
      // a non-zero terrno takes precedence over the handler's return value
      ret = (terrno != 0) ? terrno : ret;
      dGError("msg:%p, failed to process since %s, type:%s", pMsg, tstrerror(ret), TMSG_INFO(pMsg->msgType));
    }
    vmSendRsp(pMsg, ret);
  }

  dGTrace("msg:%p, is freed, code:0x%x", pMsg, ret);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

S
Shengliang Guan 已提交
73
/*
 * Worker callback for a vnode's query queue.
 * Hands the message to vnodeProcessQueryMsg; a response is sent from here
 * only when that call fails. The message content and queue item are always
 * freed before returning.
 */
static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;  // not read directly here; presumably consumed by the dG* log macros
  SVnodeObj      *pVnode = pInfo->ahandle;

  dGTrace("vgId:%d, msg:%p get from vnode-query queue", pVnode->vgId, pMsg);

  int32_t ret = vnodeProcessQueryMsg(pVnode->pImpl, pMsg);
  if (ret != 0) {
    // a non-zero terrno takes precedence over the raw return value
    ret = (terrno != 0) ? terrno : ret;
    dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, tstrerror(ret));
    vmSendRsp(pMsg, ret);
  }

  dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, ret);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
S
shm  
Shengliang Guan 已提交
89

S
Shengliang Guan 已提交
90
/*
 * Worker callback for a vnode's stream queue.
 * Hands the message to vnodeProcessStreamMsg; a response is sent from here
 * only when that call fails. The message content and queue item are always
 * freed before returning.
 */
static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SVnodeObj      *pVnode = pInfo->ahandle;
  const STraceId *trace = &pMsg->info.traceId;  // not read directly here; presumably consumed by the dG* log macros

  dGTrace("vgId:%d, msg:%p get from vnode-stream queue", pVnode->vgId, pMsg);
  int32_t code = vnodeProcessStreamMsg(pVnode->pImpl, pMsg, pInfo);
  if (code != 0) {
    // a non-zero terrno takes precedence over the raw return value
    if (terrno != 0) code = terrno;
    // Format the code that is actually sent back (same as the query queue does),
    // so the log stays accurate even when terrno is 0.
    dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
            tstrerror(code));
    vmSendRsp(pMsg, code);
  }

  dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

108 109 110
/*
 * Batch worker callback for a vnode's fetch queue.
 * Pulls up to numOfMsgs items from qall and passes each one to
 * vnodeProcessFetchMsg. A response is sent from here only when that call
 * fails. Every item taken from qall is freed after it is handled.
 */
static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;
  SRpcMsg   *pMsg = NULL;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    const STraceId *trace = &pMsg->info.traceId;  // not read directly here; presumably consumed by the dG* log macros
    dGTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg);

    // clear any stale error so a terrno seen below belongs to this message
    terrno = 0;
    int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
    if (code != 0) {
      if (code == -1 && terrno != 0) {
        code = terrno;
      }

      // Log the text for `code` itself: when the callee returns a specific
      // error without setting terrno, a terrno-based string would not match
      // the code sent in the response.
      if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {
        dGDebug("vnodeProcessFetchMsg vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, tstrerror(code));
      } else {
        dGError("vnodeProcessFetchMsg vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, tstrerror(code));
      }

      vmSendRsp(pMsg, code);
    }

    dGTrace("vnodeProcessFetchMsg vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

S
Shengliang Guan 已提交
139 140
/*
 * Batch worker callback shared by the vnode-sync and vnode-sync-rd queues.
 * Each item taken from qall is passed to vnodeProcessSyncMsg and then freed.
 * No response is sent from here; the return code is only traced.
 */
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;

  for (int32_t idx = 0; idx < numOfMsgs; ++idx) {
    SRpcMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0) {
      continue;  // nothing was handed out for this slot
    }

    const STraceId *trace = &pMsg->info.traceId;  // not read directly here; presumably consumed by the dG* log macros
    dGTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg);

    int32_t ret = vnodeProcessSyncMsg(pVnode->pImpl, pMsg, NULL);  // no response here
    dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, ret);

    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

155 156 157 158 159 160 161
/*
 * Send a body-less response for pMsg whose code is the current terrno.
 * Does nothing when the message has no rpc handle.
 */
static void vmSendResponse(SRpcMsg *pMsg) {
  if (!pMsg->info.handle) return;

  SRpcMsg reply = {.code = terrno, .info = pMsg->info};
  rpcSendResponse(&reply);
}

S
Shengliang Guan 已提交
162
/*
 * Route pMsg to the queue of the vnode named in its message head.
 *
 * The SMsgHead at the start of pMsg->pCont is converted in place with ntohl
 * (contLen and vgId), and the vnode is looked up by that vgId. The message is
 * then written to the queue selected by qtype.
 *
 * Returns 0 when the message was queued. On any non-zero return the message
 * was NOT queued and remains owned by the caller. For most failures terrno
 * is set as well.
 */
static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
  const STraceId *trace = &pMsg->info.traceId;  // not read directly here; presumably consumed by the dG* log macros

  // Compare as signed: against the unsigned sizeof result a negative contLen
  // would be converted to a huge value and pass the minimum-length check.
  if (pMsg->contLen < (int32_t)sizeof(SMsgHead)) {
    dGError("invalid rpc msg with no msg head at pCont. pMsg:%p, type:%s, contLen:%d", pMsg, TMSG_INFO(pMsg->msgType),
            pMsg->contLen);
    return -1;
  }

  SMsgHead *pHead = pMsg->pCont;
  int32_t   code = 0;

  // the head is rewritten in place; later readers see the converted values
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) {
    dGWarn("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg,
           terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
    // make sure the caller always sees a non-zero error
    terrno = (terrno != 0) ? terrno : -1;
    return terrno;
  }

  switch (qtype) {
    case QUERY_QUEUE:
      // the message is queued only if preprocessing succeeds
      code = vnodePreprocessQueryMsg(pVnode->pImpl, pMsg);
      if (code) {
        dError("vgId:%d, msg:%p preprocess query msg failed since %s", pVnode->vgId, pMsg, terrstr(code));
      } else {
        dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg);
        taosWriteQitem(pVnode->pQueryQ, pMsg);
      }
      break;
    case STREAM_QUEUE:
      dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg);
      if (pMsg->msgType == TDMT_STREAM_TASK_DISPATCH) {
        vnodeEnqueueStreamMsg(pVnode->pImpl, pMsg);
      } else {
        taosWriteQitem(pVnode->pStreamQ, pMsg);
      }
      break;
    case FETCH_QUEUE:
      dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);
      taosWriteQitem(pVnode->pFetchQ, pMsg);
      break;
    case WRITE_QUEUE:
      // writes are rejected up front on low disk space, a failed storage
      // grant check (submit only), or a disabled vnode
      if (!osDataSpaceSufficient()) {
        terrno = TSDB_CODE_NO_ENOUGH_DISKSPACE;
        code = terrno;
        dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
        break;
      }
      if (pMsg->msgType == TDMT_VND_SUBMIT && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) {
        terrno = TSDB_CODE_VND_NO_WRITE_AUTH;
        code = terrno;
        dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
        break;
      }
      // TDMT_VND_ALTER_CONFIRM is still accepted while the vnode is disabled
      if (pMsg->msgType != TDMT_VND_ALTER_CONFIRM && pVnode->disable) {
        dDebug("vgId:%d, msg:%p put into vnode-write queue failed since its disable", pVnode->vgId, pMsg);
        terrno = TSDB_CODE_VND_STOPPED;
        code = terrno;
        break;
      }
      dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
      taosWriteQitem(pVnode->pWriteW.queue, pMsg);
      break;
    case SYNC_QUEUE:
      dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
      taosWriteQitem(pVnode->pSyncW.queue, pMsg);
      break;
    case SYNC_RD_QUEUE:
      dGTrace("vgId:%d, msg:%p put into vnode-sync-rd queue", pVnode->vgId, pMsg);
      taosWriteQitem(pVnode->pSyncRdW.queue, pMsg);
      break;
    case APPLY_QUEUE:
      dGTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg);
      taosWriteQitem(pVnode->pApplyW.queue, pMsg);
      break;
    default:
      code = -1;
      terrno = TSDB_CODE_INVALID_PARA;
      break;
  }

  vmReleaseVnode(pMgmt, pVnode);
  return code;
}

250
// Queue pMsg on the target vnode's sync-rd queue; see vmPutMsgToQueue for the return value.
int32_t vmPutMsgToSyncRdQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return vmPutMsgToQueue(pMgmt, pMsg, SYNC_RD_QUEUE);
}
S
shm  
Shengliang Guan 已提交
251

S
Shengliang Guan 已提交
252
// Queue pMsg on the target vnode's sync queue; see vmPutMsgToQueue for the return value.
int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return vmPutMsgToQueue(pMgmt, pMsg, SYNC_QUEUE);
}
253

S
Shengliang Guan 已提交
254
// Queue pMsg on the target vnode's write queue; see vmPutMsgToQueue for the return value.
int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return vmPutMsgToQueue(pMgmt, pMsg, WRITE_QUEUE);
}
S
shm  
Shengliang Guan 已提交
255

S
Shengliang Guan 已提交
256
// Queue pMsg on the target vnode's query queue; see vmPutMsgToQueue for the return value.
int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return vmPutMsgToQueue(pMgmt, pMsg, QUERY_QUEUE);
}
S
shm  
Shengliang Guan 已提交
257

S
Shengliang Guan 已提交
258
// Queue pMsg on the target vnode's fetch queue; see vmPutMsgToQueue for the return value.
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return vmPutMsgToQueue(pMgmt, pMsg, FETCH_QUEUE);
}
S
shm  
Shengliang Guan 已提交
259

S
Shengliang Guan 已提交
260 261
// Queue pMsg on the target vnode's stream queue; see vmPutMsgToQueue for the return value.
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return vmPutMsgToQueue(pMgmt, pMsg, STREAM_QUEUE);
}

S
Shengliang Guan 已提交
262
/*
 * Write pMsg to the vnode-mgmt worker queue.
 * Unlike the per-vnode queues, no message head is inspected and no vnode
 * lookup is done. Always returns 0.
 */
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;  // not read directly here; presumably consumed by the dG* log macros

  dGTrace("msg:%p, put into vnode-mgmt queue", pMsg);
  taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);

  return 0;
}

S
Shengliang Guan 已提交
269
/*
 * Wrap pRpc into a freshly allocated queue item and route it to a vnode queue.
 *
 * Ownership of pRpc->pCont is always taken from the caller: pRpc->pCont is
 * set to NULL on every path. The content is either handed to the new queue
 * item or freed here on failure.
 *
 * The message head is converted with htonl before queuing because
 * vmPutMsgToQueue applies ntohl to the same fields.
 *
 * Returns 0 on success, or a non-zero code when the message was rejected,
 * in which case the queue item and content have been freed.
 */
int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  // Compare as signed: against the unsigned sizeof result a negative contLen
  // would be converted to a huge value and pass the minimum-length check.
  if (pRpc->contLen < (int32_t)sizeof(SMsgHead)) {
    dError("invalid rpc msg with no msg head at pCont. pRpc:%p, type:%s, len:%d", pRpc, TMSG_INFO(pRpc->msgType),
           pRpc->contLen);
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
    return -1;
  }

  SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
  if (pMsg == NULL) {
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
    return -1;
  }

  SMsgHead *pHead = pRpc->pCont;
  dTrace("vgId:%d, msg:%p is created, type:%s len:%d", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType), pRpc->contLen);

  // vmPutMsgToQueue converts these fields with ntohl, so pre-convert them here
  pHead->contLen = htonl(pHead->contLen);
  pHead->vgId = htonl(pHead->vgId);
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
  // the content now belongs to pMsg
  pRpc->pCont = NULL;

  int32_t code = vmPutMsgToQueue(pMgmt, pMsg, qtype);
  if (code != 0) {
    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  return code;
}

S
Shengliang 已提交
303
/*
 * Return the number of items currently in one queue of the vnode vgId.
 *
 * Returns 0 (after a trace log) when the vnode cannot be acquired or qtype
 * does not name a per-vnode queue; never returns a negative value.
 */
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
  int32_t    size = -1;
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
  if (pVnode != NULL) {
    switch (qtype) {
      case WRITE_QUEUE:
        size = taosQueueItemSize(pVnode->pWriteW.queue);
        break;
      case SYNC_QUEUE:
        size = taosQueueItemSize(pVnode->pSyncW.queue);
        break;
      case SYNC_RD_QUEUE:
        // the sync-rd queue is a real per-vnode queue (see vmPutMsgToQueue),
        // so report its size rather than falling through to 0
        size = taosQueueItemSize(pVnode->pSyncRdW.queue);
        break;
      case APPLY_QUEUE:
        size = taosQueueItemSize(pVnode->pApplyW.queue);
        break;
      case QUERY_QUEUE:
        size = taosQueueItemSize(pVnode->pQueryQ);
        break;
      case FETCH_QUEUE:
        size = taosQueueItemSize(pVnode->pFetchQ);
        break;
      case STREAM_QUEUE:
        size = taosQueueItemSize(pVnode->pStreamQ);
        break;
      default:
        break;
    }
    vmReleaseVnode(pMgmt, pVnode);
  }
  if (size < 0) {
    dTrace("vgId:%d, can't get size from queue since %s, qtype:%d", vgId, terrstr(), qtype);
    size = 0;
  }
  return size;
}

S
Shengliang 已提交
338
/*
 * Create every queue of pVnode: four dedicated multi-workers (write, sync,
 * sync-rd, apply) plus one queue from each shared pool (query, stream, fetch).
 *
 * Returns 0 on success. If any queue pointer is NULL afterwards, sets terrno
 * to TSDB_CODE_OUT_OF_MEMORY and returns -1. Queues that were already created
 * are not released here.
 */
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  SMultiWorkerCfg writeCfg = {
      .max = 1, .name = "vnode-write", .fp = (FItems)vnodeProposeWriteMsg, .param = pVnode->pImpl};
  SMultiWorkerCfg syncCfg = {.max = 1, .name = "vnode-sync", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
  // the sync-rd worker runs the same callback as the sync worker
  SMultiWorkerCfg syncRdCfg = {.max = 1, .name = "vnode-sync-rd", .fp = (FItems)vmProcessSyncQueue, .param = pVnode};
  SMultiWorkerCfg applyCfg = {
      .max = 1, .name = "vnode-apply", .fp = (FItems)vnodeApplyWriteMsg, .param = pVnode->pImpl};

  // return values are ignored on purpose; failure is detected via the NULL check below
  (void)tMultiWorkerInit(&pVnode->pWriteW, &writeCfg);
  (void)tMultiWorkerInit(&pVnode->pSyncW, &syncCfg);
  (void)tMultiWorkerInit(&pVnode->pSyncRdW, &syncRdCfg);
  (void)tMultiWorkerInit(&pVnode->pApplyW, &applyCfg);

  pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
  pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
  pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);

  const int allQueuesReady = pVnode->pWriteW.queue != NULL && pVnode->pSyncW.queue != NULL &&
                             pVnode->pSyncRdW.queue != NULL && pVnode->pApplyW.queue != NULL &&
                             pVnode->pQueryQ != NULL && pVnode->pStreamQ != NULL && pVnode->pFetchQ != NULL;
  if (!allQueuesReady) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  dInfo("vgId:%d, write-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pWriteW.queue,
        pVnode->pWriteW.queue->threadId);
  dInfo("vgId:%d, sync-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncW.queue,
        pVnode->pSyncW.queue->threadId);
  dInfo("vgId:%d, sync-rd-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pSyncRdW.queue,
        pVnode->pSyncRdW.queue->threadId);
  dInfo("vgId:%d, apply-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pApplyW.queue,
        pVnode->pApplyW.queue->threadId);
  dInfo("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ);
  dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
        pVnode->pFetchQ->threadId);
  dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ);

  return 0;
}

S
Shengliang 已提交
373
/*
 * Return pVnode's pool-backed queues (query, stream, fetch) to their pools
 * and clear the pointers. The multi-worker queues (write, sync, sync-rd,
 * apply) are not touched here.
 */
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
  tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
  tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);

  // drop the now-stale pointers
  pVnode->pFetchQ = NULL;
  pVnode->pStreamQ = NULL;
  pVnode->pQueryQ = NULL;

  dDebug("vgId:%d, queue is freed", pVnode->vgId);
}

S
Shengliang 已提交
383
/*
 * Configure and start the shared worker pools (query, stream, fetch) and the
 * single vnode-mgmt worker, in that order.
 *
 * Returns 0 when all four were initialized, or -1 as soon as one fails.
 * Pools started before the failure are not cleaned up here.
 */
int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
  // query pool: fixed size, min == max
  pMgmt->queryPool.name = "vnode-query";
  pMgmt->queryPool.min = tsNumOfVnodeQueryThreads;
  pMgmt->queryPool.max = tsNumOfVnodeQueryThreads;
  if (tQWorkerInit(&pMgmt->queryPool) != 0) {
    return -1;
  }

  // stream pool: configured through a ratio instead of fixed min/max
  pMgmt->streamPool.name = "vnode-stream";
  pMgmt->streamPool.ratio = tsRatioOfVnodeStreamThreads;
  if (tAutoQWorkerInit(&pMgmt->streamPool) != 0) {
    return -1;
  }

  // fetch pool
  pMgmt->fetchPool.name = "vnode-fetch";
  pMgmt->fetchPool.max = tsNumOfVnodeFetchThreads;
  if (tWWorkerInit(&pMgmt->fetchPool) != 0) {
    return -1;
  }

  // single worker that runs vmProcessMgmtQueue
  SSingleWorkerCfg mgmtWorkerCfg = {
      .name = "vnode-mgmt",
      .min = 1,
      .max = 1,
      .param = pMgmt,
      .fp = (FItem)vmProcessMgmtQueue,
  };
  if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtWorkerCfg) != 0) {
    return -1;
  }

  dDebug("vnode workers are initialized");
  return 0;
}

S
Shengliang 已提交
413
/*
 * Shut down the shared query, stream and fetch worker pools.
 * NOTE(review): the vnode-mgmt single worker started in vmStartWorker is not
 * cleaned up here -- presumably handled elsewhere; confirm.
 */
void vmStopWorker(SVnodeMgmt *pMgmt) {
  SQWorkerPool     *pQueryPool = &pMgmt->queryPool;
  SAutoQWorkerPool *pStreamPool = &pMgmt->streamPool;
  SWWorkerPool     *pFetchPool = &pMgmt->fetchPool;

  tQWorkerCleanup(pQueryPool);
  tAutoQWorkerCleanup(pStreamPool);
  tWWorkerCleanup(pFetchPool);

  dDebug("vnode workers are closed");
}