dmWorker.c 4.2 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "dmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
shm  
Shengliang Guan 已提交
19
static void *dmThreadRoutine(void *param) {
S
shm  
Shengliang Guan 已提交
20
  SDnodeMgmt *pMgmt = param;
S
Shengliang Guan 已提交
21
  SDnode     *pDnode = pMgmt->pDnode;
S
shm  
Shengliang Guan 已提交
22 23 24 25 26 27
  int64_t     lastStatusTime = taosGetTimestampMs();
  int64_t     lastMonitorTime = lastStatusTime;

  setThreadName("dnode-hb");

  while (true) {
S
Shengliang Guan 已提交
28
    taosThreadTestCancel();
S
shm  
Shengliang Guan 已提交
29
    taosMsleep(200);
S
shm  
Shengliang Guan 已提交
30
    if (dndGetStatus(pDnode) != DND_STAT_RUNNING || pDnode->dropped) {
S
shm  
Shengliang Guan 已提交
31 32 33 34
      continue;
    }

    int64_t curTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
35
    float   statusInterval = (curTime - lastStatusTime) / 1000.0f;
S
shm  
Shengliang Guan 已提交
36
    if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) {
S
shm  
Shengliang Guan 已提交
37
      dmSendStatusReq(pMgmt);
S
shm  
Shengliang Guan 已提交
38 39 40
      lastStatusTime = curTime;
    }

S
shm  
Shengliang Guan 已提交
41 42 43 44 45
    float monitorInterval = (curTime - lastMonitorTime) / 1000.0f;
    if (monitorInterval >= tsMonitorInterval) {
      dndSendMonitorReport(pDnode);
      lastMonitorTime = curTime;
    }
S
shm  
Shengliang Guan 已提交
46 47 48
  }
}

S
Shengliang Guan 已提交
49 50 51 52 53 54 55 56 57 58 59
/*
 * Creates the dnode background thread running dmThreadRoutine and stores the
 * resulting handle in pMgmt->threadId.
 *
 * Returns 0 when a handle was obtained. Otherwise logs an error, sets terrno
 * to TSDB_CODE_OUT_OF_MEMORY and returns -1.
 */
int32_t dmStartThread(SDnodeMgmt *pMgmt) {
  pMgmt->threadId = taosCreateThread(dmThreadRoutine, pMgmt);
  if (pMgmt->threadId != NULL) {
    return 0;
  }

  dError("failed to init dnode thread");
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

S
Shengliang Guan 已提交
60 61 62
/*
 * Queue callback that handles one message taken from a dnode worker queue.
 *
 * pInfo: queue info whose ahandle is the owning SDnodeMgmt.
 * pMsg:  the queued message; its RPC content and the queue item itself are
 *        released before this function returns.
 *
 * The message is dispatched on its RPC message type; types without a
 * dedicated handler are forwarded to dmProcessCDnodeMsg(). When the low bit of
 * the message type is set, a response carrying the result code is sent back
 * through the RPC handle of the incoming message.
 */
static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SDnodeMgmt *pMgmt = pInfo->ahandle;
  SRpcMsg    *pRpc = &pMsg->rpcMsg;
  int32_t     code = -1;

  dTrace("msg:%p, will be processed in dnode queue", pMsg);

  switch (pRpc->msgType) {
    case TDMT_DND_CONFIG_DNODE:
      code = dmProcessConfigReq(pMgmt, pMsg);
      break;
    case TDMT_MND_STATUS_RSP:
      code = dmProcessStatusRsp(pMgmt, pMsg);
      break;
    case TDMT_MND_AUTH_RSP:
      code = dmProcessAuthRsp(pMgmt, pMsg);
      break;
    case TDMT_MND_GRANT_RSP:
      code = dmProcessGrantRsp(pMgmt, pMsg);
      break;
    default:
      code = dmProcessCDnodeMsg(pMgmt->pDnode, pMsg);
      break;
  }

  /* Only message types with the low bit set are answered here. */
  if (pRpc->msgType & 1u) {
    /*
     * Prefer the detailed error in terrno, but only when one was actually
     * recorded: replacing a failure code with terrno == 0 would report the
     * failed request as successful.
     */
    if (code != 0 && terrno != 0) code = terrno;
    SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
    rpcSendResponse(&rsp);
  }

  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
  rpcFreeCont(pMsg->rpcMsg.pCont);
  taosFreeQitem(pMsg);
}

/*
 * Initializes the two single workers owned by the dnode management module:
 * "dnode-mgmt" (pMgmt->mgmtWorker) and "dnode-status" (pMgmt->statusWorker).
 * Both are configured with min = max = 1 and process items with
 * dmProcessQueue, receiving pMgmt as their parameter.
 *
 * Returns 0 when both workers were initialized, -1 as soon as one
 * initialization fails (the failure reason is logged via terrstr()).
 */
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
  SSingleWorkerCfg mgmtCfg = {
      .min = 1,
      .max = 1,
      .name = "dnode-mgmt",
      .fp = (FItem)dmProcessQueue,
      .param = pMgmt,
  };
  SSingleWorkerCfg statusCfg = {
      .min = 1,
      .max = 1,
      .name = "dnode-status",
      .fp = (FItem)dmProcessQueue,
      .param = pMgmt,
  };

  if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) {
    dError("failed to start dnode mgmt worker since %s", terrstr());
    return -1;
  }

  if (tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg) != 0) {
    dError("failed to start dnode status worker since %s", terrstr());
    return -1;
  }

  dDebug("dnode workers are initialized");
  return 0;
}

S
shm  
Shengliang Guan 已提交
114
/*
 * Shuts down what the dnode management module started: cleans up the mgmt
 * worker, then the status worker, and finally destroys the background thread
 * if one is recorded, clearing pMgmt->threadId afterwards.
 */
void dmStopWorker(SDnodeMgmt *pMgmt) {
  /* Same order as before: mgmt worker first, status worker second. */
  SSingleWorker *workers[] = {&pMgmt->mgmtWorker, &pMgmt->statusWorker};
  for (int32_t i = 0; i < 2; ++i) {
    tSingleWorkerCleanup(workers[i]);
  }

  if (pMgmt->threadId) {
    taosDestoryThread(pMgmt->threadId);
    pMgmt->threadId = NULL;
  }

  dDebug("dnode workers are closed");
}
S
shm  
Shengliang Guan 已提交
124

S
Shengliang Guan 已提交
125
/*
 * Enqueues pMsg on the queue of the dnode mgmt worker that belongs to the
 * SDnodeMgmt referenced by pWrapper.
 *
 * Always returns 0; the result of the queue write is not inspected.
 */
int32_t dmPutMsgToMgmtWorker(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SDnodeMgmt    *pDnodeMgmt = pWrapper->pMgmt;
  SSingleWorker *pTarget = &pDnodeMgmt->mgmtWorker;

  dTrace("msg:%p, put into worker %s", pMsg, pTarget->name);
  taosWriteQitem(pTarget->queue, pMsg);

  return 0;
}

/*
 * Enqueues pMsg on the queue of the dnode status worker that belongs to the
 * SDnodeMgmt referenced by pWrapper.
 *
 * Always returns 0; the result of the queue write is not inspected.
 */
int32_t dmPutMsgToStatusWorker(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SDnodeMgmt    *pDnodeMgmt = pWrapper->pMgmt;
  SSingleWorker *pTarget = &pDnodeMgmt->statusWorker;

  dTrace("msg:%p, put into worker %s", pMsg, pTarget->name);
  taosWriteQitem(pTarget->queue, pMsg);

  return 0;
}