vmWorker.c 11.0 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "vmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19 20 21 22 23
/*
 * Send a body-less response for pMsg that carries only a result code.
 * The response reuses the handle and ahandle of the incoming rpc message;
 * every other field of the response is zero.
 */
static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
  SRpcMsg reply = {0};

  reply.handle = pMsg->rpcMsg.handle;
  reply.ahandle = pMsg->rpcMsg.ahandle;
  reply.code = code;

  dndSendRsp(pWrapper, &reply);
}

S
Shengliang Guan 已提交
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
/*
 * Handler of the vnode-mgmt worker (registered in vmStartWorker).
 * Dispatches one management message to the matching vmProcess*VnodeReq
 * function, replies when the message type has its lowest bit set, and
 * finally releases the message content and the queue item.
 */
static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
  tmsg_t  type = pMsg->rpcMsg.msgType;
  int32_t result = -1;  // stays -1 when no handler accepts the message

  dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg);

  if (type == TDMT_DND_CREATE_VNODE) {
    result = vmProcessCreateVnodeReq(pMgmt, pMsg);
  } else if (type == TDMT_DND_ALTER_VNODE) {
    result = vmProcessAlterVnodeReq(pMgmt, pMsg);
  } else if (type == TDMT_DND_DROP_VNODE) {
    result = vmProcessDropVnodeReq(pMgmt, pMsg);
  } else if (type == TDMT_DND_SYNC_VNODE) {
    result = vmProcessSyncVnodeReq(pMgmt, pMsg);
  } else if (type == TDMT_DND_COMPACT_VNODE) {
    result = vmProcessCompactVnodeReq(pMgmt, pMsg);
  } else {
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    dError("msg:%p, not processed in vnode-mgmt queue", pMsg);
  }

  // Only message types with the lowest bit set are answered here.
  if (type & 1u) {
    // Prefer the detailed error stored in terrno over a generic failure code.
    if (result != 0 && terrno != 0) {
      result = terrno;
    }
    vmSendRsp(pMgmt->pWrapper, pMsg, result);
  }

  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, result & 0XFFFF, tstrerror(result));
  rpcFreeCont(pMsg->rpcMsg.pCont);
  taosFreeQitem(pMsg);
}

S
shm  
Shengliang Guan 已提交
60
/*
 * Item handler of a vnode's query queue (registered in vmAllocQueue).
 * Passes the rpc message to vnodeProcessQueryMsg. Only when that call fails
 * does this function reply with the error code and release the message;
 * on success nothing is sent or freed here.
 */
static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
  dTrace("msg:%p, will be processed in vnode-query queue", pMsg);

  int32_t ret = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
  if (ret == 0) {
    return;
  }

  vmSendRsp(pVnode->pWrapper, pMsg, ret);
  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, ret & 0XFFFF, tstrerror(ret));
  rpcFreeCont(pMsg->rpcMsg.pCont);
  taosFreeQitem(pMsg);
}
S
shm  
Shengliang Guan 已提交
70

S
shm  
Shengliang Guan 已提交
71
/*
 * Item handler of a vnode's fetch queue (registered in vmAllocQueue).
 * Passes the rpc message to vnodeProcessFetchMsg. Only when that call fails
 * does this function reply with the error code and release the message;
 * on success nothing is sent or freed here.
 */
static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
  dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg);

  int32_t ret = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
  if (ret == 0) {
    return;
  }

  vmSendRsp(pVnode->pWrapper, pMsg, ret);
  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, ret & 0XFFFF, tstrerror(ret));
  rpcFreeCont(pMsg->rpcMsg.pCont);
  taosFreeQitem(pMsg);
}

S
shm  
Shengliang Guan 已提交
82
static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
S
shm  
Shengliang Guan 已提交
83
  SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
S
Shengliang Guan 已提交
84 85 86 87
  if (pArray == NULL) {
    dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
    return;
  }
S
shm  
Shengliang Guan 已提交
88 89

  for (int32_t i = 0; i < numOfMsgs; ++i) {
S
shm  
Shengliang Guan 已提交
90
    SNodeMsg *pMsg = NULL;
S
Shengliang Guan 已提交
91 92 93 94 95 96 97
    if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;

    dTrace("msg:%p, will be processed in vnode-write queue", pMsg);
    if (taosArrayPush(pArray, &pMsg) == NULL) {
      dTrace("msg:%p, failed to process since %s", pMsg, terrstr());
      vmSendRsp(pVnode->pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY);
    }
S
shm  
Shengliang Guan 已提交
98 99 100 101
  }

  vnodeProcessWMsgs(pVnode->pImpl, pArray);

S
Shengliang Guan 已提交
102 103
  numOfMsgs = taosArrayGetSize(pArray);
  for (int32_t i = 0; i < numOfMsgs; i++) {
S
shm  
Shengliang Guan 已提交
104 105
    SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
    SRpcMsg  *pRpc = &pMsg->rpcMsg;
S
Shengliang Guan 已提交
106 107 108
    SRpcMsg  *pRsp = NULL;

    int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp);
S
shm  
Shengliang Guan 已提交
109
    if (pRsp != NULL) {
S
shm  
Shengliang Guan 已提交
110
      pRsp->ahandle = pRpc->ahandle;
S
shm  
Shengliang Guan 已提交
111
      dndSendRsp(pVnode->pWrapper, pRsp);
S
shm  
Shengliang Guan 已提交
112 113
      free(pRsp);
    } else {
S
Shengliang Guan 已提交
114
      if (code != 0 && terrno != 0) code = terrno;
S
Shengliang Guan 已提交
115
      vmSendRsp(pVnode->pWrapper, pMsg, code);
S
shm  
Shengliang Guan 已提交
116 117 118
    }
  }

S
Shengliang Guan 已提交
119
  for (int32_t i = 0; i < numOfMsgs; i++) {
S
shm  
Shengliang Guan 已提交
120 121 122 123
    SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
    dTrace("msg:%p, is freed", pMsg);
    rpcFreeCont(pMsg->rpcMsg.pCont);
    taosFreeQitem(pMsg);
S
shm  
Shengliang Guan 已提交
124 125 126 127 128
  }

  taosArrayDestroy(pArray);
}

S
shm  
Shengliang Guan 已提交
129
/*
 * Batch handler of a vnode's apply queue (registered in vmAllocQueue).
 * Takes up to numOfMsgs items out of qall and applies each one with
 * vnodeApplyWMsg; the result code and any response are currently ignored.
 *
 * An iteration whose taosGetQitem call yields nothing is skipped, so a missing
 * item is never dereferenced and a previous item is never applied twice.
 */
static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  for (int32_t i = 0; i < numOfMsgs; ++i) {
    SNodeMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0 || pMsg == NULL) continue;

    // todo
    // NOTE(review): neither pMsg nor pRsp is released here - confirm who owns them.
    SRpcMsg *pRsp = NULL;
    (void)vnodeApplyWMsg(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
  }
}

S
shm  
Shengliang Guan 已提交
141
/*
 * Batch handler of a vnode's sync queue (registered in vmAllocQueue).
 * Takes up to numOfMsgs items out of qall and passes each one to
 * vnodeProcessSyncReq; the result code and any response are currently ignored.
 *
 * An iteration whose taosGetQitem call yields nothing is skipped, so a missing
 * item is never dereferenced and a previous item is never processed twice.
 */
static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
  for (int32_t i = 0; i < numOfMsgs; ++i) {
    SNodeMsg *pMsg = NULL;
    if (taosGetQitem(qall, (void **)&pMsg) == 0 || pMsg == NULL) continue;

    // todo
    // NOTE(review): neither pMsg nor pRsp is released here - confirm who owns them.
    SRpcMsg *pRsp = NULL;
    (void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
  }
}

S
Shengliang Guan 已提交
153
/*
 * Route a node message to one of the queues of the vnode named in its header.
 *
 * The message content must start with an SMsgHead. Its contLen and vgId fields
 * are byte-swapped in place (presumably they arrive in network byte order -
 * confirm against the sender), so this function must be called at most once
 * per message.
 *
 * Returns the result of taosWriteQitem for QUERY/FETCH/WRITE/SYNC queues.
 * Returns -1 when the message has no content, when the vnode cannot be
 * acquired, or when qtype is not one of those four (terrno is then
 * TSDB_CODE_INVALID_PARA for the content and qtype cases).
 */
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) {
  SRpcMsg  *pRpc = &pMsg->rpcMsg;
  int32_t   code = -1;
  SMsgHead *pHead = pRpc->pCont;

  if (pHead == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    dError("msg:%p, failed to write to vnode-queue since it has no content", pMsg);
    return -1;
  }

  // Network-to-host conversion; ntohl states the direction that is intended here.
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHead->vgId, pMsg, terrstr());
    return -1;
  }

  switch (qtype) {
    case QUERY_QUEUE:
      dTrace("msg:%p, will be written into vnode-query queue", pMsg);
      code = taosWriteQitem(pVnode->pQueryQ, pMsg);
      break;
    case FETCH_QUEUE:
      dTrace("msg:%p, will be written into vnode-fetch queue", pMsg);
      code = taosWriteQitem(pVnode->pFetchQ, pMsg);
      break;
    case WRITE_QUEUE:
      dTrace("msg:%p, will be written into vnode-write queue", pMsg);
      code = taosWriteQitem(pVnode->pWriteQ, pMsg);
      break;
    case SYNC_QUEUE:
      dTrace("msg:%p, will be written into vnode-sync queue", pMsg);
      code = taosWriteQitem(pVnode->pSyncQ, pMsg);
      break;
    default:
      terrno = TSDB_CODE_INVALID_PARA;
      break;
  }

  vmReleaseVnode(pMgmt, pVnode);
  return code;
}

S
shm  
Shengliang Guan 已提交
193
/* Put a node message into the sync queue of its target vnode; returns the result of vmPutNodeMsgToQueue. */
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
  int32_t code = vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE);
  return code;
}
S
shm  
Shengliang Guan 已提交
196

S
Shengliang Guan 已提交
197
/* Put a node message into the write queue of its target vnode; returns the result of vmPutNodeMsgToQueue. */
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
  int32_t code = vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE);
  return code;
}

S
shm  
Shengliang Guan 已提交
201
/* Put a node message into the query queue of its target vnode; returns the result of vmPutNodeMsgToQueue. */
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
  int32_t code = vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE);
  return code;
}

S
shm  
Shengliang Guan 已提交
205
/* Put a node message into the fetch queue of its target vnode; returns the result of vmPutNodeMsgToQueue. */
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
  int32_t code = vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE);
  return code;
}
S
shm  
Shengliang Guan 已提交
208

S
Shengliang Guan 已提交
209 210 211 212
/*
 * Hand a management message to the vnode-mgmt worker of pMgmt.
 * Returns whatever dndWriteMsgToWorker returns.
 */
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
  dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pMgmt->mgmtWorker.name);

  int32_t code = dndWriteMsgToWorker(&pMgmt->mgmtWorker, pMsg);
  return code;
}

S
Shengliang Guan 已提交
215
/*
 * Wrap a raw rpc message into a newly allocated SNodeMsg queue item and put it
 * into the query, fetch or apply queue of the vnode named in the message head.
 * Unlike vmPutNodeMsgToQueue, the head fields are used as they are (no byte
 * order conversion is applied here).
 *
 * Returns the result of taosWriteQitem on the selected queue, or -1 when
 *   - qtype is not QUERY_QUEUE, FETCH_QUEUE or APPLY_QUEUE, or the message has
 *     no content (terrno = TSDB_CODE_INVALID_PARA),
 *   - the vnode cannot be acquired, or
 *   - the queue item cannot be allocated.
 */
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) {
  SVnodesMgmt *pMgmt = pWrapper->pMgmt;
  int32_t      code = -1;
  SMsgHead    *pHead = pRpc->pCont;

  // Reject unsupported targets before anything is allocated; otherwise the new
  // queue item would be neither enqueued nor released.
  switch (qtype) {
    case QUERY_QUEUE:
    case FETCH_QUEUE:
    case APPLY_QUEUE:
      break;
    default:
      terrno = TSDB_CODE_INVALID_PARA;
      return -1;
  }

  if (pHead == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return -1;
  }

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
  if (pVnode == NULL) return -1;

  SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
  if (pMsg != NULL) {
    dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
    pMsg->rpcMsg = *pRpc;
    switch (qtype) {
      case QUERY_QUEUE:
        dTrace("msg:%p, will be put into vnode-query queue", pMsg);
        code = taosWriteQitem(pVnode->pQueryQ, pMsg);
        break;
      case FETCH_QUEUE:
        dTrace("msg:%p, will be put into vnode-fetch queue", pMsg);
        code = taosWriteQitem(pVnode->pFetchQ, pMsg);
        break;
      case APPLY_QUEUE:
        dTrace("msg:%p, will be put into vnode-apply queue", pMsg);
        code = taosWriteQitem(pVnode->pApplyQ, pMsg);
        break;
      default:
        // unreachable: qtype was validated above
        break;
    }
  }

  vmReleaseVnode(pMgmt, pVnode);
  return code;
}

S
Shengliang Guan 已提交
249
/* Put a raw rpc message into the query queue of its target vnode; returns the result of vmPutRpcMsgToQueue. */
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  int32_t code = vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE);
  return code;
}
S
shm  
Shengliang Guan 已提交
252

S
Shengliang Guan 已提交
253
/* Put a raw rpc message into the fetch queue of its target vnode; returns the result of vmPutRpcMsgToQueue. */
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  int32_t code = vmPutRpcMsgToQueue(pWrapper, pRpc, FETCH_QUEUE);
  return code;
}
S
shm  
Shengliang Guan 已提交
256

S
Shengliang Guan 已提交
257
/* Put a raw rpc message into the apply queue of its target vnode; returns the result of vmPutRpcMsgToQueue. */
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  int32_t code = vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE);
  return code;
}

S
shm  
Shengliang Guan 已提交
261 262 263 264
/*
 * Create the five queues of a vnode and bind each one to its handler:
 *   write and apply queues on the write pool, sync queue on the sync pool,
 *   fetch queue on the fetch pool, query queue on the query pool.
 *
 * Returns 0 when every queue was created. If at least one allocation returned
 * NULL, sets terrno to TSDB_CODE_OUT_OF_MEMORY and returns -1; queues that
 * were created before the failure are not released here.
 */
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
  // batch (multi-item) queues
  pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
  pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
  pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);

  // single-item queues
  pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
  pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);

  if (!(pVnode->pApplyQ != NULL && pVnode->pWriteQ != NULL && pVnode->pSyncQ != NULL &&
        pVnode->pFetchQ != NULL && pVnode->pQueryQ != NULL)) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  dDebug("vgId:%d, vnode queue is alloced", pVnode->vgId);
  return 0;
}

S
shm  
Shengliang Guan 已提交
278
/*
 * Return the five queues of a vnode to the pools they were allocated from
 * and reset the queue pointers of the vnode to NULL.
 */
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
  // single-item queues
  tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
  tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);

  // batch queues
  tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
  tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
  tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);

  // clear the pointers only after every queue has been handed back
  pVnode->pWriteQ = NULL;
  pVnode->pApplyQ = NULL;
  pVnode->pSyncQ = NULL;
  pVnode->pFetchQ = NULL;
  pVnode->pQueryQ = NULL;

  dDebug("vgId:%d, vnode queue is freed", pVnode->vgId);
}

S
shm  
Shengliang Guan 已提交
292 293 294 295 296 297 298 299 300 301 302 303 304 305
/*
 * Configure and start every worker used by the vnode management node:
 *   - "vnode-query": min = max = tsNumOfCores * tsRatioOfQueryCores, at least 1
 *   - "vnode-fetch": max = 4, min = the smaller of 4 and tsNumOfCores
 *   - "vnode-write": max = tsNumOfCores, at least 1
 *   - "vnode-sync" : max = tsNumOfCores / 2, at least 1
 *   - "vnode-mgmt" : a single worker handled by vmProcessMgmtQueue
 *
 * Returns 0 on success and -1 as soon as one initialization fails; anything
 * started before the failure is not cleaned up here.
 */
int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
  int32_t fetchMax = 4;
  int32_t fetchMin = TMIN(fetchMax, tsNumOfCores);
  int32_t queryMin = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
  int32_t queryMax = queryMin;
  int32_t writeMax = TMAX(tsNumOfCores, 1);
  int32_t syncMax = TMAX(tsNumOfCores / 2, 1);

  SQWorkerPool *pQueryPool = &pMgmt->queryPool;
  pQueryPool->name = "vnode-query";
  pQueryPool->min = queryMin;
  pQueryPool->max = queryMax;
  if (tQWorkerInit(pQueryPool) != 0) {
    return -1;
  }

  SQWorkerPool *pFetchPool = &pMgmt->fetchPool;
  pFetchPool->name = "vnode-fetch";
  pFetchPool->min = fetchMin;
  pFetchPool->max = fetchMax;
  if (tQWorkerInit(pFetchPool) != 0) {
    return -1;
  }

  SWWorkerPool *pWritePool = &pMgmt->writePool;
  pWritePool->name = "vnode-write";
  pWritePool->max = writeMax;
  if (tWWorkerInit(pWritePool) != 0) {
    return -1;
  }

  SWWorkerPool *pSyncPool = &pMgmt->syncPool;
  pSyncPool->name = "vnode-sync";
  pSyncPool->max = syncMax;
  if (tWWorkerInit(pSyncPool) != 0) {
    return -1;
  }

  if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "vnode-mgmt", 1, 1, vmProcessMgmtQueue) != 0) {
    dError("failed to start vnode-mgmt worker since %s", terrstr());
    return -1;
  }

  dDebug("vnode workers is initialized");
  return 0;
}

/*
 * Shut down everything started by vmStartWorker: first the vnode-mgmt worker,
 * then the fetch, query, write and sync pools, in that order.
 */
void vmStopWorker(SVnodesMgmt *pMgmt) {
  dndCleanupWorker(&pMgmt->mgmtWorker);

  // single-item pools
  tQWorkerCleanup(&pMgmt->fetchPool);
  tQWorkerCleanup(&pMgmt->queryPool);

  // batch pools
  tWWorkerCleanup(&pMgmt->writePool);
  tWWorkerCleanup(&pMgmt->syncPool);

  dDebug("vnode workers is closed");
}