dmWorker.c 4.6 KB
Newer Older
S
shm  
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
shm  
Shengliang Guan 已提交
17
#include "dmInt.h"
S
shm  
Shengliang Guan 已提交
18

S
shm  
Shengliang Guan 已提交
19
static void *dmThreadRoutine(void *param) {
S
shm  
Shengliang Guan 已提交
20
  SDnodeMgmt *pMgmt = param;
S
Shengliang Guan 已提交
21
  SDnode     *pDnode = pMgmt->pDnode;
S
shm  
Shengliang Guan 已提交
22 23 24 25 26 27
  int64_t     lastStatusTime = taosGetTimestampMs();
  int64_t     lastMonitorTime = lastStatusTime;

  setThreadName("dnode-hb");

  while (true) {
S
Shengliang Guan 已提交
28
    taosThreadTestCancel();
S
shm  
Shengliang Guan 已提交
29
    taosMsleep(200);
S
shm  
Shengliang Guan 已提交
30
    if (dndGetStatus(pDnode) != DND_STAT_RUNNING || pDnode->dropped) {
S
shm  
Shengliang Guan 已提交
31 32 33 34
      continue;
    }

    int64_t curTime = taosGetTimestampMs();
S
Shengliang Guan 已提交
35
    float   statusInterval = (curTime - lastStatusTime) / 1000.0f;
S
shm  
Shengliang Guan 已提交
36
    if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) {
S
shm  
Shengliang Guan 已提交
37
      dmSendStatusReq(pMgmt);
S
shm  
Shengliang Guan 已提交
38 39 40
      lastStatusTime = curTime;
    }

S
shm  
Shengliang Guan 已提交
41 42 43 44 45
    float monitorInterval = (curTime - lastMonitorTime) / 1000.0f;
    if (monitorInterval >= tsMonitorInterval) {
      dndSendMonitorReport(pDnode);
      lastMonitorTime = curTime;
    }
S
shm  
Shengliang Guan 已提交
46 47 48
  }
}

S
Shengliang Guan 已提交
49 50 51 52 53 54 55 56 57 58 59
/*
 * Creates the dnode management thread running dmThreadRoutine and stores its
 * handle in pMgmt->threadId.
 *
 * Returns 0 on success. If taosCreateThread() yields NULL, logs an error,
 * sets terrno to TSDB_CODE_OUT_OF_MEMORY and returns -1.
 */
int32_t dmStartThread(SDnodeMgmt *pMgmt) {
  pMgmt->threadId = taosCreateThread(dmThreadRoutine, pMgmt);
  if (pMgmt->threadId != NULL) {
    return 0;
  }

  dError("failed to init dnode thread");
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

S
Shengliang Guan 已提交
60 61 62
/*
 * Queue callback used by both dnode workers: dispatches one message by its
 * rpc message type, sends a response when the type has its lowest bit set,
 * and then releases the message.
 *
 * pInfo - queue info; pInfo->ahandle is read as the SDnodeMgmt
 *         (presumably the `.param` given at worker init - confirm).
 * pMsg  - message to process. Its rpc content is released with rpcFreeCont()
 *         and the item itself with taosFreeQitem() before returning, so the
 *         caller must not touch it afterwards.
 *
 * Unhandled message types set terrno to TSDB_CODE_MSG_NOT_PROCESSED and leave
 * the result code at its initial failure value (-1).
 */
static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
  SDnodeMgmt *pMgmt = pInfo->ahandle;
  SDnode     *pDnode = pMgmt->pDnode;
  SRpcMsg    *pRpc = &pMsg->rpcMsg;
  int32_t     code = -1;

  dTrace("msg:%p, will be processed in dnode queue", pMsg);

  switch (pRpc->msgType) {
    case TDMT_DND_CREATE_MNODE:
    case TDMT_DND_CREATE_QNODE:
    case TDMT_DND_CREATE_SNODE:
    case TDMT_DND_CREATE_BNODE:
    case TDMT_DND_DROP_MNODE:
    case TDMT_DND_DROP_QNODE:
    case TDMT_DND_DROP_SNODE:
    case TDMT_DND_DROP_BNODE:
      code = dndProcessNodeMsg(pDnode, pMsg);
      break;
    case TDMT_DND_CONFIG_DNODE:
      code = dmProcessConfigReq(pMgmt, pMsg);
      break;
    case TDMT_MND_STATUS_RSP:
      code = dmProcessStatusRsp(pMgmt, pMsg);
      break;
    case TDMT_MND_AUTH_RSP:
      code = dmProcessAuthRsp(pMgmt, pMsg);
      break;
    case TDMT_MND_GRANT_RSP:
      code = dmProcessGrantRsp(pMgmt, pMsg);
      break;
    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      // Log the message pointer (not the rpc handle) so this line can be
      // correlated with the other "msg:%p" trace lines for the same message.
      dError("msg:%p, type:%s not processed in dnode queue", pMsg, TMSG_INFO(pRpc->msgType));
      break;
  }

  // Only message types with the low bit set are answered
  // (presumably these are the request types - confirm against TDMT numbering).
  if (pRpc->msgType & 1u) {
    // Prefer the detailed terrno, but never let an unset terrno (0) turn a
    // failed handler result into a success response.
    if (code != 0 && terrno != 0) code = terrno;
    SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
    rpcSendResponse(&rsp);
  }

  dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
  rpcFreeCont(pMsg->rpcMsg.pCont);
  taosFreeQitem(pMsg);
}

/*
 * Initializes the two single workers owned by the dnode management module:
 * "dnode-mgmt" (pMgmt->mgmtWorker) and "dnode-status" (pMgmt->statusWorker).
 * Both use min = 1, max = 1, dmProcessQueue as the item callback and pMgmt as
 * the callback parameter.
 *
 * Returns 0 when both workers are initialized; returns -1 (after logging the
 * reason from terrstr()) as soon as either tSingleWorkerInit() call fails.
 */
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
  SSingleWorkerCfg mgmtCfg = {
      .min = 1,
      .max = 1,
      .name = "dnode-mgmt",
      .fp = (FItem)dmProcessQueue,
      .param = pMgmt,
  };
  if (0 != tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg)) {
    dError("failed to start dnode mgmt worker since %s", terrstr());
    return -1;
  }

  SSingleWorkerCfg statusCfg = {
      .min = 1,
      .max = 1,
      .name = "dnode-status",
      .fp = (FItem)dmProcessQueue,
      .param = pMgmt,
  };
  if (0 != tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg)) {
    dError("failed to start dnode status worker since %s", terrstr());
    return -1;
  }

  dDebug("dnode workers are initialized");
  return 0;
}

S
shm  
Shengliang Guan 已提交
124
/*
 * Tears down what the dnode management module started: first the mgmt worker,
 * then the status worker, and finally the management thread if one exists.
 * pMgmt->threadId is reset to NULL after the thread is destroyed, so a second
 * call skips the thread step.
 */
void dmStopWorker(SDnodeMgmt *pMgmt) {
  // Workers are cleaned up before the thread, in this fixed order.
  tSingleWorkerCleanup(&pMgmt->mgmtWorker);
  tSingleWorkerCleanup(&pMgmt->statusWorker);

  if (NULL != pMgmt->threadId) {
    taosDestoryThread(pMgmt->threadId);
    pMgmt->threadId = NULL;
  }

  dDebug("dnode workers are closed");
}
S
shm  
Shengliang Guan 已提交
134

S
Shengliang Guan 已提交
135
/*
 * Enqueues pMsg on the mgmt worker's queue of the SDnodeMgmt held by pWrapper.
 * The result of taosWriteQitem() is not inspected; always returns 0.
 */
int32_t dmPutMsgToMgmtWorker(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SDnodeMgmt    *pOwner = pWrapper->pMgmt;
  SSingleWorker *pTarget = &pOwner->mgmtWorker;

  dTrace("msg:%p, put into worker %s", pMsg, pTarget->name);
  taosWriteQitem(pTarget->queue, pMsg);

  return 0;
}

/*
 * Enqueues pMsg on the status worker's queue of the SDnodeMgmt held by
 * pWrapper. The result of taosWriteQitem() is not inspected; always returns 0.
 */
int32_t dmPutMsgToStatusWorker(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
  SDnodeMgmt    *pOwner = pWrapper->pMgmt;
  SSingleWorker *pTarget = &pOwner->statusWorker;

  dTrace("msg:%p, put into worker %s", pMsg, pTarget->name);
  taosWriteQitem(pTarget->queue, pMsg);

  return 0;
}