mmWorker.c 6.1 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "mmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19
/*
 * Send a response for pMsg carrying the given result code.
 *
 * The response reuses the request's info block and takes its payload from
 * pMsg->info.rsp / pMsg->info.rspLen. All other fields of the response are
 * left zeroed.
 */
static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
  SRpcMsg reply = {0};

  reply.info = pMsg->info;
  reply.pCont = pMsg->info.rsp;
  reply.contLen = pMsg->info.rspLen;
  reply.code = code;

  tmsgSendRsp(&reply);
}

S
Shengliang Guan 已提交
29
/*
 * Queue callback shared by the query, read, write and monitor workers.
 *
 * Dispatches the message by type, sends a response when one is due, and then
 * releases both the message content and the queue item.
 */
static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  int32_t     code = -1;
  dTrace("msg:%p, get from mnode queue", pMsg);

  if (pMsg->msgType == TDMT_MON_MM_INFO) {
    code = mmProcessGetMonitorInfoReq(pMgmt, pMsg);
  } else if (pMsg->msgType == TDMT_MON_MM_LOAD) {
    code = mmProcessGetLoadsReq(pMgmt, pMsg);
  } else {
    // Every other message type is forwarded to the mnode itself.
    pMsg->info.node = pMgmt->pMnode;
    code = mndProcessRpcMsg(pMsg);
  }

  // Respond only to requests that still have a handle and whose processing
  // is not reported as still in progress.
  if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    if (code != 0 && terrno != 0) {
      // Prefer the error recorded in terrno over the raw return value.
      code = terrno;
    }
    mmSendRsp(pMsg, code);
  }

  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

M
Minghao Li 已提交
56 57
/*
 * Queue callback for the mnode-sync worker.
 *
 * Attaches the mnode to the message, converts the header's contLen and vgId
 * with ntohl(), passes the message to mndProcessSyncMsg, and then releases the
 * message content and the queue item. No response is sent from here.
 */
static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SMnodeMgmt *pMgmt = pInfo->ahandle;
  dTrace("msg:%p, get from mnode-sync queue", pMsg);

  pMsg->info.node = pMgmt->pMnode;

  // The content starts with a message head whose fields are converted in place.
  SMsgHead *pHdr = pMsg->pCont;
  pHdr->contLen = ntohl(pHdr->contLen);
  pHdr->vgId = ntohl(pHdr->vgId);

  int32_t code = mndProcessSyncMsg(pMsg);

  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
M
Minghao Li 已提交
72

S
Shengliang Guan 已提交
73
/*
 * Write pMsg into the queue of the given worker.
 *
 * Always returns 0; the result of taosWriteQitem is not inspected.
 */
static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
  dTrace("msg:%p, put into worker %s, type:%s",
         pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));

  taosWriteQitem(pWorker->queue, pMsg);

  return 0;
}

S
Shengliang Guan 已提交
79 80 81
/* Put pMsg into the write worker's queue. Returns the result of mmPutNodeMsgToWorker. */
int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->writeWorker;
  return mmPutNodeMsgToWorker(pWorker, pMsg);
}
S
shm  
Shengliang Guan 已提交
82

S
Shengliang Guan 已提交
83 84 85
/* Put pMsg into the sync worker's queue. Returns the result of mmPutNodeMsgToWorker. */
int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->syncWorker;
  return mmPutNodeMsgToWorker(pWorker, pMsg);
}
S
shm  
Shengliang Guan 已提交
86

S
Shengliang Guan 已提交
87 88 89
/* Put pMsg into the read worker's queue. Returns the result of mmPutNodeMsgToWorker. */
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->readWorker;
  return mmPutNodeMsgToWorker(pWorker, pMsg);
}
S
Shengliang 已提交
90

S
Shengliang Guan 已提交
91
/*
 * Put pMsg into the query worker's queue.
 *
 * mndPreprocessQueryMsg is called on the message first; whatever it may
 * return is not inspected here.
 */
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  mndPreprocessQueryMsg(pMgmt->pMnode, pMsg);

  SSingleWorker *pWorker = &pMgmt->queryWorker;
  return mmPutNodeMsgToWorker(pWorker, pMsg);
}

S
Shengliang Guan 已提交
97
/* Put pMsg into the monitor worker's queue. Returns the result of mmPutNodeMsgToWorker. */
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SSingleWorker *pWorker = &pMgmt->monitorWorker;
  return mmPutNodeMsgToWorker(pWorker, pMsg);
}

S
Shengliang Guan 已提交
101
/*
 * Copy an RPC message into a newly allocated queue item and write it to the
 * worker queue selected by qtype.
 *
 * pMgmt: mnode management object that owns the worker queues.
 * qtype: WRITE_QUEUE, QUERY_QUEUE, READ_QUEUE or SYNC_QUEUE; any other value
 *        is rejected with terrno set to TSDB_CODE_INVALID_PARA.
 * pRpc:  message to enqueue. It is copied with memcpy, so the queued item
 *        shares pRpc->pCont with the caller's struct.
 *
 * Returns 0 once the copy has been written to a queue, -1 otherwise. On every
 * failure path the queue item allocated here is released again; pRpc->pCont is
 * not freed by this function.
 */
int32_t mmPutRpcMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
  SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
  if (pMsg == NULL) return -1;
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));

  switch (qtype) {
    case WRITE_QUEUE:
      dTrace("msg:%p, is created and will put into mnode-write queue", pMsg);
      taosWriteQitem(pMgmt->writeWorker.queue, pMsg);
      return 0;
    case QUERY_QUEUE:
      dTrace("msg:%p, is created and will put into mnode-query queue", pMsg);
      taosWriteQitem(pMgmt->queryWorker.queue, pMsg);
      return 0;
    case READ_QUEUE:
      dTrace("msg:%p, is created and will put into mnode-read queue", pMsg);
      taosWriteQitem(pMgmt->readWorker.queue, pMsg);
      return 0;
    case SYNC_QUEUE:
      // The sync queue is only written while a reference on pMgmt is held.
      if (mmAcquire(pMgmt) == 0) {
        dTrace("msg:%p, is created and will put into mnode-sync queue", pMsg);
        taosWriteQitem(pMgmt->syncWorker.queue, pMsg);
        mmRelease(pMgmt);
        return 0;
      }
      // Acquire failed: the item was never queued, so release it here
      // instead of leaking it.
      taosFreeQitem(pMsg);
      return -1;
    default:
      // Unsupported queue type: release the unused item before reporting.
      // terrno is set after the free so the free cannot disturb it.
      taosFreeQitem(pMsg);
      terrno = TSDB_CODE_INVALID_PARA;
      return -1;
  }
}
D
dapan1121 已提交
134

S
Shengliang Guan 已提交
135
/*
 * Initialize the five mnode workers in order: query, read, write, sync and
 * monitor.
 *
 * The query and read workers take their thread counts from
 * tsNumOfMnodeQueryThreads / tsNumOfMnodeReadThreads; the other three use
 * min = max = 1. All workers except sync use mmProcessQueue as their callback;
 * the sync worker uses mmProcessSyncQueue.
 *
 * Returns 0 when every worker was initialized, or -1 as soon as one
 * tSingleWorkerInit call fails. Workers initialized before the failure are
 * not cleaned up here.
 */
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
  // Query worker.
  SSingleWorkerCfg queryCfg = {
      .name = "mnode-query",
      .fp = (FItem)mmProcessQueue,
      .param = pMgmt,
      .min = tsNumOfMnodeQueryThreads,
      .max = tsNumOfMnodeQueryThreads,
  };
  if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) {
    dError("failed to start mnode-query worker since %s", terrstr());
    return -1;
  }

  // Read worker.
  SSingleWorkerCfg readCfg = {
      .name = "mnode-read",
      .fp = (FItem)mmProcessQueue,
      .param = pMgmt,
      .min = tsNumOfMnodeReadThreads,
      .max = tsNumOfMnodeReadThreads,
  };
  if (tSingleWorkerInit(&pMgmt->readWorker, &readCfg) != 0) {
    dError("failed to start mnode-read worker since %s", terrstr());
    return -1;
  }

  // Write worker.
  SSingleWorkerCfg writeCfg = {
      .name = "mnode-write",
      .fp = (FItem)mmProcessQueue,
      .param = pMgmt,
      .min = 1,
      .max = 1,
  };
  if (tSingleWorkerInit(&pMgmt->writeWorker, &writeCfg) != 0) {
    dError("failed to start mnode-write worker since %s", terrstr());
    return -1;
  }

  // Sync worker: the only one with its own callback.
  SSingleWorkerCfg syncCfg = {
      .name = "mnode-sync",
      .fp = (FItem)mmProcessSyncQueue,
      .param = pMgmt,
      .min = 1,
      .max = 1,
  };
  if (tSingleWorkerInit(&pMgmt->syncWorker, &syncCfg) != 0) {
    dError("failed to start mnode mnode-sync worker since %s", terrstr());
    return -1;
  }

  // Monitor worker.
  SSingleWorkerCfg monitorCfg = {
      .name = "mnode-monitor",
      .fp = (FItem)mmProcessQueue,
      .param = pMgmt,
      .min = 1,
      .max = 1,
  };
  if (tSingleWorkerInit(&pMgmt->monitorWorker, &monitorCfg) != 0) {
    dError("failed to start mnode mnode-monitor worker since %s", terrstr());
    return -1;
  }

  dDebug("mnode workers are initialized");
  return 0;
}

/*
 * Stop and clean up all mnode workers.
 *
 * Sets pMgmt->stopped under the write lock, then polls every 10 ms until
 * pMgmt->refCount is no longer positive, and finally cleans up the monitor,
 * query, read, write and sync workers in that order.
 */
void mmStopWorker(SMnodeMgmt *pMgmt) {
  taosThreadRwlockWrlock(&pMgmt->lock);
  pMgmt->stopped = 1;
  taosThreadRwlockUnlock(&pMgmt->lock);

  // Wait for outstanding references to be released before tearing down.
  for (;;) {
    if (pMgmt->refCount <= 0) break;
    taosMsleep(10);
  }

  tSingleWorkerCleanup(&pMgmt->monitorWorker);
  tSingleWorkerCleanup(&pMgmt->queryWorker);
  tSingleWorkerCleanup(&pMgmt->readWorker);
  tSingleWorkerCleanup(&pMgmt->writeWorker);
  tSingleWorkerCleanup(&pMgmt->syncWorker);

  dDebug("mnode workers are closed");
}