dmWorker.c 5.7 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
Shengliang Guan 已提交
19
static void *dmStatusThreadFp(void *param) {
S
Shengliang Guan 已提交
20 21
  SDnodeMgmt *pMgmt = param;
  int64_t     lastTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
22
  setThreadName("dnode-status");
dengyihao's avatar
dengyihao 已提交
23 24 25

  const static int16_t TRIM_FREQ = 30;
  int32_t              trimCount = 0;
S
Shengliang Guan 已提交
26
  while (1) {
S
shm  
Shengliang Guan 已提交
27
    taosMsleep(200);
S
Shengliang Guan 已提交
28
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
S
shm  
Shengliang Guan 已提交
29 30

    int64_t curTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
31 32
    float   interval = (curTime - lastTime) / 1000.0f;
    if (interval >= tsStatusInterval) {
S
Shengliang Guan 已提交
33
      dmSendStatusReq(pMgmt);
S
Shengliang Guan 已提交
34
      lastTime = curTime;
dengyihao's avatar
dengyihao 已提交
35 36 37 38 39

      trimCount = (trimCount + 1) % TRIM_FREQ;
      if (trimCount == 0) {
        taosMemoryTrim(0);
      }
S
Shengliang Guan 已提交
40 41 42 43 44 45 46
    }
  }

  return NULL;
}

static void *dmMonitorThreadFp(void *param) {
S
Shengliang Guan 已提交
47 48
  SDnodeMgmt *pMgmt = param;
  int64_t     lastTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
49 50 51 52
  setThreadName("dnode-monitor");

  while (1) {
    taosMsleep(200);
S
Shengliang Guan 已提交
53
    if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
S
shm  
Shengliang Guan 已提交
54

S
Shengliang Guan 已提交
55 56 57
    int64_t curTime = taosGetTimestampMs();
    float   interval = (curTime - lastTime) / 1000.0f;
    if (interval >= tsMonitorInterval) {
S
Shengliang Guan 已提交
58
      (*pMgmt->sendMonitorReportFp)();
S
Shengliang Guan 已提交
59
      lastTime = curTime;
S
shm  
Shengliang Guan 已提交
60
    }
S
shm  
Shengliang Guan 已提交
61
  }
S
Shengliang Guan 已提交
62 63 64 65

  return NULL;
}

S
Shengliang Guan 已提交
66
/*
 * Create the joinable status thread running dmStatusThreadFp and store its
 * handle in pMgmt->statusThread.
 *
 * Returns 0 on success, -1 if the thread could not be created. The thread
 * attribute object is destroyed on both paths.
 */
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
    /* NOTE(review): if taosThreadCreate forwards pthread_create's return value,
     * errno may not carry the failure reason - confirm against the wrapper. */
    dError("failed to create status thread since %s", strerror(errno));
    /* Release the attribute object on the failure path as well; log first so
     * errno is read before any further library call. */
    taosThreadAttrDestroy(&thAttr);
    return -1;
  }

  taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-status", "initialized");
  return 0;
}

S
Shengliang Guan 已提交
80
/*
 * Join the status thread and clear its handle. Does nothing when
 * taosCheckPthreadValid() reports the handle as not valid.
 *
 * The status thread only leaves its loop once pMgmt->pData is flagged as
 * dropped or stopped, so this call blocks until that has happened.
 */
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->statusThread)) {
    return;
  }

  taosThreadJoin(pMgmt->statusThread, NULL);
  taosThreadClear(&pMgmt->statusThread);
}

S
Shengliang Guan 已提交
87
/*
 * Create the joinable monitor thread running dmMonitorThreadFp and store its
 * handle in pMgmt->monitorThread.
 *
 * Returns 0 on success, -1 if the thread could not be created. The thread
 * attribute object is destroyed on both paths.
 */
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&pMgmt->monitorThread, &thAttr, dmMonitorThreadFp, pMgmt) != 0) {
    /* NOTE(review): if taosThreadCreate forwards pthread_create's return value,
     * errno may not carry the failure reason - confirm against the wrapper. */
    dError("failed to create monitor thread since %s", strerror(errno));
    /* Release the attribute object on the failure path as well; log first so
     * errno is read before any further library call. */
    taosThreadAttrDestroy(&thAttr);
    return -1;
  }

  taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("dnode-monitor", "initialized");
  return 0;
}

S
Shengliang Guan 已提交
101
/*
 * Join the monitor thread and clear its handle. Does nothing when
 * taosCheckPthreadValid() reports the handle as not valid.
 *
 * The monitor thread only leaves its loop once pMgmt->pData is flagged as
 * dropped or stopped, so this call blocks until that has happened.
 */
void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
  if (!taosCheckPthreadValid(pMgmt->monitorThread)) {
    return;
  }

  taosThreadJoin(pMgmt->monitorThread, NULL);
  taosThreadClear(&pMgmt->monitorThread);
}
S
Shengliang Guan 已提交
107

S
Shengliang Guan 已提交
108
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
109
  SDnodeMgmt *pMgmt = pInfo->ahandle;
110
  int32_t     code = -1;
H
Hongze Cheng 已提交
111
  STraceId   *trace = &pMsg->info.traceId;
dengyihao's avatar
dengyihao 已提交
112
  dGTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
S
shm  
Shengliang Guan 已提交
113

S
Shengliang Guan 已提交
114
  switch (pMsg->msgType) {
S
shm  
Shengliang Guan 已提交
115
    case TDMT_DND_CONFIG_DNODE:
S
Shengliang Guan 已提交
116
      code = dmProcessConfigReq(pMgmt, pMsg);
S
shm  
Shengliang Guan 已提交
117 118
      break;
    case TDMT_MND_AUTH_RSP:
S
Shengliang Guan 已提交
119
      code = dmProcessAuthRsp(pMgmt, pMsg);
S
shm  
Shengliang Guan 已提交
120 121
      break;
    case TDMT_MND_GRANT_RSP:
S
Shengliang Guan 已提交
122
      code = dmProcessGrantRsp(pMgmt, pMsg);
S
Shengliang Guan 已提交
123 124
      break;
    case TDMT_DND_CREATE_MNODE:
125
      code = (*pMgmt->processCreateNodeFp)(MNODE, pMsg);
S
Shengliang Guan 已提交
126 127
      break;
    case TDMT_DND_DROP_MNODE:
128
      code = (*pMgmt->processDropNodeFp)(MNODE, pMsg);
S
Shengliang Guan 已提交
129 130
      break;
    case TDMT_DND_CREATE_QNODE:
131
      code = (*pMgmt->processCreateNodeFp)(QNODE, pMsg);
S
Shengliang Guan 已提交
132 133
      break;
    case TDMT_DND_DROP_QNODE:
134
      code = (*pMgmt->processDropNodeFp)(QNODE, pMsg);
S
Shengliang Guan 已提交
135 136
      break;
    case TDMT_DND_CREATE_SNODE:
137
      code = (*pMgmt->processCreateNodeFp)(SNODE, pMsg);
S
Shengliang Guan 已提交
138 139
      break;
    case TDMT_DND_DROP_SNODE:
140
      code = (*pMgmt->processDropNodeFp)(SNODE, pMsg);
S
Shengliang Guan 已提交
141
      break;
142 143 144
    case TDMT_DND_SERVER_STATUS:
      code = dmProcessServerRunStatus(pMgmt, pMsg);
      break;
D
dapan1121 已提交
145 146 147
    case TDMT_DND_SYSTABLE_RETRIEVE:
      code = dmProcessRetrieve(pMgmt, pMsg);
      break;
C
Cary Xu 已提交
148 149 150
    case TDMT_MND_GRANT:
      code = dmProcessGrantReq(pMsg);
      break;
S
shm  
Shengliang Guan 已提交
151
    default:
152
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
153
      dGError("msg:%p, not processed in mgmt queue", pMsg);
S
Shengliang Guan 已提交
154
      break;
S
shm  
Shengliang Guan 已提交
155
  }
S
shm  
Shengliang Guan 已提交
156

S
Shengliang Guan 已提交
157
  if (IsReq(pMsg)) {
S
Shengliang Guan 已提交
158 159 160
    if (code != 0 && terrno != 0) code = terrno;
    SRpcMsg rsp = {
        .code = code,
S
Shengliang Guan 已提交
161 162
        .pCont = pMsg->info.rsp,
        .contLen = pMsg->info.rspLen,
163
        .info = pMsg->info,
S
Shengliang Guan 已提交
164
    };
S
shm  
Shengliang Guan 已提交
165 166 167
    rpcSendResponse(&rsp);
  }

S
Shengliang Guan 已提交
168
  dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
S
Shengliang Guan 已提交
169
  rpcFreeCont(pMsg->pCont);
S
shm  
Shengliang Guan 已提交
170
  taosFreeQitem(pMsg);
S
shm  
Shengliang Guan 已提交
171 172
}

S
Shengliang Guan 已提交
173
/*
 * Initialise the single-threaded "dnode-mgmt" worker (min = max = 1) whose
 * items are handled by dmProcessMgmtQueue with pMgmt as its parameter.
 *
 * Returns 0 on success, -1 if tSingleWorkerInit() fails.
 */
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
  SSingleWorkerCfg workerCfg = {
      .min = 1,
      .max = 1,
      .name = "dnode-mgmt",
      .fp = (FItem)dmProcessMgmtQueue,
      .param = pMgmt,
  };

  if (tSingleWorkerInit(&pMgmt->mgmtWorker, &workerCfg) == 0) {
    dDebug("dnode workers are initialized");
    return 0;
  }

  dError("failed to start dnode-mgmt worker since %s", terrstr());
  return -1;
}

S
Shengliang Guan 已提交
190 191
/*
 * Clean up the management worker stored in pMgmt->mgmtWorker via
 * tSingleWorkerCleanup() and log that the dnode workers are closed.
 */
void dmStopWorker(SDnodeMgmt *pMgmt) {
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  dDebug("dnode workers are closed");
}
S
shm  
Shengliang Guan 已提交
194

S
Shengliang Guan 已提交
195
/*
 * Enqueue pMsg on the management worker's queue.
 *
 * Always returns 0; the result of taosWriteQitem() is not inspected here.
 */
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  dTrace("msg:%p, put into worker %s", pMsg, pMgmt->mgmtWorker.name);
  taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);
  return 0;
}