vmWorker.c 16.0 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

19 20
#include "sync.h"
#include "syncTools.h"
S
shm  
Shengliang Guan 已提交
21

S
Shengliang Guan 已提交
22
/*
 * Send a response for pMsg carrying the given result code.
 *
 * The response body and its length are taken from pMsg->info.rsp and
 * pMsg->info.rspLen, and the request's info is copied into the response.
 * All other response fields are zero.
 */
static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg reply = {0};

  reply.code = code;
  reply.pCont = pMsg->info.rsp;
  reply.contLen = pMsg->info.rspLen;
  reply.info = pMsg->info;

  tmsgSendRsp(&reply);
}

S
Shengliang Guan 已提交
32
/*
 * Single-item handler shared by the vnode-mgmt and vnode-monitor workers.
 *
 * Dispatches on pMsg->msgType to the matching management/monitor routine.
 * Unknown types set terrno to TSDB_CODE_MSG_NOT_PROCESSED and keep the
 * initial failure result (-1). When IsReq(pMsg) holds, a response is sent;
 * a failing result is replaced by terrno when terrno is non-zero.
 * The message content and the queue item are always released before return.
 */
static void vmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SVnodeMgmt *pVnodeMgmt = pInfo->ahandle;
  int32_t     result = -1;

  dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  switch (pMsg->msgType) {
    case TDMT_MON_VM_INFO:
      result = vmProcessGetMonitorInfoReq(pVnodeMgmt, pMsg);
      break;
    case TDMT_MON_VM_LOAD:
      result = vmProcessGetLoadsReq(pVnodeMgmt, pMsg);
      break;
    case TDMT_DND_CREATE_VNODE:
      result = vmProcessCreateVnodeReq(pVnodeMgmt, pMsg);
      break;
    case TDMT_DND_DROP_VNODE:
      result = vmProcessDropVnodeReq(pVnodeMgmt, pMsg);
      break;
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      dError("msg:%p, not processed in vnode queue", pMsg);
  }

  if (IsReq(pMsg)) {
    // Prefer the detailed error recorded in terrno over the generic failure value.
    if (result != 0 && terrno != 0) {
      result = terrno;
    }
    vmSendRsp(pMsg, result);
  }

  dTrace("msg:%p, is freed, code:0x%x", pMsg, result);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

S
Shengliang Guan 已提交
65
/*
 * Single-item handler for a vnode's query queue.
 *
 * Passes the message to vnodeProcessQueryMsg(). A response is sent from here
 * only when that call fails; the error code is terrno when it is non-zero,
 * otherwise the returned value. The message content and the queue item are
 * released in every case.
 */
static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SVnodeObj *pVnode = pInfo->ahandle;

  dTrace("msg:%p, get from vnode-query queue", pMsg);

  int32_t result = vnodeProcessQueryMsg(pVnode->pImpl, pMsg);
  if (result != 0) {
    if (terrno != 0) {
      result = terrno;
    }
    vmSendRsp(pMsg, result);
  }

  dTrace("msg:%p, is freed, code:0x%x", pMsg, result);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
S
shm  
Shengliang Guan 已提交
78

S
Shengliang Guan 已提交
79
/*
 * Single-item handler for a vnode's fetch queue.
 *
 * Passes the message (together with the queue info) to vnodeProcessFetchMsg().
 * A response is sent from here only when that call fails; the error code is
 * terrno when it is non-zero, otherwise the returned value. The message
 * content and the queue item are released in every case.
 */
static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SVnodeObj *pVnode = pInfo->ahandle;

  dTrace("msg:%p, get from vnode-fetch queue", pMsg);

  int32_t result = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
  if (result != 0) {
    if (terrno != 0) {
      result = terrno;
    }
    vmSendRsp(pMsg, result);
  }

  dTrace("msg:%p, is freed, code:0x%x", pMsg, result);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

S
Shengliang Guan 已提交
93 94
/*
 * Batch handler for a vnode's write queue.
 *
 * Phase 1: drain up to numOfMsgs items from qall into a temporary array.
 *          A message that cannot be tracked (array allocation or push
 *          failed) is answered with TSDB_CODE_OUT_OF_MEMORY and released
 *          immediately, so it is neither leaked nor left unanswered.
 * Phase 2: pre-process every tracked message and propose it through sync.
 *          - not leader  : reply with a redirect to the next endpoint
 *          - other error : reply with TSDB_CODE_SYN_INTERNAL_ERROR
 *          - success     : no reply here; it is sent from the apply queue
 * Phase 3: release every tracked message.
 *
 * Phases 2 and 3 iterate over the number of messages actually stored in the
 * array, which can be smaller than numOfMsgs when an item was missing or
 * could not be pushed.
 */
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;
  SArray    *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
  if (pArray == NULL) {
    dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
  }

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    SRpcMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    dTrace("msg:%p, get from vnode-write queue", pMsg);
    if (pArray == NULL || taosArrayPush(pArray, &pMsg) == NULL) {
      dTrace("msg:%p, failed to process since %s", pMsg, terrstr());
      vmSendRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY);
      // This message is not tracked by pArray, so it must be released here.
      rpcFreeCont(pMsg->pCont);
      taosFreeQitem(pMsg);
    }
  }

  if (pArray == NULL) return;

  const int32_t numOfQueued = (int32_t)taosArrayGetSize(pArray);

  for (int32_t i = 0; i < numOfQueued; i++) {
    SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
    SRpcMsg  rsp = {.info = pMsg->info};

    vnodePreprocessReq(pVnode->pImpl, pMsg);

    int32_t ret = syncPropose(vnodeGetSyncHandle(pVnode->pImpl), pMsg, false);
    if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
      dTrace("msg:%p, is redirect since not leader, vgId:%d ", pMsg, pVnode->vgId);
      rsp.code = TSDB_CODE_RPC_REDIRECT;
      SEpSet newEpSet = {0};
      syncGetEpSet(vnodeGetSyncHandle(pVnode->pImpl), &newEpSet);
      // Advance to the next endpoint; skip when the set is empty to avoid a modulo by zero.
      if (newEpSet.numOfEps > 0) {
        newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
      }
      tmsgSendRedirectRsp(&rsp, &newEpSet);
    } else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
      rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR;
      tmsgSendRsp(&rsp);
    } else if (ret == TAOS_SYNC_PROPOSE_SUCCESS) {
      // send response in applyQ
    } else {
      assert(0);
    }
  }

  for (int32_t i = 0; i < numOfQueued; i++) {
    SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }

  taosArrayDestroy(pArray);
}

S
Shengliang Guan 已提交
146 147
/*
 * Batch handler for a vnode's apply queue.
 *
 * Every item is expected to be a TDMT_VND_SYNC_APPLY_MSG wrapping the
 * original write request. For each item the original request is unpacked
 * and applied through vnodeProcessWriteReq(); on failure the response code
 * is set to terrno. A response is sent only when the queued message carries
 * a connection handle (pMsg->info.handle != NULL). The unpacked request, the
 * message content and the queue item are all released per iteration.
 *
 * Slots for which taosGetQitem() yields no item are skipped, matching the
 * check used by the write-queue handler.
 */
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    SRpcMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    // init response rpc msg
    SRpcMsg rsp = {0};

    // get original rpc msg
    assert(pMsg->msgType == TDMT_VND_SYNC_APPLY_MSG);
    SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(pMsg);
    syncApplyMsgLog2("==vmProcessApplyQueue==", pSyncApplyMsg);
    SRpcMsg originalRpcMsg = {0};
    syncApplyMsg2OriginalRpcMsg(pSyncApplyMsg, &originalRpcMsg);

    // apply data into tsdb
    if (vnodeProcessWriteReq(pVnode->pImpl, &originalRpcMsg, pSyncApplyMsg->fsmMeta.index, &rsp) < 0) {
      rsp.code = terrno;
      dTrace("msg:%p, process write error since %s", pMsg, terrstr());
    }

    syncApplyMsgDestroy(pSyncApplyMsg);
    rpcFreeCont(originalRpcMsg.pCont);

    // if leader, send response
    if (pMsg->info.handle != NULL) {
      rsp.info = pMsg->info;
      tmsgSendRsp(&rsp);
    }

    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

S
Shengliang Guan 已提交
183 184
/*
 * Batch handler for a vnode's sync queue.
 *
 * Each item is handed to vnodeProcessSyncReq(). When that fails and the
 * message carries a connection handle, an error response is sent whose code
 * is terrno if terrno is negative, otherwise the returned value. The message
 * content and the queue item are released per iteration.
 *
 * Slots for which taosGetQitem() yields no item are skipped, matching the
 * check used by the write-queue handler.
 */
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    SRpcMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL);
    if (code != 0) {
      if (pMsg->info.handle != NULL) {
        SRpcMsg rsp = {
            .code = (terrno < 0) ? terrno : code,
            .info = pMsg->info,
        };
        dTrace("msg:%p, failed to process sync queue since %s", pMsg, terrstr());
        tmsgSendRsp(&rsp);
      }
    }

    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

L
Liu Jicong 已提交
207 208 209 210
/*
 * Batch handler for a vnode's merge queue.
 *
 * Each item is handed to vnodeProcessFetchMsg(). A response is sent from
 * here only when that call fails; the error code is terrno when it is
 * non-zero, otherwise the returned value. The message content and the queue
 * item are released per iteration.
 *
 * Slots for which taosGetQitem() yields no item are skipped, matching the
 * check used by the write-queue handler.
 */
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    SRpcMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    dTrace("msg:%p, get from vnode-merge queue", pMsg);
    int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
    if (code != 0) {
      if (terrno != 0) code = terrno;
      vmSendRsp(pMsg, code);
    }

    dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
    rpcFreeCont(pMsg->pCont);
    taosFreeQitem(pMsg);
  }
}

S
Shengliang Guan 已提交
226
/*
 * Route an already-allocated queue message to one of the target vnode's queues.
 *
 * The message head at pMsg->pCont is converted in place (contLen and vgId
 * via ntohl) and the vnode is looked up by the converted vgId.
 *
 * Returns 0 when the message was written to a queue. When the vnode cannot
 * be acquired, returns terrno if it is non-zero, otherwise -1. For a queue
 * type not handled here, returns -1 with terrno = TSDB_CODE_INVALID_PARA.
 * NOTE(review): APPLY_QUEUE is not handled by this routine and takes the
 * invalid-parameter path.
 */
static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
  SMsgHead *pMsgHead = pMsg->pCont;

  // Convert the head fields with ntohl before they are used for the lookup.
  pMsgHead->contLen = ntohl(pMsgHead->contLen);
  pMsgHead->vgId = ntohl(pMsgHead->vgId);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pMsgHead->vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to put msg:%p into vnode-queue since %s", pMsgHead->vgId, pMsg, terrstr());
    if (terrno != 0) {
      return terrno;
    }
    return -1;
  }

  int32_t result = 0;
  switch (qtype) {
    case QUERY_QUEUE:
      dTrace("msg:%p, put into vnode-query worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
      taosWriteQitem(pVnode->pQueryQ, pMsg);
      break;
    case FETCH_QUEUE:
      dTrace("msg:%p, put into vnode-fetch worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
      taosWriteQitem(pVnode->pFetchQ, pMsg);
      break;
    case WRITE_QUEUE:
      dTrace("msg:%p, put into vnode-write worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
      taosWriteQitem(pVnode->pWriteQ, pMsg);
      break;
    case SYNC_QUEUE:
      dTrace("msg:%p, put into vnode-sync worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
      taosWriteQitem(pVnode->pSyncQ, pMsg);
      break;
    case MERGE_QUEUE:
      dTrace("msg:%p, put into vnode-merge worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
      taosWriteQitem(pVnode->pMergeQ, pMsg);
      break;
    default:
      result = -1;
      terrno = TSDB_CODE_INVALID_PARA;
      break;
  }

  vmReleaseVnode(pMgmt, pVnode);
  return result;
}

S
Shengliang Guan 已提交
270
/* Put pMsg on the sync queue of the vnode named in its message head. */
int32_t vmPutNodeMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const EQueueType target = SYNC_QUEUE;
  return vmPutNodeMsgToQueue(pMgmt, pMsg, target);
}
S
shm  
Shengliang Guan 已提交
273

S
Shengliang Guan 已提交
274
/* Put pMsg on the write queue of the vnode named in its message head. */
int32_t vmPutNodeMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const EQueueType target = WRITE_QUEUE;
  return vmPutNodeMsgToQueue(pMgmt, pMsg, target);
}
S
shm  
Shengliang Guan 已提交
277

S
Shengliang Guan 已提交
278
/* Put pMsg on the query queue of the vnode named in its message head. */
int32_t vmPutNodeMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const EQueueType target = QUERY_QUEUE;
  return vmPutNodeMsgToQueue(pMgmt, pMsg, target);
}
S
shm  
Shengliang Guan 已提交
281

S
Shengliang Guan 已提交
282
/* Put pMsg on the fetch queue of the vnode named in its message head. */
int32_t vmPutNodeMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const EQueueType target = FETCH_QUEUE;
  return vmPutNodeMsgToQueue(pMgmt, pMsg, target);
}
S
shm  
Shengliang Guan 已提交
285

S
Shengliang Guan 已提交
286
/* Put pMsg on the merge queue of the vnode named in its message head. */
int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const EQueueType target = MERGE_QUEUE;
  return vmPutNodeMsgToQueue(pMgmt, pMsg, target);
}
L
Liu Jicong 已提交
289

S
Shengliang Guan 已提交
290
/*
 * Put pMsg on the queue of the vnode-mgmt worker.
 * Always returns 0; the result of the queue write is not inspected.
 */
int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dTrace("msg:%p, put into vnode-mgmt worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
  taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);
  return 0;
}

S
Shengliang Guan 已提交
297
/*
 * Put pMsg on the queue of the vnode-monitor worker.
 * Always returns 0; the result of the queue write is not inspected.
 */
int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dTrace("msg:%p, put into vnode-monitor worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
  taosWriteQitem(pMgmt->monitorWorker.queue, pMsg);
  return 0;
}

S
Shengliang 已提交
304
/*
 * Copy pRpc into a newly allocated queue item and put it on one of the
 * target vnode's queues. The vnode is looked up by the vgId stored in the
 * message head at pRpc->pCont (used as-is, no byte-order conversion here).
 *
 * Returns 0 when the copy was queued, -1 otherwise:
 *  - vnode not found: nothing is allocated or released;
 *  - queue item allocation failed: pRpc->pCont is freed and set to NULL;
 *  - unsupported qtype: terrno = TSDB_CODE_INVALID_PARA and the queue item
 *    that was allocated is released, since it was never handed to a queue.
 *    NOTE(review): pRpc->pCont is left untouched on this path, as it is on
 *    the vnode-not-found path; confirm who owns it with the callers.
 */
static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) {
  SMsgHead  *pHead = pRpc->pCont;
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) return -1;

  SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
  int32_t  code = 0;

  if (pMsg == NULL) {
    rpcFreeCont(pRpc->pCont);
    pRpc->pCont = NULL;
    code = -1;
  } else {
    *pMsg = *pRpc;
    switch (qtype) {
      case WRITE_QUEUE:
        dTrace("msg:%p, create and put into vnode-write worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
        taosWriteQitem(pVnode->pWriteQ, pMsg);
        break;
      case QUERY_QUEUE:
        dTrace("msg:%p, create and put into vnode-query queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
        taosWriteQitem(pVnode->pQueryQ, pMsg);
        break;
      case FETCH_QUEUE:
        dTrace("msg:%p, create and put into vnode-fetch queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
        taosWriteQitem(pVnode->pFetchQ, pMsg);
        break;
      case APPLY_QUEUE:
        dTrace("msg:%p, create and put into vnode-apply queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
        taosWriteQitem(pVnode->pApplyQ, pMsg);
        break;
      case MERGE_QUEUE:
        dTrace("msg:%p, create and put into vnode-merge queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
        taosWriteQitem(pVnode->pMergeQ, pMsg);
        break;
      case SYNC_QUEUE:
        dTrace("msg:%p, create and put into vnode-sync queue, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
        taosWriteQitem(pVnode->pSyncQ, pMsg);
        break;
      default:
        // The item was never queued, so no worker will free it: release it here.
        taosFreeQitem(pMsg);
        code = -1;
        terrno = TSDB_CODE_INVALID_PARA;
        break;
    }
  }

  vmReleaseVnode(pMgmt, pVnode);
  return code;
}

S
Shengliang 已提交
354 355
/* Copy pRpc into a new queue item and put it on the target vnode's write queue. */
int32_t vmPutRpcMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
  const EQueueType target = WRITE_QUEUE;
  return vmPutRpcMsgToQueue(pMgmt, pRpc, target);
}

S
Shengliang 已提交
358
/* Copy pRpc into a new queue item and put it on the target vnode's sync queue. */
int32_t vmPutRpcMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
  const EQueueType target = SYNC_QUEUE;
  return vmPutRpcMsgToQueue(pMgmt, pRpc, target);
}
S
Shengliang Guan 已提交
359

S
Shengliang 已提交
360 361
/* Copy pRpc into a new queue item and put it on the target vnode's apply queue. */
int32_t vmPutRpcMsgToApplyQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
  const EQueueType target = APPLY_QUEUE;
  return vmPutRpcMsgToQueue(pMgmt, pRpc, target);
}

S
Shengliang 已提交
364 365
/* Copy pRpc into a new queue item and put it on the target vnode's query queue. */
int32_t vmPutRpcMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
  const EQueueType target = QUERY_QUEUE;
  return vmPutRpcMsgToQueue(pMgmt, pRpc, target);
}
S
shm  
Shengliang Guan 已提交
367

S
Shengliang 已提交
368 369
/* Copy pRpc into a new queue item and put it on the target vnode's fetch queue. */
int32_t vmPutRpcMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
  const EQueueType target = FETCH_QUEUE;
  return vmPutRpcMsgToQueue(pMgmt, pRpc, target);
}
S
shm  
Shengliang Guan 已提交
371

S
Shengliang 已提交
372 373
/* Copy pRpc into a new queue item and put it on the target vnode's merge queue. */
int32_t vmPutRpcMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
  const EQueueType target = MERGE_QUEUE;
  return vmPutRpcMsgToQueue(pMgmt, pRpc, target);
}

S
Shengliang 已提交
376
/*
 * Report the number of items currently in one queue of the vnode vgId.
 *
 * Returns the value of taosQueueItemSize() for the selected queue, or -1
 * when the vnode cannot be acquired or qtype names no known queue.
 * The vnode is released only when it was actually acquired.
 */
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
  if (pVnode == NULL) return -1;

  int32_t size = -1;
  switch (qtype) {
    case WRITE_QUEUE:
      size = taosQueueItemSize(pVnode->pWriteQ);
      break;
    case SYNC_QUEUE:
      size = taosQueueItemSize(pVnode->pSyncQ);
      break;
    case APPLY_QUEUE:
      size = taosQueueItemSize(pVnode->pApplyQ);
      break;
    case QUERY_QUEUE:
      size = taosQueueItemSize(pVnode->pQueryQ);
      break;
    case FETCH_QUEUE:
      size = taosQueueItemSize(pVnode->pFetchQ);
      break;
    case MERGE_QUEUE:
      size = taosQueueItemSize(pVnode->pMergeQ);
      break;
    default:
      break;
  }

  vmReleaseVnode(pMgmt, pVnode);
  return size;
}

S
Shengliang 已提交
407
/*
 * Allocate the six per-vnode queues and bind each to its handler.
 *
 * write and apply queues come from writePool, sync from syncPool, merge
 * from mergePool (all batch handlers); query and fetch come from queryPool
 * and fetchPool (single-item handlers).
 *
 * Returns 0 when every queue was allocated; otherwise sets terrno to
 * TSDB_CODE_OUT_OF_MEMORY and returns -1. Queues that were allocated before
 * a failure are left in place for the caller to release.
 */
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
  pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
  pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
  pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
  pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
  pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue);

  const int allAllocated = pVnode->pWriteQ != NULL && pVnode->pSyncQ != NULL && pVnode->pApplyQ != NULL &&
                           pVnode->pQueryQ != NULL && pVnode->pFetchQ != NULL && pVnode->pMergeQ != NULL;
  if (!allAllocated) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  dDebug("vgId:%d, queue is alloced", pVnode->vgId);
  return 0;
}

S
Shengliang 已提交
425
/*
 * Return all six per-vnode queues to the pools they were allocated from and
 * clear the queue pointers stored on the vnode.
 */
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  // Hand every queue back to its owning pool first.
  tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
  tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
  tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
  tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
  tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
  tWWorkerFreeQueue(&pMgmt->mergePool, pVnode->pMergeQ);

  // Then drop the stale pointers.
  pVnode->pWriteQ = NULL;
  pVnode->pSyncQ = NULL;
  pVnode->pApplyQ = NULL;
  pVnode->pQueryQ = NULL;
  pVnode->pFetchQ = NULL;
  pVnode->pMergeQ = NULL;

  dDebug("vgId:%d, queue is freed", pVnode->vgId);
}

S
Shengliang 已提交
441
/*
 * Configure and start every worker pool used by the vnode manager:
 * the query and fetch pools (min and max both set to the configured thread
 * count), the write, sync and merge pools (max set to the configured thread
 * count), and two single-threaded workers, vnode-mgmt and vnode-monitor,
 * which both run vmProcessQueue with pMgmt as their parameter.
 *
 * Returns 0 on success and -1 as soon as any initialisation fails; pools
 * started before the failure are not cleaned up here.
 */
int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
  SQWorkerPool *queryPool = &pMgmt->queryPool;
  queryPool->name = "vnode-query";
  queryPool->min = tsNumOfVnodeQueryThreads;
  queryPool->max = tsNumOfVnodeQueryThreads;
  if (tQWorkerInit(queryPool) != 0) {
    return -1;
  }

  SQWorkerPool *fetchPool = &pMgmt->fetchPool;
  fetchPool->name = "vnode-fetch";
  fetchPool->min = tsNumOfVnodeFetchThreads;
  fetchPool->max = tsNumOfVnodeFetchThreads;
  if (tQWorkerInit(fetchPool) != 0) {
    return -1;
  }

  SWWorkerPool *writePool = &pMgmt->writePool;
  writePool->name = "vnode-write";
  writePool->max = tsNumOfVnodeWriteThreads;
  if (tWWorkerInit(writePool) != 0) {
    return -1;
  }

  SWWorkerPool *syncPool = &pMgmt->syncPool;
  syncPool->name = "vnode-sync";
  syncPool->max = tsNumOfVnodeSyncThreads;
  if (tWWorkerInit(syncPool) != 0) {
    return -1;
  }

  SWWorkerPool *mergePool = &pMgmt->mergePool;
  mergePool->name = "vnode-merge";
  mergePool->max = tsNumOfVnodeMergeThreads;
  if (tWWorkerInit(mergePool) != 0) {
    return -1;
  }

  SSingleWorkerCfg mgmtCfg = {0};
  mgmtCfg.min = 1;
  mgmtCfg.max = 1;
  mgmtCfg.name = "vnode-mgmt";
  mgmtCfg.fp = (FItem)vmProcessQueue;
  mgmtCfg.param = pMgmt;
  if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) {
    dError("failed to start vnode-mgmt worker since %s", terrstr());
    return -1;
  }

  SSingleWorkerCfg monitorCfg = {0};
  monitorCfg.min = 1;
  monitorCfg.max = 1;
  monitorCfg.name = "vnode-monitor";
  monitorCfg.fp = (FItem)vmProcessQueue;
  monitorCfg.param = pMgmt;
  if (tSingleWorkerInit(&pMgmt->monitorWorker, &monitorCfg) != 0) {
    dError("failed to start vnode-monitor worker since %s", terrstr());
    return -1;
  }

  dDebug("vnode workers are initialized");
  return 0;
}

S
Shengliang 已提交
497
/*
 * Shut down every worker started by vmStartWorker(): the two single workers
 * (monitor, then mgmt) followed by the write, sync, query, fetch and merge
 * pools, in that order.
 */
void vmStopWorker(SVnodeMgmt *pMgmt) {
  // Single-threaded workers.
  tSingleWorkerCleanup(&pMgmt->monitorWorker);
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);

  // Per-vnode worker pools.
  tWWorkerCleanup(&pMgmt->writePool);
  tWWorkerCleanup(&pMgmt->syncPool);
  tQWorkerCleanup(&pMgmt->queryPool);
  tQWorkerCleanup(&pMgmt->fetchPool);
  tWWorkerCleanup(&pMgmt->mergePool);

  dDebug("vnode workers are closed");
}