vmWorker.c 16.5 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

19 20
#include "sync.h"
#include "syncTools.h"
S
shm  
Shengliang Guan 已提交
21

S
Shengliang Guan 已提交
22 23 24 25 26 27 28 29 30
// Send the response stored in a node message back through the RPC layer.
//
// The connection identifiers (handle, ahandle, refId) are copied from the
// request, the payload comes from pMsg->pRsp / pMsg->rspLen, and `code` is
// used as the response status.
static inline void vmSendRsp(SNodeMsg *pMsg, int32_t code) {
  SRpcMsg reply = {0};

  reply.handle = pMsg->rpcMsg.handle;
  reply.ahandle = pMsg->rpcMsg.ahandle;
  reply.refId = pMsg->rpcMsg.refId;
  reply.code = code;
  reply.pCont = pMsg->pRsp;
  reply.contLen = pMsg->rspLen;

  tmsgSendRsp(&reply);
}

S
Shengliang Guan 已提交
34
// Worker callback for management/monitor messages.
//
// Dispatches on the RPC message type, sends a response when the type has its
// lowest bit set, and finally frees the request content and the queue item.
static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SVnodeMgmt  *pMgmt = pInfo->ahandle;
  const tmsg_t type = pMsg->rpcMsg.msgType;
  int32_t      result = -1;

  dTrace("msg:%p, will be processed in vnode-mgmt/monitor queue", pMsg);

  if (type == TDMT_MON_VM_INFO) {
    result = vmProcessGetMonitorInfoReq(pMgmt, pMsg);
  } else if (type == TDMT_MON_VM_LOAD) {
    result = vmProcessGetLoadsReq(pMgmt, pMsg);
  } else if (type == TDMT_DND_CREATE_VNODE) {
    result = vmProcessCreateVnodeReq(pMgmt, pMsg);
  } else if (type == TDMT_DND_DROP_VNODE) {
    result = vmProcessDropVnodeReq(pMgmt, pMsg);
  } else {
    // Unknown type: result stays -1 and terrno records the reason.
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    dError("msg:%p, not processed in vnode-mgmt/monitor queue", pMsg);
  }

  if ((type & 1u) != 0) {
    // Prefer the specific terrno over the handler's generic failure value.
    if (result != 0 && terrno != 0) {
      result = terrno;
    }
    vmSendRsp(pMsg, result);
  }

  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, result & 0XFFFF, tstrerror(result));
  rpcFreeCont(pMsg->rpcMsg.pCont);
  taosFreeQitem(pMsg);
}

S
Shengliang Guan 已提交
69 70 71
// Worker callback for the vnode query queue.
//
// Hands the RPC message to vnodeProcessQueryMsg. Only when that call fails
// does this function answer the request and free the message; on success
// nothing is sent or freed here.
static void vmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SVnodeObj *pVnode = pInfo->ahandle;

  dTrace("msg:%p, will be processed in vnode-query queue", pMsg);

  int32_t result = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
  if (result == 0) {
    return;
  }

  // Report the specific terrno when one is set.
  if (terrno != 0) {
    result = terrno;
  }
  vmSendRsp(pMsg, result);

  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, result & 0XFFFF, tstrerror(result));
  rpcFreeCont(pMsg->rpcMsg.pCont);
  taosFreeQitem(pMsg);
}
S
shm  
Shengliang Guan 已提交
83

S
Shengliang Guan 已提交
84 85 86
// Worker callback for the vnode fetch queue.
//
// Hands the RPC message (together with the queue info) to
// vnodeProcessFetchMsg. Only when that call fails does this function answer
// the request and free the message; on success nothing is sent or freed here.
static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SVnodeObj *pVnode = pInfo->ahandle;

  dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg);

  int32_t result = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo);
  if (result == 0) {
    return;
  }

  // Report the specific terrno when one is set.
  if (terrno != 0) {
    result = terrno;
  }
  vmSendRsp(pMsg, result);

  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, result & 0XFFFF, tstrerror(result));
  rpcFreeCont(pMsg->rpcMsg.pCont);
  taosFreeQitem(pMsg);
}

S
Shengliang Guan 已提交
99 100
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;
H
Hongze Cheng 已提交
101
  SRpcMsg    rsp;
S
Shengliang Guan 已提交
102

S
shm  
Shengliang Guan 已提交
103
  SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
S
Shengliang Guan 已提交
104 105 106 107
  if (pArray == NULL) {
    dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
    return;
  }
S
shm  
Shengliang Guan 已提交
108 109

  for (int32_t i = 0; i < numOfMsgs; ++i) {
S
shm  
Shengliang Guan 已提交
110
    SNodeMsg *pMsg = NULL;
S
Shengliang Guan 已提交
111 112 113 114 115
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    dTrace("msg:%p, will be processed in vnode-write queue", pMsg);
    if (taosArrayPush(pArray, &pMsg) == NULL) {
      dTrace("msg:%p, failed to process since %s", pMsg, terrstr());
S
Shengliang Guan 已提交
116
      vmSendRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY);
S
Shengliang Guan 已提交
117
    }
S
shm  
Shengliang Guan 已提交
118 119
  }

M
Minghao Li 已提交
120 121
  for (int i = 0; i < taosArrayGetSize(pArray); i++) {
    SNodeMsg *pMsg;
M
Minghao Li 已提交
122
    SRpcMsg  *pRpc;
H
Hongze Cheng 已提交
123

M
Minghao Li 已提交
124 125 126
    pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
    pRpc = &pMsg->rpcMsg;

H
Hongze Cheng 已提交
127 128
    rsp.ahandle = pRpc->ahandle;
    rsp.handle = pRpc->handle;
dengyihao's avatar
dengyihao 已提交
129
    rsp.refId = pRpc->refId;
H
Hongze Cheng 已提交
130 131
    rsp.pCont = NULL;
    rsp.contLen = 0;
M
Minghao Li 已提交
132

H
Hongze Cheng 已提交
133
    int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pRpc, false);
M
Minghao Li 已提交
134
    if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
M
Minghao Li 已提交
135 136
      // rsp.code = TSDB_CODE_SYN_NOT_LEADER;
      // tmsgSendRsp(&rsp);
M
Minghao Li 已提交
137
      dTrace("syncPropose not leader redirect, vgId:%d ", syncGetVgId(vnodeGetSyncHandle(pVnode->pImpl)));
M
Minghao Li 已提交
138 139 140
      rsp.code = TSDB_CODE_RPC_REDIRECT;
      SEpSet newEpSet;
      syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet);
M
Minghao Li 已提交
141
      newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
M
Minghao Li 已提交
142
      tmsgSendRedirectRsp(&rsp, &newEpSet);
M
Minghao Li 已提交
143

M
Minghao Li 已提交
144
    } else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
M
Minghao Li 已提交
145
      rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR;
H
Hongze Cheng 已提交
146
      tmsgSendRsp(&rsp);
M
Minghao Li 已提交
147 148 149
    } else if (ret == TAOS_SYNC_PROPOSE_SUCCESS) {
      // ok
      // send response in applyQ
S
shm  
Shengliang Guan 已提交
150
    } else {
M
Minghao Li 已提交
151
      assert(0);
S
shm  
Shengliang Guan 已提交
152 153 154
    }
  }

S
Shengliang Guan 已提交
155
  for (int32_t i = 0; i < numOfMsgs; i++) {
S
shm  
Shengliang Guan 已提交
156 157 158 159
    SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->rpcMsg.pCont);
    taosFreeQitem(pMsg);
S
shm  
Shengliang Guan 已提交
160 161 162 163 164
  }

  taosArrayDestroy(pArray);
}

S
Shengliang Guan 已提交
165 166
// Worker callback for the vnode apply queue.
//
// For each queued sync-apply message: recover the original RPC request,
// apply it through vnodeProcessWriteReq, and, when the request carries an
// RPC handle, send the result back. The message is always freed afterwards.
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    // Fetch into a fresh pointer and skip empty slots, so a message that was
    // already freed in a previous iteration can never be processed again.
    SNodeMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    // init response rpc msg; zeroed so no indeterminate field is handed to
    // vnodeProcessWriteReq or the RPC layer
    SRpcMsg rsp = {0};
    rsp.code = 0;
    rsp.pCont = NULL;
    rsp.contLen = 0;

    // get original rpc msg
    assert(pMsg->rpcMsg.msgType == TDMT_VND_SYNC_APPLY_MSG);
    SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(&pMsg->rpcMsg);
    syncApplyMsgLog2("==vmProcessApplyQueue==", pSyncApplyMsg);
    SRpcMsg originalRpcMsg = {0};
    syncApplyMsg2OriginalRpcMsg(pSyncApplyMsg, &originalRpcMsg);

    // apply data into tsdb
    if (vnodeProcessWriteReq(pVnode->pImpl, &originalRpcMsg, pSyncApplyMsg->fsmMeta.index, &rsp) < 0) {
      rsp.code = terrno;
      dTrace("vnodeProcessWriteReq error, code:%d", rsp.code);
    }

    syncApplyMsgDestroy(pSyncApplyMsg);
    rpcFreeCont(originalRpcMsg.pCont);

    // if leader, send response
    if (pMsg->rpcMsg.handle != NULL) {
      rsp.ahandle = pMsg->rpcMsg.ahandle;
      rsp.handle = pMsg->rpcMsg.handle;
      rsp.refId = pMsg->rpcMsg.refId;
      tmsgSendRsp(&rsp);
    }

    rpcFreeCont(pMsg->rpcMsg.pCont);
    taosFreeQitem(pMsg);
  }
}

S
Shengliang Guan 已提交
208 209
// Worker callback for the vnode sync queue.
//
// Passes each queued message to vnodeProcessSyncReq and then frees it. The
// return value and the response pointer of that call are currently ignored.
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    // Fetch into a fresh pointer and skip empty slots, so a message freed in
    // a previous iteration is never processed or freed a second time.
    SNodeMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    // todo
    SRpcMsg *pRsp = NULL;
    (void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);

    rpcFreeCont(pMsg->rpcMsg.pCont);
    taosFreeQitem(pMsg);
  }
}

L
Liu Jicong 已提交
224 225
// Worker callback for the vnode merge queue.
//
// Each queued message is handed to vnodeProcessFetchMsg. Only when that call
// fails does this function answer the request and free the message; on
// success nothing is sent or freed here.
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    // Fetch into a fresh pointer and skip empty slots, so a message freed in
    // a previous iteration is never processed or freed a second time.
    SNodeMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    dTrace("msg:%p, will be processed in vnode-merge queue", pMsg);
    int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo);
    if (code != 0) {
      // Report the specific terrno when one is set.
      if (terrno != 0) code = terrno;
      vmSendRsp(pMsg, code);

      dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
      rpcFreeCont(pMsg->rpcMsg.pCont);
      taosFreeQitem(pMsg);
    }
  }
}

S
Shengliang 已提交
244
// Route a node message to one of the queues of the vnode named in its header.
//
// The header's contLen and vgId are converted in place with ntohl() before
// the vnode lookup. Returns 0 when the message was queued, terrno (or -1 when
// terrno is 0) when the vnode cannot be acquired, and -1 with terrno set to
// TSDB_CODE_INVALID_PARA for a queue type this function does not handle.
static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) {
  SRpcMsg  *pReq = &pMsg->rpcMsg;
  SMsgHead *pHdr = pReq->pCont;

  pHdr->contLen = ntohl(pHdr->contLen);
  pHdr->vgId = ntohl(pHdr->vgId);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHdr->vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHdr->vgId, pMsg, terrstr());
    if (terrno != 0) {
      return terrno;
    }
    return -1;
  }

  int32_t result = 0;

  switch (qtype) {
    case QUERY_QUEUE: {
      dTrace("msg:%p, type:%s will be written into vnode-query queue", pMsg, TMSG_INFO(pReq->msgType));
      taosWriteQitem(pVnode->pQueryQ, pMsg);
    } break;

    case FETCH_QUEUE: {
      dTrace("msg:%p, type:%s will be written into vnode-fetch queue", pMsg, TMSG_INFO(pReq->msgType));
      taosWriteQitem(pVnode->pFetchQ, pMsg);
    } break;

    case WRITE_QUEUE: {
      dTrace("msg:%p, type:%s will be written into vnode-write queue", pMsg, TMSG_INFO(pReq->msgType));
      taosWriteQitem(pVnode->pWriteQ, pMsg);
    } break;

    case SYNC_QUEUE: {
      dTrace("msg:%p, type:%s will be written into vnode-sync queue", pMsg, TMSG_INFO(pReq->msgType));
      taosWriteQitem(pVnode->pSyncQ, pMsg);
    } break;

    case MERGE_QUEUE: {
      dTrace("msg:%p, type:%s will be written into vnode-merge queue", pMsg, TMSG_INFO(pReq->msgType));
      taosWriteQitem(pVnode->pMergeQ, pMsg);
    } break;

    default: {
      // Not a queue reachable through this path; the message is not queued.
      result = -1;
      terrno = TSDB_CODE_INVALID_PARA;
    } break;
  }

  vmReleaseVnode(pMgmt, pVnode);
  return result;
}

S
Shengliang 已提交
289
// Forward a node message to vmPutNodeMsgToQueue with SYNC_QUEUE as the target.
int32_t vmPutNodeMsgToSyncQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  const EQueueType target = SYNC_QUEUE;
  return vmPutNodeMsgToQueue(pMgmt, pMsg, target);
}
S
shm  
Shengliang Guan 已提交
292

S
Shengliang 已提交
293
// Forward a node message to vmPutNodeMsgToQueue with WRITE_QUEUE as the target.
int32_t vmPutNodeMsgToWriteQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  const EQueueType target = WRITE_QUEUE;
  return vmPutNodeMsgToQueue(pMgmt, pMsg, target);
}
S
shm  
Shengliang Guan 已提交
296

S
Shengliang 已提交
297
// Forward a node message to vmPutNodeMsgToQueue with QUERY_QUEUE as the target.
int32_t vmPutNodeMsgToQueryQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  const EQueueType target = QUERY_QUEUE;
  return vmPutNodeMsgToQueue(pMgmt, pMsg, target);
}
S
shm  
Shengliang Guan 已提交
300

S
Shengliang 已提交
301
// Forward a node message to vmPutNodeMsgToQueue with FETCH_QUEUE as the target.
int32_t vmPutNodeMsgToFetchQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  const EQueueType target = FETCH_QUEUE;
  return vmPutNodeMsgToQueue(pMgmt, pMsg, target);
}
S
shm  
Shengliang Guan 已提交
304

S
Shengliang 已提交
305
// Forward a node message to vmPutNodeMsgToQueue with MERGE_QUEUE as the target.
int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  const EQueueType target = MERGE_QUEUE;
  return vmPutNodeMsgToQueue(pMgmt, pMsg, target);
}
L
Liu Jicong 已提交
308

S
Shengliang 已提交
309
// Put a node message on the queue of the management worker (pMgmt->mgmtWorker).
// Always returns 0.
int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  dTrace("msg:%p, will be put into vnode-mgmt queue, worker:%s", pMsg, pMgmt->mgmtWorker.name);
  taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);

  return 0;
}

S
Shengliang 已提交
316
// Put a node message on the queue of the monitor worker
// (pMgmt->monitorWorker). Always returns 0.
int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  dTrace("msg:%p, will be put into vnode-monitor queue, worker:%s", pMsg, pMgmt->monitorWorker.name);
  taosWriteQitem(pMgmt->monitorWorker.queue, pMsg);

  return 0;
}

S
Shengliang 已提交
324 325
// Wrap a raw RPC message in a newly allocated queue item and put it on one of
// the queues of the vnode named in the message header.
//
// The SRpcMsg is copied by value into the queue item. Returns 0 when the item
// was queued and -1 otherwise: the vnode could not be acquired, the queue
// item could not be allocated (terrno = TSDB_CODE_OUT_OF_MEMORY), or `qtype`
// is not handled (terrno = TSDB_CODE_INVALID_PARA).
//
// NOTE(review): pHead->vgId is used as-is here, without the ntohl()
// conversion applied on the node-message path -- presumably callers already
// pass host byte order; confirm against the callers.
static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) {
  SMsgHead *pHead = pRpc->pCont;

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) return -1;

  int32_t   code = 0;
  SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM);

  if (pMsg == NULL) {
    // Nothing was queued, so this must not be reported as success.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    code = -1;
  } else {
    dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
    pMsg->rpcMsg = *pRpc;

    switch (qtype) {
      case WRITE_QUEUE:
        dTrace("msg:%p, will be put into vnode-write queue", pMsg);
        taosWriteQitem(pVnode->pWriteQ, pMsg);
        break;
      case QUERY_QUEUE:
        dTrace("msg:%p, will be put into vnode-query queue", pMsg);
        taosWriteQitem(pVnode->pQueryQ, pMsg);
        break;
      case FETCH_QUEUE:
        dTrace("msg:%p, will be put into vnode-fetch queue", pMsg);
        taosWriteQitem(pVnode->pFetchQ, pMsg);
        break;
      case APPLY_QUEUE:
        dTrace("msg:%p, will be put into vnode-apply queue", pMsg);
        taosWriteQitem(pVnode->pApplyQ, pMsg);
        break;
      case MERGE_QUEUE:
        dTrace("msg:%p, will be put into vnode-merge queue", pMsg);
        taosWriteQitem(pVnode->pMergeQ, pMsg);
        break;
      case SYNC_QUEUE:
        dTrace("msg:%p, will be put into vnode-sync queue", pMsg);
        taosWriteQitem(pVnode->pSyncQ, pMsg);
        break;
      default:
        code = -1;
        terrno = TSDB_CODE_INVALID_PARA;
        // The item was never handed to a queue; free it so it does not leak.
        // Only the queue item is released: the content buffer it points to
        // is shared with the caller's pRpc and is left untouched.
        taosFreeQitem(pMsg);
        break;
    }
  }

  vmReleaseVnode(pMgmt, pVnode);
  return code;
}

S
Shengliang 已提交
373 374
// Forward an RPC message to vmPutRpcMsgToQueue with WRITE_QUEUE as the target.
int32_t vmPutRpcMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
  const EQueueType target = WRITE_QUEUE;
  return vmPutRpcMsgToQueue(pMgmt, pRpc, target);
}

S
Shengliang 已提交
377
// Forward an RPC message to vmPutRpcMsgToQueue with SYNC_QUEUE as the target.
int32_t vmPutRpcMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
  const EQueueType target = SYNC_QUEUE;
  return vmPutRpcMsgToQueue(pMgmt, pRpc, target);
}
S
Shengliang Guan 已提交
378

S
Shengliang 已提交
379 380
// Forward an RPC message to vmPutRpcMsgToQueue with APPLY_QUEUE as the target.
int32_t vmPutRpcMsgToApplyQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
  const EQueueType target = APPLY_QUEUE;
  return vmPutRpcMsgToQueue(pMgmt, pRpc, target);
}

S
Shengliang 已提交
383 384
// Forward an RPC message to vmPutRpcMsgToQueue with QUERY_QUEUE as the target.
int32_t vmPutRpcMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
  const EQueueType target = QUERY_QUEUE;
  return vmPutRpcMsgToQueue(pMgmt, pRpc, target);
}
S
shm  
Shengliang Guan 已提交
386

S
Shengliang 已提交
387 388
// Forward an RPC message to vmPutRpcMsgToQueue with FETCH_QUEUE as the target.
int32_t vmPutRpcMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
  const EQueueType target = FETCH_QUEUE;
  return vmPutRpcMsgToQueue(pMgmt, pRpc, target);
}
S
shm  
Shengliang Guan 已提交
390

S
Shengliang 已提交
391 392
// Forward an RPC message to vmPutRpcMsgToQueue with MERGE_QUEUE as the target.
int32_t vmPutRpcMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
  const EQueueType target = MERGE_QUEUE;
  return vmPutRpcMsgToQueue(pMgmt, pRpc, target);
}

S
Shengliang 已提交
395
// Report the item count of one queue of the vnode identified by vgId.
//
// Returns the value of taosQueueItemSize() for the selected queue, or -1 when
// the vnode cannot be acquired or `qtype` is not one of the queue types
// handled below.
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
  int32_t    numOfItems = -1;

  if (pVnode != NULL) {
    if (qtype == WRITE_QUEUE) {
      numOfItems = taosQueueItemSize(pVnode->pWriteQ);
    } else if (qtype == SYNC_QUEUE) {
      numOfItems = taosQueueItemSize(pVnode->pSyncQ);
    } else if (qtype == APPLY_QUEUE) {
      numOfItems = taosQueueItemSize(pVnode->pApplyQ);
    } else if (qtype == QUERY_QUEUE) {
      numOfItems = taosQueueItemSize(pVnode->pQueryQ);
    } else if (qtype == FETCH_QUEUE) {
      numOfItems = taosQueueItemSize(pVnode->pFetchQ);
    } else if (qtype == MERGE_QUEUE) {
      numOfItems = taosQueueItemSize(pVnode->pMergeQ);
    }
  }

  // NOTE(review): this is also reached with pVnode == NULL; presumably
  // vmReleaseVnode tolerates NULL -- confirm.
  vmReleaseVnode(pMgmt, pVnode);
  return numOfItems;
}

S
Shengliang 已提交
426
// Allocate the six per-vnode queues from the worker pools in pMgmt.
//
// The write and apply queues both come from the write pool; the others come
// from the sync, query, fetch and merge pools. Returns 0 on success; if any
// queue is NULL after allocation, sets terrno to TSDB_CODE_OUT_OF_MEMORY and
// returns -1 (queues that were allocated are not released here).
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
  pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
  pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
  pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
  pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
  pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue);

  const int allAllocated = pVnode->pWriteQ != NULL && pVnode->pSyncQ != NULL && pVnode->pApplyQ != NULL &&
                           pVnode->pQueryQ != NULL && pVnode->pFetchQ != NULL && pVnode->pMergeQ != NULL;
  if (!allAllocated) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  dDebug("vgId:%d, vnode queue is alloced", pVnode->vgId);
  return 0;
}

S
Shengliang 已提交
444
// Return the six per-vnode queues to their worker pools and clear the
// pointers in pVnode.
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
  pVnode->pWriteQ = NULL;

  tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
  pVnode->pSyncQ = NULL;

  // The apply queue belongs to the write pool as well.
  tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
  pVnode->pApplyQ = NULL;

  tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
  pVnode->pQueryQ = NULL;

  tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
  pVnode->pFetchQ = NULL;

  tWWorkerFreeQueue(&pMgmt->mergePool, pVnode->pMergeQ);
  pVnode->pMergeQ = NULL;

  dDebug("vgId:%d, vnode queue is freed", pVnode->vgId);
}

S
Shengliang 已提交
460
// Configure and start every worker used by the vnode management module:
// the query, fetch, write, sync and merge pools, followed by the single
// "vnode-mgmt" and "vnode-monitor" workers (both run
// vmProcessMgmtMonitorQueue with pMgmt as parameter).
//
// Returns 0 when everything started and -1 at the first failure; workers
// started before the failure are not cleaned up here.
int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
  pMgmt->queryPool.name = "vnode-query";
  pMgmt->queryPool.min = tsNumOfVnodeQueryThreads;
  pMgmt->queryPool.max = tsNumOfVnodeQueryThreads;
  if (tQWorkerInit(&pMgmt->queryPool) != 0) return -1;

  pMgmt->fetchPool.name = "vnode-fetch";
  pMgmt->fetchPool.min = tsNumOfVnodeFetchThreads;
  pMgmt->fetchPool.max = tsNumOfVnodeFetchThreads;
  if (tQWorkerInit(&pMgmt->fetchPool) != 0) return -1;

  pMgmt->writePool.name = "vnode-write";
  pMgmt->writePool.max = tsNumOfVnodeWriteThreads;
  if (tWWorkerInit(&pMgmt->writePool) != 0) return -1;

  pMgmt->syncPool.name = "vnode-sync";
  pMgmt->syncPool.max = tsNumOfVnodeSyncThreads;
  if (tWWorkerInit(&pMgmt->syncPool) != 0) return -1;

  pMgmt->mergePool.name = "vnode-merge";
  pMgmt->mergePool.max = tsNumOfVnodeMergeThreads;
  if (tWWorkerInit(&pMgmt->mergePool) != 0) return -1;

  SSingleWorkerCfg mgmtCfg = {0};
  mgmtCfg.min = 1;
  mgmtCfg.max = 1;
  mgmtCfg.name = "vnode-mgmt";
  mgmtCfg.fp = (FItem)vmProcessMgmtMonitorQueue;
  mgmtCfg.param = pMgmt;
  if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) {
    dError("failed to start vnode-mgmt worker since %s", terrstr());
    return -1;
  }

  SSingleWorkerCfg monitorCfg = {0};
  monitorCfg.min = 1;
  monitorCfg.max = 1;
  monitorCfg.name = "vnode-monitor";
  monitorCfg.fp = (FItem)vmProcessMgmtMonitorQueue;
  monitorCfg.param = pMgmt;
  if (tSingleWorkerInit(&pMgmt->monitorWorker, &monitorCfg) != 0) {
    dError("failed to start mnode vnode-monitor worker since %s", terrstr());
    return -1;
  }

  dDebug("vnode workers are initialized");
  return 0;
}

S
Shengliang 已提交
516
// Shut down every worker started by vmStartWorker: first the two single
// workers (monitor, then mgmt), then the write, sync, query, fetch and merge
// pools, in that order.
void vmStopWorker(SVnodeMgmt *pMgmt) {
  // single workers
  tSingleWorkerCleanup(&pMgmt->monitorWorker);
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);

  // worker pools
  tWWorkerCleanup(&pMgmt->writePool);
  tWWorkerCleanup(&pMgmt->syncPool);
  tQWorkerCleanup(&pMgmt->queryPool);
  tQWorkerCleanup(&pMgmt->fetchPool);
  tWWorkerCleanup(&pMgmt->mergePool);

  dDebug("vnode workers are closed");
}