vmWorker.c 14.8 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19
static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
S
Shengliang Guan 已提交
20
  if (pMsg->info.handle == NULL) return;
S
Shengliang Guan 已提交
21 22
  SRpcMsg rsp = {
      .code = code,
S
Shengliang Guan 已提交
23 24
      .pCont = pMsg->info.rsp,
      .contLen = pMsg->info.rspLen,
S
Shengliang Guan 已提交
25
      .info = pMsg->info,
S
Shengliang Guan 已提交
26
  };
S
Shengliang Guan 已提交
27
  tmsgSendRsp(&rsp);
S
Shengliang Guan 已提交
28 29
}

S
Shengliang Guan 已提交
30
static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
31 32 33
  SVnodeMgmt     *pMgmt = pInfo->ahandle;
  int32_t         code = -1;
  const STraceId *trace = &pMsg->info.traceId;
S
Shengliang Guan 已提交
34

dengyihao's avatar
dengyihao 已提交
35
  dGTrace("msg:%p, get from vnode-mgmt queue", pMsg);
S
Shengliang Guan 已提交
36
  switch (pMsg->msgType) {
S
Shengliang Guan 已提交
37 38 39 40 41 42
    case TDMT_DND_CREATE_VNODE:
      code = vmProcessCreateVnodeReq(pMgmt, pMsg);
      break;
    case TDMT_DND_DROP_VNODE:
      code = vmProcessDropVnodeReq(pMgmt, pMsg);
      break;
S
Shengliang Guan 已提交
43 44 45
    case TDMT_VND_ALTER_REPLICA:
      code = vmProcessAlterVnodeReq(pMgmt, pMsg);
      break;
S
Shengliang Guan 已提交
46 47
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
S
Shengliang Guan 已提交
48
      dGError("msg:%p, not processed in vnode-mgmt queue", pMsg);
S
Shengliang Guan 已提交
49 50
  }

S
Shengliang Guan 已提交
51
  if (IsReq(pMsg)) {
52 53
    if (code != 0) {
      if (terrno != 0) code = terrno;
C
Cary Xu 已提交
54
      dGError("msg:%p, failed to process since %s", pMsg, terrstr(code));
S
Shengliang Guan 已提交
55
    }
S
Shengliang Guan 已提交
56
    vmSendRsp(pMsg, code);
S
Shengliang Guan 已提交
57 58
  }

S
Shengliang Guan 已提交
59
  dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
S
Shengliang Guan 已提交
60
  rpcFreeCont(pMsg->pCont);
S
Shengliang Guan 已提交
61 62 63
  taosFreeQitem(pMsg);
}

S
Shengliang Guan 已提交
64
static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
65 66
  SVnodeObj      *pVnode = pInfo->ahandle;
  const STraceId *trace = &pMsg->info.traceId;
S
Shengliang Guan 已提交
67

S
Shengliang Guan 已提交
68
  dGTrace("vgId:%d, msg:%p get from vnode-query queue", pVnode->vgId, pMsg);
S
Shengliang Guan 已提交
69
  int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg);
S
Shengliang Guan 已提交
70
  if (code != 0) {
S
Shengliang Guan 已提交
71
    if (terrno != 0) code = terrno;
C
Cary Xu 已提交
72
    dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, terrstr(code));
S
Shengliang Guan 已提交
73
    vmSendRsp(pMsg, code);
S
Shengliang Guan 已提交
74
  }
S
Shengliang Guan 已提交
75

S
Shengliang Guan 已提交
76
  dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
dengyihao's avatar
dengyihao 已提交
77 78
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
S
shm  
Shengliang Guan 已提交
79
}
S
shm  
Shengliang Guan 已提交
80

S
Shengliang Guan 已提交
81
static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
82 83
  SVnodeObj      *pVnode = pInfo->ahandle;
  const STraceId *trace = &pMsg->info.traceId;
S
Shengliang Guan 已提交
84

S
Shengliang Guan 已提交
85
  dGTrace("vgId:%d, msg:%p get from vnode-stream queue", pVnode->vgId, pMsg);
S
Shengliang Guan 已提交
86
  int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
S
Shengliang Guan 已提交
87
  if (code != 0) {
S
Shengliang Guan 已提交
88
    if (terrno != 0) code = terrno;
C
Cary Xu 已提交
89
    dGError("vgId:%d, msg:%p failed to process stream since %s", pVnode->vgId, pMsg, terrstr(code));
S
Shengliang Guan 已提交
90
    vmSendRsp(pMsg, code);
S
Shengliang Guan 已提交
91
  }
S
Shengliang Guan 已提交
92

S
Shengliang Guan 已提交
93
  dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
dengyihao's avatar
dengyihao 已提交
94 95
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
S
Shengliang Guan 已提交
96 97
}

98 99 100
static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;
  SRpcMsg   *pMsg = NULL;
S
Shengliang Guan 已提交
101

102 103 104 105
  for (int32_t i = 0; i < numOfMsgs; ++i) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    const STraceId *trace = &pMsg->info.traceId;
    dGTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg);
S
Shengliang Guan 已提交
106

107 108 109
    int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
    if (code != 0) {
      if (terrno != 0) code = terrno;
C
Cary Xu 已提交
110
      dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr(code));
111 112 113 114 115 116 117
      vmSendRsp(pMsg, code);
    }

    dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
S
Shengliang Guan 已提交
118 119
}

S
Shengliang Guan 已提交
120 121
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;
L
Liu Jicong 已提交
122
  SRpcMsg   *pMsg = NULL;
S
shm  
Shengliang Guan 已提交
123 124

  for (int32_t i = 0; i < numOfMsgs; ++i) {
S
Shengliang Guan 已提交
125
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
S
Shengliang Guan 已提交
126 127
    const STraceId *trace = &pMsg->info.traceId;
    dGTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg);
S
shm  
Shengliang Guan 已提交
128

129
    int32_t code = vnodeProcessSyncMsg(pVnode->pImpl, pMsg, NULL);  // no response here
S
Shengliang Guan 已提交
130
    dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
S
Shengliang Guan 已提交
131
    rpcFreeCont(pMsg->pCont);
M
Minghao Li 已提交
132
    taosFreeQitem(pMsg);
S
shm  
Shengliang Guan 已提交
133 134 135
  }
}

136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
static void vmProcessSyncCtrlQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;
  SRpcMsg   *pMsg = NULL;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
    const STraceId *trace = &pMsg->info.traceId;
    dGTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg);

    int32_t code = vnodeProcessSyncCtrlMsg(pVnode->pImpl, pMsg, NULL);  // no response here
    dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

S
Shengliang Guan 已提交
152
static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
S
Shengliang Guan 已提交
153 154 155
  const STraceId *trace = &pMsg->info.traceId;
  SMsgHead       *pHead = pMsg->pCont;
  int32_t         code = 0;
S
Shengliang Guan 已提交
156

D
dapan1121 已提交
157 158
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);
S
shm  
Shengliang Guan 已提交
159 160 161

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) {
162 163
    dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d", pHead->vgId, pMsg, terrstr(),
            TMSG_INFO(pMsg->msgType), qtype);
S
Shengliang Guan 已提交
164
    return terrno != 0 ? terrno : -1;
S
shm  
Shengliang Guan 已提交
165 166
  }

S
Shengliang Guan 已提交
167
  switch (qtype) {
S
Shengliang Guan 已提交
168
    case QUERY_QUEUE:
C
Cary Xu 已提交
169 170 171
      if ((pMsg->msgType == TDMT_SCH_QUERY) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) {
        terrno = TSDB_CODE_GRANT_EXPIRED;
        code = terrno;
C
Cary Xu 已提交
172
        dDebug("vgId:%d, msg:%p put into vnode-query queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
C
Cary Xu 已提交
173 174 175 176 177
      } else {
        vnodePreprocessQueryMsg(pVnode->pImpl, pMsg);
        dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg);
        taosWriteQitem(pVnode->pQueryQ, pMsg);
      }
S
Shengliang Guan 已提交
178
      break;
S
Shengliang Guan 已提交
179 180
    case STREAM_QUEUE:
      dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg);
L
Liu Jicong 已提交
181 182 183 184 185
      if (pMsg->msgType == TDMT_STREAM_TASK_DISPATCH) {
        vnodeEnqueueStreamMsg(pVnode->pImpl, pMsg);
      } else {
        taosWriteQitem(pVnode->pStreamQ, pMsg);
      }
S
Shengliang Guan 已提交
186
      break;
S
Shengliang Guan 已提交
187
    case FETCH_QUEUE:
S
Shengliang Guan 已提交
188
      dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);
S
Shengliang Guan 已提交
189
      taosWriteQitem(pVnode->pFetchQ, pMsg);
S
Shengliang Guan 已提交
190
      break;
S
Shengliang Guan 已提交
191
    case WRITE_QUEUE:
wafwerar's avatar
wafwerar 已提交
192 193 194
      if (!osDataSpaceAvailable()) {
        terrno = TSDB_CODE_VND_NO_DISKSPACE;
        code = terrno;
C
Cary Xu 已提交
195
        dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
wafwerar's avatar
wafwerar 已提交
196
      } else if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) {
C
Cary Xu 已提交
197 198
        terrno = TSDB_CODE_VND_NO_WRITE_AUTH;
        code = terrno;
C
Cary Xu 已提交
199
        dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
C
Cary Xu 已提交
200 201 202
      } else {
        dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
        taosWriteQitem(pVnode->pWriteQ, pMsg);
203 204 205 206 207 208 209 210 211 212
#if 0  // tests for batch writes
        if (pMsg->msgType == TDMT_VND_CREATE_TABLE) {
          SRpcMsg *pDup = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
          memcpy(pDup, pMsg, sizeof(SRpcMsg));
          pDup->pCont = rpcMallocCont(pMsg->contLen);
          memcpy(pDup->pCont, pMsg->pCont, pMsg->contLen);
          pDup->info.handle = NULL;
          taosWriteQitem(pVnode->pWriteQ, pDup);
        }
#endif
C
Cary Xu 已提交
213
      }
S
Shengliang Guan 已提交
214 215
      break;
    case SYNC_QUEUE:
S
Shengliang Guan 已提交
216
      dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
S
Shengliang Guan 已提交
217
      taosWriteQitem(pVnode->pSyncQ, pMsg);
S
Shengliang Guan 已提交
218
      break;
219 220 221 222
    case SYNC_CTRL_QUEUE:
      dGTrace("vgId:%d, msg:%p put into vnode-sync-ctrl queue", pVnode->vgId, pMsg);
      taosWriteQitem(pVnode->pSyncCtrlQ, pMsg);
      break;
S
Shengliang Guan 已提交
223
    case APPLY_QUEUE:
S
Shengliang Guan 已提交
224
      dGTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg);
S
Shengliang Guan 已提交
225 226
      taosWriteQitem(pVnode->pApplyQ, pMsg);
      break;
S
Shengliang Guan 已提交
227
    default:
S
Shengliang Guan 已提交
228
      code = -1;
S
Shengliang Guan 已提交
229 230 231
      terrno = TSDB_CODE_INVALID_PARA;
      break;
  }
S
shm  
Shengliang Guan 已提交
232 233 234

  vmReleaseVnode(pMgmt, pVnode);
  return code;
S
shm  
Shengliang Guan 已提交
235 236
}

237
int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_CTRL_QUEUE); }
S
shm  
Shengliang Guan 已提交
238

S
Shengliang Guan 已提交
239
int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); }
240

S
Shengliang Guan 已提交
241
int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); }
S
shm  
Shengliang Guan 已提交
242

S
Shengliang Guan 已提交
243
int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); }
S
shm  
Shengliang Guan 已提交
244

S
Shengliang Guan 已提交
245
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); }
S
shm  
Shengliang Guan 已提交
246

S
Shengliang Guan 已提交
247 248
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_QUEUE); }

S
Shengliang Guan 已提交
249
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
250 251
  const STraceId *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, put into vnode-mgmt queue", pMsg);
S
Shengliang Guan 已提交
252
  taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);
S
Shengliang Guan 已提交
253
  return 0;
S
shm  
Shengliang Guan 已提交
254 255
}

S
Shengliang Guan 已提交
256
int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
S
Shengliang Guan 已提交
257
  SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
S
Shengliang Guan 已提交
258
  if (pMsg == NULL) {
dengyihao's avatar
dengyihao 已提交
259
    rpcFreeCont(pRpc->pCont);
S
Shengliang Guan 已提交
260 261 262
    pRpc->pCont = NULL;
    return -1;
  }
S
Shengliang Guan 已提交
263

S
Shengliang Guan 已提交
264
  SMsgHead *pHead = pRpc->pCont;
265
  dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType));
S
Shengliang Guan 已提交
266

S
Shengliang Guan 已提交
267 268 269
  pHead->contLen = htonl(pHead->contLen);
  pHead->vgId = htonl(pHead->vgId);
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
S
Shengliang Guan 已提交
270 271 272 273 274 275

  int32_t code = vmPutMsgToQueue(pMgmt, pMsg, qtype);
  if (code != 0) {
    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->pCont);
    pRpc->pCont = NULL;
276
    taosFreeQitem(pMsg);
S
Shengliang Guan 已提交
277 278 279
  }

  return code;
S
shm  
Shengliang Guan 已提交
280 281
}

S
Shengliang 已提交
282
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
S
Shengliang Guan 已提交
283
  int32_t    size = -1;
S
Shengliang 已提交
284
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
S
Shengliang Guan 已提交
285 286 287
  if (pVnode != NULL) {
    switch (qtype) {
      case WRITE_QUEUE:
288
        size = taosQueueItemSize(pVnode->pWriteQ);
S
Shengliang Guan 已提交
289 290
        break;
      case SYNC_QUEUE:
291
        size = taosQueueItemSize(pVnode->pSyncQ);
S
Shengliang Guan 已提交
292 293
        break;
      case APPLY_QUEUE:
294
        size = taosQueueItemSize(pVnode->pApplyQ);
S
Shengliang Guan 已提交
295
        break;
S
Shengliang Guan 已提交
296
      case QUERY_QUEUE:
297
        size = taosQueueItemSize(pVnode->pQueryQ);
S
Shengliang Guan 已提交
298 299
        break;
      case FETCH_QUEUE:
300
        size = taosQueueItemSize(pVnode->pFetchQ);
S
Shengliang Guan 已提交
301
        break;
S
Shengliang Guan 已提交
302 303 304
      case STREAM_QUEUE:
        size = taosQueueItemSize(pVnode->pStreamQ);
        break;
S
Shengliang Guan 已提交
305 306 307
      default:
        break;
    }
308
    vmReleaseVnode(pMgmt, pVnode);
S
Shengliang Guan 已提交
309
  }
S
Shengliang Guan 已提交
310 311 312 313
  if (size < 0) {
    dError("vgId:%d, can't get size from queue since %s, qtype:%d", vgId, terrstr(), qtype);
    size = 0;
  }
S
Shengliang Guan 已提交
314 315 316
  return size;
}

S
Shengliang 已提交
317
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
318
  pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeWriteMsg);
S
shm  
Shengliang Guan 已提交
319
  pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
320
  pVnode->pSyncCtrlQ = tWWorkerAllocQueue(&pMgmt->syncCtrlPool, pVnode, (FItems)vmProcessSyncCtrlQueue);
321
  pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg);
S
shm  
Shengliang Guan 已提交
322
  pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
S
Shengliang Guan 已提交
323
  pVnode->pStreamQ = tQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
324
  pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
S
shm  
Shengliang Guan 已提交
325

S
Shengliang Guan 已提交
326
  if (pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pApplyQ == NULL || pVnode->pQueryQ == NULL ||
S
Shengliang Guan 已提交
327
      pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL) {
S
shm  
Shengliang Guan 已提交
328 329 330 331
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

332 333 334 335
  dDebug("vgId:%d, write-queue:%p is alloced", pVnode->vgId, pVnode->pWriteQ);
  dDebug("vgId:%d, sync-queue:%p is alloced", pVnode->vgId, pVnode->pSyncQ);
  dDebug("vgId:%d, apply-queue:%p is alloced", pVnode->vgId, pVnode->pApplyQ);
  dDebug("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ);
S
Shengliang Guan 已提交
336
  dDebug("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ);
337
  dDebug("vgId:%d, fetch-queue:%p is alloced", pVnode->vgId, pVnode->pFetchQ);
S
shm  
Shengliang Guan 已提交
338 339 340
  return 0;
}

S
Shengliang 已提交
341
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
S
shm  
Shengliang Guan 已提交
342
  tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
S
Shengliang Guan 已提交
343
  tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ);
344
  tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
345
  tWWorkerFreeQueue(&pMgmt->syncCtrlPool, pVnode->pSyncCtrlQ);
S
Shengliang Guan 已提交
346
  tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
S
Shengliang Guan 已提交
347
  tQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
348
  tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
S
shm  
Shengliang Guan 已提交
349 350
  pVnode->pWriteQ = NULL;
  pVnode->pSyncQ = NULL;
S
Shengliang Guan 已提交
351
  pVnode->pApplyQ = NULL;
S
shm  
Shengliang Guan 已提交
352
  pVnode->pQueryQ = NULL;
S
Shengliang Guan 已提交
353
  pVnode->pStreamQ = NULL;
S
Shengliang Guan 已提交
354
  pVnode->pFetchQ = NULL;
S
Shengliang Guan 已提交
355
  dDebug("vgId:%d, queue is freed", pVnode->vgId);
S
shm  
Shengliang Guan 已提交
356 357
}

S
Shengliang 已提交
358
int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
S
shm  
Shengliang Guan 已提交
359 360
  SQWorkerPool *pQPool = &pMgmt->queryPool;
  pQPool->name = "vnode-query";
S
Shengliang Guan 已提交
361 362
  pQPool->min = tsNumOfVnodeQueryThreads;
  pQPool->max = tsNumOfVnodeQueryThreads;
S
shm  
Shengliang Guan 已提交
363 364
  if (tQWorkerInit(pQPool) != 0) return -1;

S
Shengliang Guan 已提交
365 366 367 368 369 370
  SQWorkerPool *pStreamPool = &pMgmt->streamPool;
  pStreamPool->name = "vnode-stream";
  pStreamPool->min = tsNumOfVnodeStreamThreads;
  pStreamPool->max = tsNumOfVnodeStreamThreads;
  if (tQWorkerInit(pStreamPool) != 0) return -1;

371
  SWWorkerPool *pFPool = &pMgmt->fetchPool;
S
shm  
Shengliang Guan 已提交
372
  pFPool->name = "vnode-fetch";
S
Shengliang Guan 已提交
373
  pFPool->max = tsNumOfVnodeFetchThreads;
374
  if (tWWorkerInit(pFPool) != 0) return -1;
S
shm  
Shengliang Guan 已提交
375 376 377

  SWWorkerPool *pWPool = &pMgmt->writePool;
  pWPool->name = "vnode-write";
S
Shengliang Guan 已提交
378
  pWPool->max = tsNumOfVnodeWriteThreads;
S
shm  
Shengliang Guan 已提交
379 380
  if (tWWorkerInit(pWPool) != 0) return -1;

S
Shengliang Guan 已提交
381 382 383 384 385
  SWWorkerPool *pAPool = &pMgmt->applyPool;
  pAPool->name = "vnode-apply";
  pAPool->max = tsNumOfVnodeWriteThreads;
  if (tWWorkerInit(pAPool) != 0) return -1;

S
Shengliang Guan 已提交
386 387 388 389 390
  SWWorkerPool *pSPool = &pMgmt->syncPool;
  pSPool->name = "vnode-sync";
  pSPool->max = tsNumOfVnodeSyncThreads;
  if (tWWorkerInit(pSPool) != 0) return -1;

391 392 393 394 395
  SWWorkerPool *pSCPool = &pMgmt->syncCtrlPool;
  pSCPool->name = "vnode-sync-ctrl";
  pSCPool->max = tsNumOfVnodeSyncThreads;
  if (tWWorkerInit(pSCPool) != 0) return -1;

S
Shengliang Guan 已提交
396
  SSingleWorkerCfg mgmtCfg = {
S
Shengliang Guan 已提交
397 398 399
      .min = 1,
      .max = 1,
      .name = "vnode-mgmt",
S
Shengliang Guan 已提交
400
      .fp = (FItem)vmProcessMgmtQueue,
S
Shengliang Guan 已提交
401 402
      .param = pMgmt,
  };
S
Shengliang Guan 已提交
403
  if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) return -1;
S
shm  
Shengliang Guan 已提交
404

S
Shengliang Guan 已提交
405
  dDebug("vnode workers are initialized");
S
shm  
Shengliang Guan 已提交
406 407 408
  return 0;
}

S
Shengliang 已提交
409
void vmStopWorker(SVnodeMgmt *pMgmt) {
S
shm  
Shengliang Guan 已提交
410
  tWWorkerCleanup(&pMgmt->writePool);
S
Shengliang Guan 已提交
411
  tWWorkerCleanup(&pMgmt->applyPool);
S
shm  
Shengliang Guan 已提交
412
  tWWorkerCleanup(&pMgmt->syncPool);
413
  tWWorkerCleanup(&pMgmt->syncCtrlPool);
S
Shengliang Guan 已提交
414
  tQWorkerCleanup(&pMgmt->queryPool);
S
Shengliang Guan 已提交
415
  tQWorkerCleanup(&pMgmt->streamPool);
416
  tWWorkerCleanup(&pMgmt->fetchPool);
S
Shengliang Guan 已提交
417
  dDebug("vnode workers are closed");
S
shm  
Shengliang Guan 已提交
418
}