mmWorker.c 6.1 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "mmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19
static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
S
Shengliang Guan 已提交
20 21
  SRpcMsg rsp = {
      .code = code,
S
Shengliang Guan 已提交
22 23
      .pCont = pMsg->info.rsp,
      .contLen = pMsg->info.rspLen,
24
      .info = pMsg->info,
S
Shengliang Guan 已提交
25
  };
26 27 28
  tmsgSendRsp(&rsp);
}

S
Shengliang Guan 已提交
29
static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
30
  SMnodeMgmt *pMgmt = pInfo->ahandle;
S
Shengliang 已提交
31
  int32_t     code = -1;
S
Shengliang Guan 已提交
32
  dTrace("msg:%p, get from mnode queue", pMsg);
S
shm  
Shengliang Guan 已提交
33

S
Shengliang Guan 已提交
34
  switch (pMsg->msgType) {
S
Shengliang Guan 已提交
35
    case TDMT_MON_MM_INFO:
S
Shengliang 已提交
36
      code = mmProcessGetMonitorInfoReq(pMgmt, pMsg);
S
Shengliang Guan 已提交
37 38
      break;
    case TDMT_MON_MM_LOAD:
S
Shengliang 已提交
39
      code = mmProcessGetLoadsReq(pMgmt, pMsg);
S
Shengliang Guan 已提交
40 41
      break;
    default:
S
Shengliang Guan 已提交
42
      pMsg->info.node = pMgmt->pMnode;
S
Shengliang Guan 已提交
43
      code = mndProcessRpcMsg(pMsg);
S
shm  
Shengliang Guan 已提交
44 45
  }

S
Shengliang Guan 已提交
46
  if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) {
S
Shengliang Guan 已提交
47 48
    if (code != 0 && terrno != 0) code = terrno;
    mmSendRsp(pMsg, code);
S
shm  
Shengliang Guan 已提交
49 50
  }

S
Shengliang Guan 已提交
51
  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
S
Shengliang Guan 已提交
52
  rpcFreeCont(pMsg->pCont);
S
shm  
Shengliang Guan 已提交
53 54 55
  taosFreeQitem(pMsg);
}

M
Minghao Li 已提交
56 57
static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
58 59
  dTrace("msg:%p, get from mnode-sync queue", pMsg);

M
Minghao Li 已提交
60
  pMsg->info.node = pMgmt->pMnode;
M
Minghao Li 已提交
61 62 63 64 65

  SMsgHead *pHead = pMsg->pCont;
  pHead->contLen = ntohl(pHead->contLen);
  pHead->vgId = ntohl(pHead->vgId);

66 67 68 69 70
  int32_t code = mndProcessSyncMsg(pMsg);

  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
M
Minghao Li 已提交
71
}
M
Minghao Li 已提交
72

S
Shengliang Guan 已提交
73
static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
74
  dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
S
Shengliang Guan 已提交
75 76
  taosWriteQitem(pWorker->queue, pMsg);
  return 0;
S
shm  
Shengliang Guan 已提交
77 78
}

S
Shengliang Guan 已提交
79 80 81
int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg);
}
S
shm  
Shengliang Guan 已提交
82

S
Shengliang Guan 已提交
83 84 85
int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg);
}
S
shm  
Shengliang Guan 已提交
86

S
Shengliang Guan 已提交
87 88 89
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg);
}
S
Shengliang 已提交
90

S
Shengliang Guan 已提交
91 92
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
D
dapan1121 已提交
93 94
}

S
Shengliang Guan 已提交
95
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
S
Shengliang 已提交
96
  return mmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg);
97 98
}

S
Shengliang Guan 已提交
99
int32_t mmPutRpcMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
S
Shengliang Guan 已提交
100
  SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
S
Shengliang Guan 已提交
101
  if (pMsg == NULL) return -1;
S
Shengliang Guan 已提交
102
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
S
shm  
Shengliang Guan 已提交
103

S
Shengliang Guan 已提交
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
  switch (qtype) {
    case WRITE_QUEUE:
      dTrace("msg:%p, is created and will put into vnode-write queue", pMsg);
      taosWriteQitem(pMgmt->writeWorker.queue, pMsg);
      return 0;
    case QUERY_QUEUE:
      dTrace("msg:%p, is created and will put into vnode-query queue", pMsg);
      taosWriteQitem(pMgmt->queryWorker.queue, pMsg);
      return 0;

    case READ_QUEUE:
      dTrace("msg:%p, is created and will put into vnode-read queue", pMsg);
      taosWriteQitem(pMgmt->readWorker.queue, pMsg);
      return 0;
    case SYNC_QUEUE:
      if (mmAcquire(pMgmt) == 0) {
        dTrace("msg:%p, is created and will put into vnode-sync queue", pMsg);
        taosWriteQitem(pMgmt->syncWorker.queue, pMsg);
        mmRelease(pMgmt);
        return 0;
      } else {
        return -1;
      }
    default:
      terrno = TSDB_CODE_INVALID_PARA;
      return -1;
S
Shengliang Guan 已提交
130
  }
dengyihao's avatar
dengyihao 已提交
131
}
D
dapan1121 已提交
132

S
Shengliang Guan 已提交
133
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
S
Shengliang Guan 已提交
134 135 136 137
  SSingleWorkerCfg qCfg = {
      .min = tsNumOfMnodeQueryThreads,
      .max = tsNumOfMnodeQueryThreads,
      .name = "mnode-query",
S
Shengliang Guan 已提交
138
      .fp = (FItem)mmProcessQueue,
S
Shengliang Guan 已提交
139 140
      .param = pMgmt,
  };
S
shm  
Shengliang Guan 已提交
141
  if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) {
D
dapan1121 已提交
142 143 144 145
    dError("failed to start mnode-query worker since %s", terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
146 147 148 149 150 151 152
  SSingleWorkerCfg rCfg = {
      .min = tsNumOfMnodeReadThreads,
      .max = tsNumOfMnodeReadThreads,
      .name = "mnode-read",
      .fp = (FItem)mmProcessQueue,
      .param = pMgmt,
  };
S
shm  
Shengliang Guan 已提交
153
  if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) {
S
Shengliang Guan 已提交
154
    dError("failed to start mnode-read worker since %s", terrstr());
S
Shengliang Guan 已提交
155 156 157
    return -1;
  }

S
Shengliang Guan 已提交
158 159 160 161 162 163 164
  SSingleWorkerCfg wCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-write",
      .fp = (FItem)mmProcessQueue,
      .param = pMgmt,
  };
S
shm  
Shengliang Guan 已提交
165
  if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) {
S
Shengliang Guan 已提交
166
    dError("failed to start mnode-write worker since %s", terrstr());
S
Shengliang Guan 已提交
167 168 169
    return -1;
  }

S
Shengliang Guan 已提交
170 171 172 173
  SSingleWorkerCfg sCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-sync",
M
Minghao Li 已提交
174
      .fp = (FItem)mmProcessSyncQueue,
S
Shengliang Guan 已提交
175 176
      .param = pMgmt,
  };
S
shm  
Shengliang Guan 已提交
177
  if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) {
178
    dError("failed to start mnode mnode-sync worker since %s", terrstr());
S
Shengliang Guan 已提交
179 180 181
    return -1;
  }

S
Shengliang Guan 已提交
182 183 184 185 186 187 188 189 190 191
  SSingleWorkerCfg mCfg = {
      .min = 1,
      .max = 1,
      .name = "mnode-monitor",
      .fp = (FItem)mmProcessQueue,
      .param = pMgmt,
  };
  if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
    dError("failed to start mnode mnode-monitor worker since %s", terrstr());
    return -1;
192 193
  }

S
Shengliang Guan 已提交
194
  dDebug("mnode workers are initialized");
S
Shengliang Guan 已提交
195 196 197 198
  return 0;
}

void mmStopWorker(SMnodeMgmt *pMgmt) {
199 200 201 202 203
  taosThreadRwlockWrlock(&pMgmt->lock);
  pMgmt->stopped = 1;
  taosThreadRwlockUnlock(&pMgmt->lock);
  while (pMgmt->refCount > 0) taosMsleep(10);

204
  tSingleWorkerCleanup(&pMgmt->monitorWorker);
D
dapan1121 已提交
205
  tSingleWorkerCleanup(&pMgmt->queryWorker);
S
Shengliang Guan 已提交
206
  tSingleWorkerCleanup(&pMgmt->readWorker);
S
Shengliang Guan 已提交
207 208
  tSingleWorkerCleanup(&pMgmt->writeWorker);
  tSingleWorkerCleanup(&pMgmt->syncWorker);
S
Shengliang Guan 已提交
209
  dDebug("mnode workers are closed");
S
Shengliang Guan 已提交
210
}