/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "dmInt.h"

static void *dmThreadRoutine(void *param) {
S
shm  
Shengliang Guan 已提交
20
  SDnodeMgmt *pMgmt = param;
S
Shengliang Guan 已提交
21
  SDnode     *pDnode = pMgmt->pDnode;
S
shm  
Shengliang Guan 已提交
22 23 24 25 26 27
  int64_t     lastStatusTime = taosGetTimestampMs();
  int64_t     lastMonitorTime = lastStatusTime;

  setThreadName("dnode-hb");

  while (true) {
S
Shengliang Guan 已提交
28
    taosThreadTestCancel();
S
shm  
Shengliang Guan 已提交
29
    taosMsleep(200);
S
shm  
Shengliang Guan 已提交
30
    if (dndGetStatus(pDnode) != DND_STAT_RUNNING || pDnode->dropped) {
S
shm  
Shengliang Guan 已提交
31 32 33 34
      continue;
    }

    int64_t curTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
35
    float   statusInterval = (curTime - lastStatusTime) / 1000.0f;
S
shm  
Shengliang Guan 已提交
36
    if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) {
S
shm  
Shengliang Guan 已提交
37
      dmSendStatusReq(pMgmt);
S
shm  
Shengliang Guan 已提交
38 39 40
      lastStatusTime = curTime;
    }

S
shm  
Shengliang Guan 已提交
41 42
    float monitorInterval = (curTime - lastMonitorTime) / 1000.0f;
    if (monitorInterval >= tsMonitorInterval) {
S
Shengliang Guan 已提交
43
      dmSendMonitorReport(pDnode);
S
shm  
Shengliang Guan 已提交
44 45
      lastMonitorTime = curTime;
    }
S
shm  
Shengliang Guan 已提交
46 47 48
  }
}

/*
 * Creates the dnode management thread running dmThreadRoutine and stores its
 * handle in pMgmt->threadId.
 *
 * Returns 0 on success. On failure logs an error, sets terrno to
 * TSDB_CODE_OUT_OF_MEMORY and returns -1.
 */
int32_t dmStartThread(SDnodeMgmt *pMgmt) {
  pMgmt->threadId = taosCreateThread(dmThreadRoutine, pMgmt);
  if (pMgmt->threadId != NULL) {
    return 0;
  }

  dError("failed to init dnode thread");
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

/*
 * Worker callback: dispatches one queued message to the handler selected by
 * its rpc message type, sends a response for message types whose lowest bit
 * is set, and then releases the message.
 *
 * pInfo - queue info; its ahandle is used as the SDnodeMgmt.
 * pMsg  - the queued message. Its rpcMsg.pCont and the queue item itself are
 *         freed here, so the message must not be used after this call.
 *
 * Message types without a dedicated case are forwarded to
 * dmProcessCDnodeReq().
 */
static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SDnodeMgmt *pMgmt = pInfo->ahandle;
  SRpcMsg    *pRpc = &pMsg->rpcMsg;
  int32_t     code = -1;

  dTrace("msg:%p, will be processed in dnode queue", pMsg);

  switch (pRpc->msgType) {
    case TDMT_DND_CONFIG_DNODE:
      code = dmProcessConfigReq(pMgmt, pMsg);
      break;
    case TDMT_MND_STATUS_RSP:
      code = dmProcessStatusRsp(pMgmt, pMsg);
      break;
    case TDMT_MND_AUTH_RSP:
      code = dmProcessAuthRsp(pMgmt, pMsg);
      break;
    case TDMT_MND_GRANT_RSP:
      code = dmProcessGrantRsp(pMgmt, pMsg);
      break;
    case TDMT_MON_MM_INFO:
      code = dmProcessGetMonMmInfoReq(pMgmt, pMsg);
      break;
    case TDMT_MON_VM_INFO:
      code = dmProcessGetMonVmInfoReq(pMgmt, pMsg);
      break;
    case TDMT_MON_QM_INFO:
      code = dmProcessGetMonQmInfoReq(pMgmt, pMsg);
      break;
    case TDMT_MON_SM_INFO:
      code = dmProcessGetMonSmInfoReq(pMgmt, pMsg);
      break;
    case TDMT_MON_BM_INFO:
      code = dmProcessGetMonBmInfoReq(pMgmt, pMsg);
      break;
    case TDMT_MON_VM_LOAD:
      code = dmProcessGetVnodeLoadsReq(pMgmt, pMsg);
      break;
    default:
      code = dmProcessCDnodeReq(pMgmt->pDnode, pMsg);
      break;
  }

  // Only message types with the lowest bit set are answered here.
  if (pRpc->msgType & 1u) {
    // Prefer the detailed error stored in terrno, but never let an unset
    // terrno (0) turn a failed handler result into a success response.
    if (code != 0 && terrno != 0) {
      code = terrno;
    }
    SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
    rpcSendResponse(&rsp);
  }

  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
  rpcFreeCont(pMsg->rpcMsg.pCont);
  taosFreeQitem(pMsg);
}

/*
 * Initializes the two single-threaded workers of the dnode management module:
 * "dnode-mgmt" (pMgmt->mgmtWorker) and "dnode-monitor" (pMgmt->monitorWorker).
 * Both use dmProcessQueue as their item callback with pMgmt as parameter.
 *
 * Returns 0 when both workers are initialized, -1 as soon as one fails.
 *
 * NOTE(review): if the monitor worker fails to start, the mgmt worker that
 * was already initialized is not cleaned up in this function - confirm the
 * caller takes care of it.
 */
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
  SSingleWorkerCfg mgmtCfg = {
      .min = 1,
      .max = 1,
      .name = "dnode-mgmt",
      .fp = (FItem)dmProcessQueue,
      .param = pMgmt,
  };
  if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) {
    dError("failed to start dnode mgmt worker since %s", terrstr());
    return -1;
  }

  SSingleWorkerCfg monitorCfg = {
      .min = 1,
      .max = 1,
      .name = "dnode-monitor",
      .fp = (FItem)dmProcessQueue,
      .param = pMgmt,
  };
  if (tSingleWorkerInit(&pMgmt->monitorWorker, &monitorCfg) != 0) {
    dError("failed to start dnode monitor worker since %s", terrstr());
    return -1;
  }

  dDebug("dnode workers are initialized");
  return 0;
}

/*
 * Tears down what dmStartWorker/dmStartThread set up: cleans up the mgmt and
 * monitor workers first, then destroys the management thread if one exists
 * and clears pMgmt->threadId.
 */
void dmStopWorker(SDnodeMgmt *pMgmt) {
  SSingleWorker *pMgmtWorker = &pMgmt->mgmtWorker;
  SSingleWorker *pMonitorWorker = &pMgmt->monitorWorker;

  tSingleWorkerCleanup(pMgmtWorker);
  tSingleWorkerCleanup(pMonitorWorker);

  // threadId stays NULL when no thread is recorded; resetting it after the
  // destroy keeps a second call from touching the same handle again.
  if (pMgmt->threadId != NULL) {
    taosDestoryThread(pMgmt->threadId);
    pMgmt->threadId = NULL;
  }

  dDebug("dnode workers are closed");
}
/*
 * Hands a message over to the mgmt worker by writing it into that worker's
 * queue.
 *
 * pWrapper - wrapper whose pMgmt is used as the SDnodeMgmt.
 * pMsg     - message to enqueue.
 *
 * Always returns 0.
 */
int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SDnodeMgmt    *pDnodeMgmt = pWrapper->pMgmt;
  SSingleWorker *pTarget = &pDnodeMgmt->mgmtWorker;

  dTrace("msg:%p, put into worker %s", pMsg, pTarget->name);
  taosWriteQitem(pTarget->queue, pMsg);

  return 0;
}

/*
 * Hands a message over to the monitor worker by writing it into that worker's
 * queue.
 *
 * pWrapper - wrapper whose pMgmt is used as the SDnodeMgmt.
 * pMsg     - message to enqueue.
 *
 * Always returns 0.
 */
int32_t dmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SDnodeMgmt    *pDnodeMgmt = pWrapper->pMgmt;
  SSingleWorker *pTarget = &pDnodeMgmt->monitorWorker;

  dTrace("msg:%p, put into worker %s", pMsg, pTarget->name);
  taosWriteQitem(pTarget->queue, pMsg);

  return 0;
}