vmWorker.c 11.1 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19 20 21 22 23
// Reply to the sender of pMsg with the given result code.
// The reply reuses the rpc handle and ahandle carried by the request; every
// other field of the response is left zeroed. It is delivered via dndSendRsp().
static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
  SRpcMsg reply = {0};

  reply.handle = pMsg->rpcMsg.handle;
  reply.ahandle = pMsg->rpcMsg.ahandle;
  reply.code = code;

  dndSendRsp(pWrapper, &reply);
}

S
Shengliang Guan 已提交
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
// Worker callback for the vnode-mgmt queue.
// Dispatches pMsg to the create/alter/drop/sync/compact vnode handler that
// matches its message type, optionally replies to the sender, and then frees
// both the rpc content and the queue item.
static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
  const tmsg_t msgType = pMsg->rpcMsg.msgType;
  int32_t      code = -1;

  dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg);

  if (msgType == TDMT_DND_CREATE_VNODE) {
    code = vmProcessCreateVnodeReq(pMgmt, pMsg);
  } else if (msgType == TDMT_DND_ALTER_VNODE) {
    code = vmProcessAlterVnodeReq(pMgmt, pMsg);
  } else if (msgType == TDMT_DND_DROP_VNODE) {
    code = vmProcessDropVnodeReq(pMgmt, pMsg);
  } else if (msgType == TDMT_DND_SYNC_VNODE) {
    code = vmProcessSyncVnodeReq(pMgmt, pMsg);
  } else if (msgType == TDMT_DND_COMPACT_VNODE) {
    code = vmProcessCompactVnodeReq(pMgmt, pMsg);
  } else {
    // Unknown type: code stays -1 and terrno carries the reason.
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    dError("msg:%p, not processed in vnode-mgmt queue", pMsg);
  }

  // NOTE(review): the low bit of the type presumably marks a request that
  // expects a reply - confirm against the tmsg type definitions.
  const int needsReply = (msgType & 1u) != 0;
  if (needsReply) {
    // Prefer the detailed error in terrno over a generic failure code.
    if (code != 0 && terrno != 0) {
      code = terrno;
    }
    vmSendRsp(pMgmt->pWrapper, pMsg, code);
  }

  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
  rpcFreeCont(pMsg->rpcMsg.pCont);
  taosFreeQitem(pMsg);
}

S
shm  
Shengliang Guan 已提交
60
// Worker callback for the vnode-query queue.
// Passes the rpc message to vnodeProcessQueryMsg(); a reply is sent from here
// only when that call reports a non-zero code. The message is always freed.
static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
  SRpcMsg *pRpc = &pMsg->rpcMsg;

  dTrace("msg:%p, will be processed in vnode-query queue", pMsg);

  const int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pRpc);
  if (code != 0) {
    vmSendRsp(pVnode->pWrapper, pMsg, code);
  }

  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
  rpcFreeCont(pRpc->pCont);
  taosFreeQitem(pMsg);
}
S
shm  
Shengliang Guan 已提交
71

S
shm  
Shengliang Guan 已提交
72
// Worker callback for the vnode-fetch queue.
// Passes the rpc message to vnodeProcessFetchMsg(); a reply is sent from here
// only when that call reports a non-zero code. The message is always freed.
static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
  SRpcMsg *pRpc = &pMsg->rpcMsg;

  dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg);

  const int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pRpc);
  if (code != 0) {
    vmSendRsp(pVnode->pWrapper, pMsg, code);
  }

  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
  rpcFreeCont(pRpc->pCont);
  taosFreeQitem(pMsg);
}

S
shm  
Shengliang Guan 已提交
84
// Worker callback for the vnode-write queue.
//
// Collects up to numOfMsgs messages from qall into a temporary array, hands the
// whole batch to vnodeProcessWMsgs(), then applies each message with
// vnodeApplyWMsg() and replies to its sender. All collected messages are freed
// before returning.
//
// Every message taken from qall is replied to and freed on all paths: a
// message that cannot be batched (array allocation or push failure) is answered
// with TSDB_CODE_OUT_OF_MEMORY and released immediately instead of being
// dropped.
static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
  if (pArray == NULL) {
    dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());

    // Without the batch array nothing can be processed; still drain the
    // messages so that their senders get an answer and the items are released.
    for (int32_t i = 0; i < numOfMsgs; ++i) {
      SNodeMsg *pMsg = NULL;
      if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

      vmSendRsp(pVnode->pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY);
      dTrace("msg:%p, is freed", pMsg);
      rpcFreeCont(pMsg->rpcMsg.pCont);
      taosFreeQitem(pMsg);
    }
    return;
  }

  for (int32_t i = 0; i < numOfMsgs; ++i) {
    SNodeMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    dTrace("msg:%p, will be processed in vnode-write queue", pMsg);
    if (taosArrayPush(pArray, &pMsg) == NULL) {
      dTrace("msg:%p, failed to process since %s", pMsg, terrstr());
      vmSendRsp(pVnode->pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY);

      // The message never entered the batch, so the free loop below will not
      // see it; release it here.
      dTrace("msg:%p, is freed", pMsg);
      rpcFreeCont(pMsg->rpcMsg.pCont);
      taosFreeQitem(pMsg);
    }
  }

  vnodeProcessWMsgs(pVnode->pImpl, pArray);

  // Only the messages that were actually batched are applied and freed.
  numOfMsgs = taosArrayGetSize(pArray);
  for (int32_t i = 0; i < numOfMsgs; i++) {
    SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
    SRpcMsg  *pRpc = &pMsg->rpcMsg;
    SRpcMsg  *pRsp = NULL;

    int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp);
    if (pRsp != NULL) {
      // The vnode produced its own response; route it back to the requester.
      pRsp->ahandle = pRpc->ahandle;
      dndSendRsp(pVnode->pWrapper, pRsp);
      free(pRsp);
    } else {
      // Prefer the detailed error in terrno over a generic failure code.
      if (code != 0 && terrno != 0) code = terrno;
      vmSendRsp(pVnode->pWrapper, pMsg, code);
    }
  }

  for (int32_t i = 0; i < numOfMsgs; i++) {
    SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->rpcMsg.pCont);
    taosFreeQitem(pMsg);
  }

  taosArrayDestroy(pArray);
}

S
shm  
Shengliang Guan 已提交
131
// Worker callback for the vnode-apply queue.
//
// Takes up to numOfMsgs messages from qall and applies each one with
// vnodeApplyWMsg(). The result code is ignored and no reply is sent.
//
// NOTE(review): the queue item and its rpc content are not released here;
// ownership on this path is still unresolved (see the todo below).
static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  for (int32_t i = 0; i < numOfMsgs; ++i) {
    // Reset per iteration so that a failed fetch can never re-apply the
    // previous message or dereference NULL.
    SNodeMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0 || pMsg == NULL) continue;

    // todo
    SRpcMsg *pRsp = NULL;
    (void)vnodeApplyWMsg(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);

    // The response is not forwarded on this path; release it the same way the
    // write queue does so that it is not leaked.
    free(pRsp);
  }
}

S
shm  
Shengliang Guan 已提交
143
// Worker callback for the vnode-sync queue.
//
// Takes up to numOfMsgs messages from qall and passes each one to
// vnodeProcessSyncReq(). The result code is ignored and no reply is sent.
//
// NOTE(review): neither the message nor a response returned through pRsp is
// released here; ownership on this path is still unresolved (see the todo).
static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  for (int32_t i = 0; i < numOfMsgs; ++i) {
    // Reset per iteration so that a failed fetch can never re-process the
    // previous message or dereference NULL.
    SNodeMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0 || pMsg == NULL) continue;

    // todo
    SRpcMsg *pRsp = NULL;
    (void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
  }
}

S
Shengliang Guan 已提交
155 156 157
// Route a node message to one of the queues of the vnode it addresses.
//
// The message head (contLen, vgId) is byte-swapped in place with htonl()
// before the vnode is looked up by vgId.
// NOTE(review): this is presumably a network-to-host conversion, for which
// ntohl() is the conventional name - confirm.
//
// Supported queue types are query, fetch, write and sync; any other type sets
// terrno to TSDB_CODE_INVALID_PARA.
//
// Returns the result of taosWriteQitem() for a supported queue type, or -1 if
// the vnode cannot be acquired or the queue type is unsupported. pMsg is not
// freed here on failure.
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EVndQueueType qtype) {
  SRpcMsg *pRpc = &pMsg->rpcMsg;
  int32_t  code = -1;

  SMsgHead *pHead = pRpc->pCont;
  pHead->contLen = htonl(pHead->contLen);
  pHead->vgId = htonl(pHead->vgId);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to write msg:%p to queue since %s", pHead->vgId, pMsg, terrstr());
    return -1;
  }

  switch (qtype) {
    case VND_QUERY_QUEUE:
      dTrace("msg:%p, will be written into vnode-query queue", pMsg);
      code = taosWriteQitem(pVnode->pQueryQ, pMsg);
      break;
    case VND_FETCH_QUEUE:
      dTrace("msg:%p, will be written into vnode-fetch queue", pMsg);
      code = taosWriteQitem(pVnode->pFetchQ, pMsg);
      break;
    case VND_WRITE_QUEUE:
      dTrace("msg:%p, will be written into vnode-write queue", pMsg);
      code = taosWriteQitem(pVnode->pWriteQ, pMsg);
      // Must not fall through: the same item would also be enqueued on the
      // sync queue and later be processed and freed twice.
      break;
    case VND_SYNC_QUEUE:
      dTrace("msg:%p, will be written into vnode-sync queue", pMsg);
      code = taosWriteQitem(pVnode->pSyncQ, pMsg);
      // Must not fall through: a successful write would otherwise leave
      // terrno set to TSDB_CODE_INVALID_PARA.
      break;
    default:
      terrno = TSDB_CODE_INVALID_PARA;
      break;
  }

  vmReleaseVnode(pMgmt, pVnode);
  return code;
}

S
shm  
Shengliang Guan 已提交
193
// Forward pMsg to vmPutNodeMsgToQueue() with VND_SYNC_QUEUE and return its
// result.
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
  const int32_t code = vmPutNodeMsgToQueue(pMgmt, pMsg, VND_SYNC_QUEUE);
  return code;
}
S
shm  
Shengliang Guan 已提交
196

S
Shengliang Guan 已提交
197 198
// Forward pMsg to vmPutNodeMsgToQueue() with VND_WRITE_QUEUE and return its
// result.
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
  const int32_t code = vmPutNodeMsgToQueue(pMgmt, pMsg, VND_WRITE_QUEUE);
  return code;
}

S
shm  
Shengliang Guan 已提交
201
// Forward pMsg to vmPutNodeMsgToQueue() with VND_QUERY_QUEUE and return its
// result.
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
  const int32_t code = vmPutNodeMsgToQueue(pMgmt, pMsg, VND_QUERY_QUEUE);
  return code;
}

S
shm  
Shengliang Guan 已提交
205
// Forward pMsg to vmPutNodeMsgToQueue() with VND_FETCH_QUEUE and return its
// result.
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
  const int32_t code = vmPutNodeMsgToQueue(pMgmt, pMsg, VND_FETCH_QUEUE);
  return code;
}
S
shm  
Shengliang Guan 已提交
208

S
Shengliang Guan 已提交
209 210 211 212
// Hand pMsg to the vnode-mgmt worker via dndWriteMsgToWorker() and return that
// call's result.
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
  SDnodeWorker *pMgmtWorker = &pMgmt->mgmtWorker;

  dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pMgmtWorker->name);

  const int32_t code = dndWriteMsgToWorker(pMgmtWorker, pMsg);
  return code;
}

S
Shengliang Guan 已提交
215
// Wrap an rpc message in a freshly allocated queue item and put it on one of
// the queues of the vnode named by the vgId in its message head.
//
// Supported queue types are query, fetch and apply; write, sync and unknown
// types set terrno to TSDB_CODE_INVALID_PARA.
//
// Returns the result of taosWriteQitem() for a supported queue type, or -1 if
// the vnode cannot be acquired, the queue item cannot be allocated, or the
// queue type is unsupported. When the item is not enqueued it is freed here;
// the rpc content (pRpc->pCont) is never freed by this function.
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EVndQueueType qtype) {
  SVnodesMgmt *pMgmt = pWrapper->pMgmt;
  int32_t      code = -1;
  SMsgHead    *pHead = pRpc->pCont;

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) return -1;

  SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
  if (pMsg != NULL) {
    dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
    pMsg->rpcMsg = *pRpc;
    switch (qtype) {
      case VND_QUERY_QUEUE:
        dTrace("msg:%p, will be put into vnode-query queue", pMsg);
        code = taosWriteQitem(pVnode->pQueryQ, pMsg);
        break;
      case VND_FETCH_QUEUE:
        dTrace("msg:%p, will be put into vnode-fetch queue", pMsg);
        code = taosWriteQitem(pVnode->pFetchQ, pMsg);
        break;
      case VND_APPLY_QUEUE:
        dTrace("msg:%p, will be put into vnode-apply queue", pMsg);
        code = taosWriteQitem(pVnode->pApplyQ, pMsg);
        break;
      case VND_WRITE_QUEUE:
      case VND_SYNC_QUEUE:
      default:
        terrno = TSDB_CODE_INVALID_PARA;
        break;
    }

    if (code != 0) {
      // The item was allocated here and no queue took it over, so nobody else
      // can release it.
      dTrace("msg:%p, is freed", pMsg);
      taosFreeQitem(pMsg);
    }
  }

  vmReleaseVnode(pMgmt, pVnode);
  return code;
}

S
Shengliang Guan 已提交
251 252 253
// Forward pRpc to vmPutRpcMsgToQueue() with VND_QUERY_QUEUE and return its
// result.
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  const int32_t code = vmPutRpcMsgToQueue(pWrapper, pRpc, VND_QUERY_QUEUE);
  return code;
}
S
shm  
Shengliang Guan 已提交
254

S
Shengliang Guan 已提交
255 256 257
// Forward pRpc to vmPutRpcMsgToQueue() with VND_FETCH_QUEUE and return its
// result.
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  const int32_t code = vmPutRpcMsgToQueue(pWrapper, pRpc, VND_FETCH_QUEUE);
  return code;
}
S
shm  
Shengliang Guan 已提交
258

S
Shengliang Guan 已提交
259 260
// Forward pRpc to vmPutRpcMsgToQueue() with VND_APPLY_QUEUE and return its
// result.
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  const int32_t code = vmPutRpcMsgToQueue(pWrapper, pRpc, VND_APPLY_QUEUE);
  return code;
}

S
shm  
Shengliang Guan 已提交
263 264 265 266 267 268
// Allocate the write, apply, sync, fetch and query queues of a vnode from the
// matching worker pools and bind each one to its processing callback.
//
// Returns 0 when all five queues were allocated. If any of them is NULL,
// terrno is set to TSDB_CODE_OUT_OF_MEMORY and -1 is returned; queues that
// were allocated successfully are not released here.
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
  SWWorkerPool *pWritePool = &pMgmt->writePool;
  SWWorkerPool *pSyncPool = &pMgmt->syncPool;
  SFWorkerPool *pFetchPool = &pMgmt->fetchPool;
  SQWorkerPool *pQueryPool = &pMgmt->queryPool;

  // The write and apply queues share the write pool.
  pVnode->pWriteQ = tWWorkerAllocQueue(pWritePool, pVnode, (FItems)vmProcessWriteQueue);
  pVnode->pApplyQ = tWWorkerAllocQueue(pWritePool, pVnode, (FItems)vmProcessApplyQueue);
  pVnode->pSyncQ = tWWorkerAllocQueue(pSyncPool, pVnode, (FItems)vmProcessSyncQueue);
  pVnode->pFetchQ = tFWorkerAllocQueue(pFetchPool, pVnode, (FItem)vmProcessFetchQueue);
  pVnode->pQueryQ = tQWorkerAllocQueue(pQueryPool, pVnode, (FItem)vmProcessQueryQueue);

  const int allAllocated = pVnode->pApplyQ != NULL && pVnode->pWriteQ != NULL && pVnode->pSyncQ != NULL &&
                           pVnode->pFetchQ != NULL && pVnode->pQueryQ != NULL;
  if (allAllocated) {
    dDebug("vgId:%d, vnode queue is alloced", pVnode->vgId);
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

S
shm  
Shengliang Guan 已提交
280
// Return the five queues of a vnode to their worker pools and clear the
// queue pointers stored in pVnode.
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
  SQWorkerPool *pQueryPool = &pMgmt->queryPool;
  SFWorkerPool *pFetchPool = &pMgmt->fetchPool;
  SWWorkerPool *pWritePool = &pMgmt->writePool;
  SWWorkerPool *pSyncPool = &pMgmt->syncPool;

  // Release order: query, fetch, write, apply, sync.
  tQWorkerFreeQueue(pQueryPool, pVnode->pQueryQ);
  tFWorkerFreeQueue(pFetchPool, pVnode->pFetchQ);
  tWWorkerFreeQueue(pWritePool, pVnode->pWriteQ);
  tWWorkerFreeQueue(pWritePool, pVnode->pApplyQ);
  tWWorkerFreeQueue(pSyncPool, pVnode->pSyncQ);

  // Pointers are cleared only after every queue has been handed back.
  pVnode->pWriteQ = NULL;
  pVnode->pApplyQ = NULL;
  pVnode->pSyncQ = NULL;
  pVnode->pFetchQ = NULL;
  pVnode->pQueryQ = NULL;

  dDebug("vgId:%d, vnode queue is freed", pVnode->vgId);
}

S
shm  
Shengliang Guan 已提交
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323
// Configure and start the worker pools used by vnodes (query, fetch, write,
// sync) and the single vnode-mgmt worker.
//
// Thread limits are derived from tsNumOfCores and tsRatioOfQueryCores:
//   query: min = max = at least 1, otherwise cores * ratio
//   fetch: max = 4, min = the smaller of 4 and the core count
//   write: max = at least 1, otherwise the core count
//   sync:  max = at least 1, otherwise half the core count
//
// Returns 0 on success and -1 as soon as any initialisation step fails; pools
// that were already initialised are not cleaned up here.
int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
  const int32_t fetchMax = 4;
  const int32_t fetchMin = TMIN(fetchMax, tsNumOfCores);
  const int32_t queryMin = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
  const int32_t queryMax = queryMin;
  const int32_t writeMax = TMAX(tsNumOfCores, 1);
  const int32_t syncMax = TMAX(tsNumOfCores / 2, 1);

  pMgmt->queryPool.name = "vnode-query";
  pMgmt->queryPool.min = queryMin;
  pMgmt->queryPool.max = queryMax;
  if (tQWorkerInit(&pMgmt->queryPool) != 0) {
    return -1;
  }

  pMgmt->fetchPool.name = "vnode-fetch";
  pMgmt->fetchPool.min = fetchMin;
  pMgmt->fetchPool.max = fetchMax;
  if (tFWorkerInit(&pMgmt->fetchPool) != 0) {
    return -1;
  }

  pMgmt->writePool.name = "vnode-write";
  pMgmt->writePool.max = writeMax;
  if (tWWorkerInit(&pMgmt->writePool) != 0) {
    return -1;
  }

  pMgmt->syncPool.name = "vnode-sync";
  pMgmt->syncPool.max = syncMax;
  if (tWWorkerInit(&pMgmt->syncPool) != 0) {
    return -1;
  }

  if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "vnode-mgmt", 1, 1, vmProcessMgmtQueue) != 0) {
    dError("failed to start vnode-mgmt worker since %s", terrstr());
    return -1;
  }

  dDebug("vnode workers is initialized");
  return 0;
}

// Shut down the vnode-mgmt worker and then the fetch, query, write and sync
// worker pools, in that order.
void vmStopWorker(SVnodesMgmt *pMgmt) {
  // The management worker is stopped first.
  dndCleanupWorker(&pMgmt->mgmtWorker);

  SFWorkerPool *pFetchPool = &pMgmt->fetchPool;
  SQWorkerPool *pQueryPool = &pMgmt->queryPool;
  SWWorkerPool *pWritePool = &pMgmt->writePool;
  SWWorkerPool *pSyncPool = &pMgmt->syncPool;

  tFWorkerCleanup(pFetchPool);
  tQWorkerCleanup(pQueryPool);
  tWWorkerCleanup(pWritePool);
  tWWorkerCleanup(pSyncPool);

  dDebug("vnode workers is closed");
}