qmWorker.c 3.8 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "qmInt.h"

S
Shengliang Guan 已提交
19 20 21 22 23 24 25 26 27 28
static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
  SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
  dndSendRsp(pWrapper, &rsp);
}

static void qmProcessQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  dTrace("msg:%p, will be processed in qnode-query queue", pMsg);
  int32_t code = qndProcessQueryMsg(pMgmt->pQnode, &pMsg->rpcMsg);
  if (code != 0) {
    qmSendRsp(pMgmt->pWrapper, pMsg, code);
S
shm  
Shengliang Guan 已提交
29 30
  }

S
shm  
Shengliang Guan 已提交
31
  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
S
Shengliang Guan 已提交
32
  rpcFreeCont(pMsg->rpcMsg.pCont);
S
shm  
Shengliang Guan 已提交
33 34 35
  taosFreeQitem(pMsg);
}

S
Shengliang Guan 已提交
36 37 38 39 40 41 42 43 44 45 46
static void qmProcessFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
  dTrace("msg:%p, will be processed in qnode-fetch queue", pMsg);
  int32_t code = qndProcessFetchMsg(pMgmt->pQnode, &pMsg->rpcMsg);
  if (code != 0) {
    qmSendRsp(pMgmt->pWrapper, pMsg, code);
  }

  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
  rpcFreeCont(pMsg->rpcMsg.pCont);
  taosFreeQitem(pMsg);
}
S
shm  
Shengliang Guan 已提交
47

S
Shengliang Guan 已提交
48 49
static int32_t qmPutMsgToWorker(SDnodeWorker *pWorker, SNodeMsg *pMsg) {
  dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
S
shm  
Shengliang Guan 已提交
50
  return dndWriteMsgToWorker(pWorker, pMsg);
S
shm  
Shengliang Guan 已提交
51 52
}

S
Shengliang Guan 已提交
53
int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->queryWorker, pMsg); }
S
shm  
Shengliang Guan 已提交
54

S
Shengliang Guan 已提交
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->fetchWorker, pMsg); }

static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) {
  SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
  if (pMsg == NULL) {
    return -1;
  }

  dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
  pMsg->rpcMsg = *pRpc;

  int32_t code = dndWriteMsgToWorker(pWorker, pMsg);
  if (code != 0) {
    dTrace("msg:%p, is freed", pMsg);
    taosFreeQitem(pMsg);
    rpcFreeCont(pRpc->pCont);
  }

  return code;
}

int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  SQnodeMgmt *pMgmt = pWrapper->pMgmt;
  return qmPutRpcMsgToWorker(pMgmt, &pMgmt->queryWorker, pRpc);
}

int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
  SQnodeMgmt *pMgmt = pWrapper->pMgmt;
  return qmPutRpcMsgToWorker(pMgmt, &pMgmt->fetchWorker, pRpc);
S
shm  
Shengliang Guan 已提交
84 85 86
}

int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
S
Shengliang Guan 已提交
87 88 89 90 91 92 93
  int32_t maxFetchThreads = 4;
  int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores);
  int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
  int32_t maxQueryThreads = minQueryThreads;

  if (dndInitWorker(pMgmt, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", minQueryThreads, maxQueryThreads,
                    qmProcessQueryQueue) != 0) {
S
shm  
Shengliang Guan 已提交
94 95 96 97
    dError("failed to start qnode query worker since %s", terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
98 99
  if (dndInitWorker(pMgmt, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", minFetchThreads, maxFetchThreads,
                    qmProcessFetchQueue) != 0) {
S
shm  
Shengliang Guan 已提交
100 101 102 103 104 105 106 107 108 109 110
    dError("failed to start qnode fetch worker since %s", terrstr());
    return -1;
  }

  return 0;
}

void qmStopWorker(SQnodeMgmt *pMgmt) {
  dndCleanupWorker(&pMgmt->queryWorker);
  dndCleanupWorker(&pMgmt->fetchWorker);
}