dmWorker.c 8.4 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmInt.h"
D
dapan1121 已提交
18
#include "thttp.h"
S
shm  
Shengliang Guan 已提交
19

S
Shengliang Guan 已提交
20
static void *dmStatusThreadFp(void *param) {
S
Shengliang Guan 已提交
21 22
  SDnodeMgmt *pMgmt = param;
  int64_t     lastTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
23
  setThreadName("dnode-status");
dengyihao's avatar
dengyihao 已提交
24 25 26

  const static int16_t TRIM_FREQ = 30;
  int32_t              trimCount = 0;
K
kailixu 已提交
27 28 29
  int32_t              upTimeCount = 0;
  int64_t              upTime = 0;

S
Shengliang Guan 已提交
30
  while (1) {
S
shm  
Shengliang Guan 已提交
31
    taosMsleep(200);
S
Shengliang Guan 已提交
32
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
S
shm  
Shengliang Guan 已提交
33 34

    int64_t curTime = taosGetTimestampMs();
K
kailixu 已提交
35 36
    if (curTime < lastTime) lastTime = curTime;
    float interval = (curTime - lastTime) / 1000.0f;
S
Shengliang Guan 已提交
37
    if (interval >= tsStatusInterval) {
S
Shengliang Guan 已提交
38
      dmSendStatusReq(pMgmt);
S
Shengliang Guan 已提交
39
      lastTime = curTime;
dengyihao's avatar
dengyihao 已提交
40 41 42 43 44

      trimCount = (trimCount + 1) % TRIM_FREQ;
      if (trimCount == 0) {
        taosMemoryTrim(0);
      }
K
kailixu 已提交
45

46
      if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
K
kailixu 已提交
47
        upTime = taosGetOsUptime() - tsDndStartOsUptime;
K
kailixu 已提交
48
        tsDndUpTime = TMAX(tsDndUpTime, upTime);
K
kailixu 已提交
49
      }
S
Shengliang Guan 已提交
50 51 52 53 54 55 56
    }
  }

  return NULL;
}

static void *dmMonitorThreadFp(void *param) {
S
Shengliang Guan 已提交
57 58
  SDnodeMgmt *pMgmt = param;
  int64_t     lastTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
59 60 61 62
  setThreadName("dnode-monitor");

  while (1) {
    taosMsleep(200);
S
Shengliang Guan 已提交
63
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
S
shm  
Shengliang Guan 已提交
64

S
Shengliang Guan 已提交
65
    int64_t curTime = taosGetTimestampMs();
K
kailixu 已提交
66 67
    if (curTime < lastTime) lastTime = curTime;
    float interval = (curTime - lastTime) / 1000.0f;
S
Shengliang Guan 已提交
68
    if (interval >= tsMonitorInterval) {
S
Shengliang Guan 已提交
69
      (*pMgmt->sendMonitorReportFp)();
S
Shengliang Guan 已提交
70
      lastTime = curTime;
S
shm  
Shengliang Guan 已提交
71
    }
S
shm  
Shengliang Guan 已提交
72
  }
S
Shengliang Guan 已提交
73 74 75 76

  return NULL;
}

D
dapan1121 已提交
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
/*
 * Body of the "dnode-crashReport" thread.
 *
 * Once per report period (reportPeriodNum ticks of sleepTime ms, about one hour)
 * it reads crash information from "<tsLogDir><sep>.taosdCrashLog" through
 * taosReadCrashInfo() and posts each message to the telemetry server.
 *
 * While messages keep coming, the file handle returned by taosReadCrashInfo()
 * stays open and the next read happens immediately. Presumably each call
 * returns the next message from that handle; confirm against taosReadCrashInfo.
 * Once no message is returned, the handle is released via
 * taosReleaseCrashLogFile(), with the truncate flag set only if at least one
 * message was sent successfully.
 *
 * If a send fails, the message is freed and the file is released without
 * truncation, so unsent data is kept. The thread then waits a full period
 * before retrying instead of immediately re-sending in a tight loop.
 *
 * The first attempt happens right after startup because loopTimes starts at
 * reportPeriodNum.
 *
 * @param param  the owning SDnodeMgmt
 * @return       always NULL
 */
static void *dmCrashReportThreadFp(void *param) {
  SDnodeMgmt *pMgmt = param;
  setThreadName("dnode-crashReport");

  char filepath[PATH_MAX] = {0};
  snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP);

  char     *pMsg = NULL;
  int64_t   msgLen = 0;
  TdFilePtr pFile = NULL;
  bool      truncateFile = false;
  int32_t   sleepTime = 200;
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
  // Start "expired" so the first report attempt happens immediately.
  int32_t   loopTimes = reportPeriodNum;

  while (1) {
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
    if (loopTimes++ < reportPeriodNum) {
      taosMsleep(sleepTime);
      continue;
    }

    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);

    // Capture this before freeing: a pointer's value must not be inspected after free().
    bool hasMsg = (pMsg != NULL && msgLen > 0);
    bool sendFailed = false;

    if (hasMsg) {
      if (taosSendHttpReport(tsTelemServer, tsSvrCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
        dError("failed to send crash report");
        sendFailed = true;
      } else {
        dInfo("succeed to send crash report");
        truncateFile = true;
      }
    } else {
      dDebug("no crash info");
    }

    // Free on every path (previously leaked on send failure) and never leave a dangling pointer.
    taosMemoryFree(pMsg);
    pMsg = NULL;
    msgLen = 0;

    if (sendFailed) {
      // Keep the file intact so unsent messages are retried next period.
      // Back off for a full period rather than retrying in a tight loop.
      if (pFile) {
        taosReleaseCrashLogFile(pFile, false);
        pFile = NULL;
      }
      truncateFile = false;
      taosMsleep(sleepTime);
      loopTimes = 0;
      continue;
    }

    if (hasMsg) {
      // More messages may follow in the still-open file; read the next one right away.
      continue;
    }

    if (pFile) {
      taosReleaseCrashLogFile(pFile, truncateFile);
      pFile = NULL;
      truncateFile = false;
    }

    taosMsleep(sleepTime);
    loopTimes = 0;
  }

  // Shutdown can interrupt a drain in progress: release the file without truncating,
  // since not every message in it has necessarily been sent.
  if (pFile) {
    taosReleaseCrashLogFile(pFile, false);
    pFile = NULL;
  }

  return NULL;
}


S
Shengliang Guan 已提交
136
/*
 * Spawns the joinable "dnode-status" thread running dmStatusThreadFp.
 *
 * The thread attribute object is destroyed on both the success and the failure
 * path. Previously the failure path returned without destroying it.
 *
 * @param pMgmt  dnode management context; its statusThread handle is filled in
 * @return       0 on success, -1 if the thread could not be created
 */
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
    // NOTE(review): if taosThreadCreate returns the pthread_create error number
    // instead of setting errno, strerror(errno) may be misleading. Confirm.
    // Log before destroying the attribute so errno is not clobbered first.
    dError("failed to create status thread since %s", strerror(errno));
    taosThreadAttrDestroy(&thAttr);
    return -1;
  }

  taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-status", "initialized");
  return 0;
}

S
Shengliang Guan 已提交
150
/*
 * Waits for the "dnode-status" thread to exit and resets its handle.
 * Does nothing if the handle does not refer to a valid thread.
 */
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusThread)) {
    return;
  }

  taosThreadJoin(pMgmt->statusThread, NULL);
  taosThreadClear(&pMgmt->statusThread);
}

S
Shengliang Guan 已提交
157
/*
 * Spawns the joinable "dnode-monitor" thread running dmMonitorThreadFp.
 *
 * The thread attribute object is destroyed on both the success and the failure
 * path. Previously the failure path returned without destroying it.
 *
 * @param pMgmt  dnode management context; its monitorThread handle is filled in
 * @return       0 on success, -1 if the thread could not be created
 */
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) {
    // NOTE(review): if taosThreadCreate returns the pthread_create error number
    // instead of setting errno, strerror(errno) may be misleading. Confirm.
    // Log before destroying the attribute so errno is not clobbered first.
    dError("failed to create monitor thread since %s", strerror(errno));
    taosThreadAttrDestroy(&thAttr);
    return -1;
  }

  taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-monitor", "initialized");
  return 0;
}

S
Shengliang Guan 已提交
171
/*
 * Waits for the "dnode-monitor" thread to exit and resets its handle.
 * Does nothing if the handle does not refer to a valid thread.
 */
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->monitorThread)) {
    return;
  }

  taosThreadJoin(pMgmt->monitorThread, NULL);
  taosThreadClear(&pMgmt->monitorThread);
}
S
Shengliang Guan 已提交
177

D
dapan1121 已提交
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
/*
 * Spawns the joinable "dnode-crashReport" thread running dmCrashReportThreadFp,
 * but only when tsEnableCrashReport is set; otherwise this is a successful no-op.
 *
 * The thread attribute object is destroyed on both the success and the failure
 * path. Previously the failure path returned without destroying it.
 *
 * @param pMgmt  dnode management context; its crashReportThread handle is filled in
 * @return       0 on success or when crash reporting is disabled, -1 if the
 *               thread could not be created
 */
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) {
  if (!tsEnableCrashReport) {
    return 0;
  }

  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  if (taosThreadCreate(&pMgmt->crashReportThread, &thAttr, dmCrashReportThreadFp, pMgmt) != 0) {
    // NOTE(review): if taosThreadCreate returns the pthread_create error number
    // instead of setting errno, strerror(errno) may be misleading. Confirm.
    // Log before destroying the attribute so errno is not clobbered first.
    dError("failed to create crashReport thread since %s", strerror(errno));
    taosThreadAttrDestroy(&thAttr);
    return -1;
  }

  taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-crashReport", "initialized");
  return 0;
}

/*
 * Waits for the "dnode-crashReport" thread to exit and resets its handle.
 * Skipped entirely when tsEnableCrashReport is off, or when the handle does not
 * refer to a valid thread.
 */
void dmStopCrashReportThread(SDnodeMgmt *pMgmt) {
  // Short-circuit keeps the original order: the handle is only checked when reporting is enabled.
  if (!tsEnableCrashReport || !taosCheckPthreadValid(pMgmt->crashReportThread)) {
    return;
  }

  taosThreadJoin(pMgmt->crashReportThread, NULL);
  taosThreadClear(&pMgmt->crashReportThread);
}


S
Shengliang Guan 已提交
208
/*
 * Worker callback for the single-threaded "dnode-mgmt" queue.
 *
 * Dispatches pMsg by msgType to its handler. Unknown types set terrno to
 * TSDB_CODE_MSG_NOT_PROCESSED.
 *
 * For request messages (IsReq), a response is sent back. It carries the
 * handler's result code, replaced by terrno when the handler failed and terrno
 * is non-zero, plus the response body left in pMsg->info.rsp / rspLen.
 *
 * The message payload and the queue item itself are always freed before
 * returning.
 */
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  SDnodeMgmt *pMgmt = pInfo->ahandle;
  int32_t     code = -1;
  // Not read directly here; presumably consumed by the dG* trace-logging macros, so keep the name.
  STraceId   *trace = &pMsg->info.traceId;

  dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));

  switch (pMsg->msgType) {
    // Dnode-local configuration and queries
    case TDMT_DND_CONFIG_DNODE:
      code = dmProcessConfigReq(pMgmt, pMsg);
      break;
    case TDMT_DND_SERVER_STATUS:
      code = dmProcessServerRunStatus(pMgmt, pMsg);
      break;
    case TDMT_DND_SYSTABLE_RETRIEVE:
      code = dmProcessRetrieve(pMgmt, pMsg);
      break;

    // Messages from the mnode
    case TDMT_MND_AUTH_RSP:
      code = dmProcessAuthRsp(pMgmt, pMsg);
      break;
    case TDMT_MND_GRANT_RSP:
      code = dmProcessGrantRsp(pMgmt, pMsg);
      break;
    case TDMT_MND_GRANT:
      code = dmProcessGrantReq(&pMgmt->pData->clusterId, pMsg);
      break;

    // Node lifecycle, delegated to the callbacks installed on pMgmt
    case TDMT_DND_CREATE_MNODE:
      code = (*pMgmt->processCreateNodeFp)(MNODE, pMsg);
      break;
    case TDMT_DND_DROP_MNODE:
      code = (*pMgmt->processDropNodeFp)(MNODE, pMsg);
      break;
    case TDMT_DND_ALTER_MNODE_TYPE:
      code = (*pMgmt->processAlterNodeTypeFp)(MNODE, pMsg);
      break;
    case TDMT_DND_CREATE_QNODE:
      code = (*pMgmt->processCreateNodeFp)(QNODE, pMsg);
      break;
    case TDMT_DND_DROP_QNODE:
      code = (*pMgmt->processDropNodeFp)(QNODE, pMsg);
      break;
    case TDMT_DND_CREATE_SNODE:
      code = (*pMgmt->processCreateNodeFp)(SNODE, pMsg);
      break;
    case TDMT_DND_DROP_SNODE:
      code = (*pMgmt->processDropNodeFp)(SNODE, pMsg);
      break;

    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      dGError("msg:%p, not processed in mgmt queue", pMsg);
      break;
  }

  if (IsReq(pMsg)) {
    // Prefer the more specific terrno over a generic failure code.
    if (code != 0 && terrno != 0) {
      code = terrno;
    }
    SRpcMsg rsp = {
        .info = pMsg->info,
        .code = code,
        .pCont = pMsg->info.rsp,
        .contLen = pMsg->info.rspLen,
    };
    rpcSendResponse(&rsp);
  }

  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}

S
Shengliang Guan 已提交
276
/*
 * Starts the single-threaded "dnode-mgmt" worker, which drains the management
 * queue with dmProcessMgmtQueue.
 *
 * @return 0 on success, -1 if tSingleWorkerInit fails
 */
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
  SSingleWorkerCfg cfg = {
      .name = "dnode-mgmt",
      .min = 1,
      .max = 1,
      .fp = (FItem)dmProcessMgmtQueue,
      .param = pMgmt,
  };

  if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) == 0) {
    dDebug("dnode workers are initialized");
    return 0;
  }

  dError("failed to start dnode-mgmt worker since %s", terrstr());
  return -1;
}

S
Shengliang Guan 已提交
293 294
/* Shuts down the "dnode-mgmt" worker started by dmStartWorker. */
void dmStopWorker(SDnodeMgmt *pMgmt) {
  SSingleWorker *pWorker = &pMgmt->mgmtWorker;
  tSingleWorkerCleanup(pWorker);
  dDebug("dnode workers are closed");
}
S
shm  
Shengliang Guan 已提交
297

S
Shengliang Guan 已提交
298
/*
 * Enqueues pMsg on the "dnode-mgmt" worker queue for dmProcessMgmtQueue.
 *
 * Always returns 0. NOTE(review): any status reported by taosWriteQitem is
 * ignored here. Confirm whether callers need to learn about enqueue failures.
 */
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dTrace("msg:%p, put into worker %s", pMsg, pMgmt->mgmtWorker.name);
  taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);
  return 0;
}