dmWorker.c 5.1 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmImp.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19 20 21
static void *dmStatusThreadFp(void *param) {
  SDnode *pDnode = param;
  int64_t lastTime = taosGetTimestampMs();
S
shm  
Shengliang Guan 已提交
22

S
Shengliang Guan 已提交
23
  setThreadName("dnode-status");
S
shm  
Shengliang Guan 已提交
24

S
Shengliang Guan 已提交
25
  while (1) {
S
Shengliang Guan 已提交
26
    taosThreadTestCancel();
S
shm  
Shengliang Guan 已提交
27
    taosMsleep(200);
S
Shengliang Guan 已提交
28 29

    if (pDnode->status != DND_STAT_RUNNING || pDnode->data.dropped) {
S
shm  
Shengliang Guan 已提交
30 31 32 33
      continue;
    }

    int64_t curTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
34 35
    float   interval = (curTime - lastTime) / 1000.0f;
    if (interval >= tsStatusInterval) {
S
Shengliang Guan 已提交
36
      dmSendStatusReq(pDnode);
S
Shengliang Guan 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
      lastTime = curTime;
    }
  }

  return NULL;
}

static void *dmMonitorThreadFp(void *param) {
  SDnode *pDnode = param;
  int64_t lastTime = taosGetTimestampMs();

  setThreadName("dnode-monitor");

  while (1) {
    taosThreadTestCancel();
    taosMsleep(200);

    if (pDnode->status != DND_STAT_RUNNING || pDnode->data.dropped) {
      continue;
S
shm  
Shengliang Guan 已提交
56 57
    }

S
Shengliang Guan 已提交
58 59 60
    int64_t curTime = taosGetTimestampMs();
    float   interval = (curTime - lastTime) / 1000.0f;
    if (interval >= tsMonitorInterval) {
S
Shengliang Guan 已提交
61
      dmSendMonitorReport(pDnode);
S
Shengliang Guan 已提交
62
      lastTime = curTime;
S
shm  
Shengliang Guan 已提交
63
    }
S
shm  
Shengliang Guan 已提交
64
  }
S
Shengliang Guan 已提交
65 66 67 68 69

  return NULL;
}

int32_t dmStartStatusThread(SDnode *pDnode) {
S
Shengliang Guan 已提交
70 71
  pDnode->data.statusThreadId = taosCreateThread(dmStatusThreadFp, pDnode);
  if (pDnode->data.statusThreadId == NULL) {
S
Shengliang Guan 已提交
72 73 74 75 76
    dError("failed to init dnode status thread");
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
77
  dmReportStartup(pDnode, "dnode-status", "initialized");
S
Shengliang Guan 已提交
78 79 80 81
  return 0;
}

void dmStopStatusThread(SDnode *pDnode) {
S
Shengliang Guan 已提交
82 83 84
  if (pDnode->data.statusThreadId != NULL) {
    taosDestoryThread(pDnode->data.statusThreadId);
    pDnode->data.statusThreadId = NULL;
S
Shengliang Guan 已提交
85
  }
S
shm  
Shengliang Guan 已提交
86 87
}

S
Shengliang Guan 已提交
88
int32_t dmStartMonitorThread(SDnode *pDnode) {
S
Shengliang Guan 已提交
89 90
  pDnode->data.monitorThreadId = taosCreateThread(dmMonitorThreadFp, pDnode);
  if (pDnode->data.monitorThreadId == NULL) {
S
Shengliang Guan 已提交
91
    dError("failed to init dnode monitor thread");
S
Shengliang Guan 已提交
92 93 94 95
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
96
  dmReportStartup(pDnode, "dnode-monitor", "initialized");
S
Shengliang Guan 已提交
97 98 99
  return 0;
}

S
Shengliang Guan 已提交
100
void dmStopMonitorThread(SDnode *pDnode) {
S
Shengliang Guan 已提交
101 102 103
  if (pDnode->data.monitorThreadId != NULL) {
    taosDestoryThread(pDnode->data.monitorThreadId);
    pDnode->data.monitorThreadId = NULL;
S
Shengliang Guan 已提交
104 105
  }
}
S
Shengliang Guan 已提交
106

S
Shengliang Guan 已提交
107 108
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SDnode  *pDnode = pInfo->ahandle;
S
shm  
Shengliang Guan 已提交
109
  SRpcMsg *pRpc = &pMsg->rpcMsg;
S
Shengliang Guan 已提交
110
  int32_t  code = -1;
S
Shengliang Guan 已提交
111
  dTrace("msg:%p, will be processed in dnode-mgmt queue", pMsg);
S
shm  
Shengliang Guan 已提交
112

S
shm  
Shengliang Guan 已提交
113
  switch (pRpc->msgType) {
S
shm  
Shengliang Guan 已提交
114
    case TDMT_DND_CONFIG_DNODE:
S
Shengliang Guan 已提交
115
      code = dmProcessConfigReq(pDnode, pMsg);
S
shm  
Shengliang Guan 已提交
116 117
      break;
    case TDMT_MND_AUTH_RSP:
S
Shengliang Guan 已提交
118
      code = dmProcessAuthRsp(pDnode, pMsg);
S
shm  
Shengliang Guan 已提交
119 120
      break;
    case TDMT_MND_GRANT_RSP:
S
Shengliang Guan 已提交
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
      code = dmProcessGrantRsp(pDnode, pMsg);
      break;
    case TDMT_DND_CREATE_MNODE:
      code = dmProcessCreateNodeReq(pDnode, MNODE, pMsg);
      break;
    case TDMT_DND_DROP_MNODE:
      code = dmProcessDropNodeReq(pDnode, MNODE, pMsg);
      break;
    case TDMT_DND_CREATE_QNODE:
      code = dmProcessCreateNodeReq(pDnode, QNODE, pMsg);
      break;
    case TDMT_DND_DROP_QNODE:
      code = dmProcessDropNodeReq(pDnode, QNODE, pMsg);
      break;
    case TDMT_DND_CREATE_SNODE:
      code = dmProcessCreateNodeReq(pDnode, SNODE, pMsg);
      break;
    case TDMT_DND_DROP_SNODE:
      code = dmProcessDropNodeReq(pDnode, SNODE, pMsg);
      break;
    case TDMT_DND_CREATE_BNODE:
      code = dmProcessCreateNodeReq(pDnode, BNODE, pMsg);
      break;
    case TDMT_DND_DROP_BNODE:
      code = dmProcessDropNodeReq(pDnode, BNODE, pMsg);
S
shm  
Shengliang Guan 已提交
146
      break;
S
shm  
Shengliang Guan 已提交
147
    default:
S
Shengliang Guan 已提交
148
      break;
S
shm  
Shengliang Guan 已提交
149
  }
S
shm  
Shengliang Guan 已提交
150

S
shm  
Shengliang Guan 已提交
151
  if (pRpc->msgType & 1u) {
S
shm  
Shengliang Guan 已提交
152
    if (code != 0) code = terrno;
S
shm  
Shengliang Guan 已提交
153
    SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
S
shm  
Shengliang Guan 已提交
154 155 156
    rpcSendResponse(&rsp);
  }

S
shm  
Shengliang Guan 已提交
157
  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
S
shm  
Shengliang Guan 已提交
158 159
  rpcFreeCont(pMsg->rpcMsg.pCont);
  taosFreeQitem(pMsg);
S
shm  
Shengliang Guan 已提交
160 161
}

S
Shengliang Guan 已提交
162 163 164
int32_t dmStartWorker(SDnode *pDnode) {
  SSingleWorkerCfg cfg = {.min = 1, .max = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessMgmtQueue, .param = pDnode};
  if (tSingleWorkerInit(&pDnode->data.mgmtWorker, &cfg) != 0) {
S
Shengliang Guan 已提交
165
    dError("failed to start dnode-mgmt worker since %s", terrstr());
S
shm  
Shengliang Guan 已提交
166 167 168
    return -1;
  }

S
Shengliang Guan 已提交
169
  dDebug("dnode workers are initialized");
S
shm  
Shengliang Guan 已提交
170 171 172
  return 0;
}

S
Shengliang Guan 已提交
173 174
void dmStopWorker(SDnode *pDnode) {
  tSingleWorkerCleanup(&pDnode->data.mgmtWorker);
S
Shengliang Guan 已提交
175
  dDebug("dnode workers are closed");
S
shm  
Shengliang Guan 已提交
176
}
S
shm  
Shengliang Guan 已提交
177

S
Shengliang Guan 已提交
178
int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
S
Shengliang Guan 已提交
179
  SSingleWorker *pWorker = &pWrapper->pDnode->data.mgmtWorker;
S
Shengliang Guan 已提交
180
  dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
S
Shengliang Guan 已提交
181 182
  taosWriteQitem(pWorker->queue, pMsg);
  return 0;
S
shm  
Shengliang Guan 已提交
183
}