dndDnode.c 22.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "dndDnode.h"
S
Shengliang Guan 已提交
18 19 20 21
#include "dndBnode.h"
#include "dndMnode.h"
#include "dndQnode.h"
#include "dndSnode.h"
S
Shengliang Guan 已提交
22 23 24
#include "dndTransport.h"
#include "dndVnodes.h"

S
Shengliang Guan 已提交
25 26 27 28 29 30 31 32 33 34
// Management worker pool and its single queue (defined later in this file).
static int32_t dndInitMgmtWorker(SDnode *pDnode);
static int32_t dndAllocMgmtQueue(SDnode *pDnode);
static void dndFreeMgmtQueue(SDnode *pDnode);
static void dndCleanupMgmtWorker(SDnode *pDnode);
static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg);

// dnode.json persistence and the periodic status-report thread.
static int32_t dndReadDnodes(SDnode *pDnode);
static int32_t dndWriteDnodes(SDnode *pDnode);
static void *dnodeThreadRoutine(void *param);

S
Shengliang Guan 已提交
35 36 37 38
// Handlers invoked from dndProcessMgmtQueue.
static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp);
static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pRsp);
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pRsp);
static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pReq);
S
Shengliang Guan 已提交
39

S
Shengliang Guan 已提交
40
/*
 * Returns this dnode's id. The id is 0 if none has been loaded from dnode.json or
 * assigned by the mnode yet. It is read under the management read latch.
 */
int32_t dndGetDnodeId(SDnode *pDnode) {
  SDnodeMgmt *mgmt = &pDnode->dmgmt;

  taosRLockLatch(&mgmt->latch);
  int32_t id = mgmt->dnodeId;
  taosRUnLockLatch(&mgmt->latch);

  return id;
}

S
Shengliang Guan 已提交
48
/*
 * Returns the id of the cluster this dnode belongs to. The id is 0 until it is
 * loaded from dnode.json or assigned by the mnode. It is read under the management
 * read latch.
 */
int64_t dndGetClusterId(SDnode *pDnode) {
  SDnodeMgmt *mgmt = &pDnode->dmgmt;

  taosRLockLatch(&mgmt->latch);
  int64_t id = mgmt->clusterId;
  taosRUnLockLatch(&mgmt->latch);

  return id;
}

S
Shengliang Guan 已提交
56
/*
 * Looks up dnodeId in the dnode endpoint hash. For every non-NULL output it fills:
 *   pPort - the port,
 *   pFqdn - the fqdn (at most TSDB_FQDN_LEN bytes, NUL-terminated),
 *   pEp   - "fqdn:port" (at most TSDB_EP_LEN bytes).
 * If the id is unknown, the outputs are left untouched. The lookup is done under
 * the management read latch.
 */
void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
  SDnodeMgmt *mgmt = &pDnode->dmgmt;
  taosRLockLatch(&mgmt->latch);

  SDnodeEp *found = taosHashGet(mgmt->dnodeHash, &dnodeId, sizeof(int32_t));
  if (found == NULL) {
    taosRUnLockLatch(&mgmt->latch);
    return;
  }

  // Same write order as always: port, fqdn, then the combined endpoint string.
  if (pPort != NULL) *pPort = found->port;
  if (pFqdn != NULL) tstrncpy(pFqdn, found->fqdn, TSDB_FQDN_LEN);
  if (pEp != NULL) snprintf(pEp, TSDB_EP_LEN, "%s:%u", found->fqdn, found->port);

  taosRUnLockLatch(&mgmt->latch);
}
S
Shengliang Guan 已提交
75

S
Shengliang Guan 已提交
76
/*
 * Copies the current mnode endpoint set into *pEpSet. The copy is taken under the
 * management read latch, so the caller gets a consistent snapshot.
 */
void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
  SDnodeMgmt *mgmt = &pDnode->dmgmt;

  taosRLockLatch(&mgmt->latch);
  SEpSet snapshot = mgmt->mnodeEpSet;
  taosRUnLockLatch(&mgmt->latch);

  *pEpSet = snapshot;
}

S
Shengliang Guan 已提交
83 84
/*
 * Answers pReq with a redirect that carries the current mnode endpoint set.
 *
 * If this dnode's own fqdn/serverPort appears in the set, inUse is moved to the
 * entry after it (presumably so the peer does not retry this dnode — confirm).
 * Each port is converted to network byte order just before sending.
 */
void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) {
  SEpSet epSet = {0};
  dndGetMnodeEpSet(pDnode, &epSet);

  dDebug("RPC %p, req:%s is redirected, num:%d use:%d", pReq->handle, TMSG_INFO(pReq->msgType), epSet.numOfEps,
         epSet.inUse);

  for (int32_t idx = 0; idx < epSet.numOfEps; idx++) {
    dDebug("mnode index:%d %s:%u", idx, epSet.fqdn[idx], epSet.port[idx]);

    // Compare in host byte order, before the port is converted below.
    bool isSelf = strcmp(epSet.fqdn[idx], pDnode->opt.localFqdn) == 0 && epSet.port[idx] == pDnode->opt.serverPort;
    if (isSelf) {
      epSet.inUse = (idx + 1) % epSet.numOfEps;
    }

    epSet.port[idx] = htons(epSet.port[idx]);
  }

  rpcSendRedirectRsp(pReq->handle, &epSet);
}

S
Shengliang Guan 已提交
102
/*
 * Replaces the cached mnode endpoint set with *pEpSet and logs every entry. The
 * replacement is done under the management write latch.
 */
static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
  dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);

  SDnodeMgmt *mgmt = &pDnode->dmgmt;
  taosWLockLatch(&mgmt->latch);

  mgmt->mnodeEpSet = *pEpSet;
  for (int32_t idx = 0; idx < pEpSet->numOfEps; idx++) {
    dInfo("mnode index:%d %s:%u", idx, pEpSet->fqdn[idx], pEpSet->port[idx]);
  }

  taosWUnLockLatch(&mgmt->latch);
}

S
Shengliang Guan 已提交
116 117
/*
 * Logs every entry of the cached dnode endpoint list at debug level. The function
 * does not take the latch itself.
 */
static void dndPrintDnodes(SDnode *pDnode) {
  SDnodeEps *list = pDnode->dmgmt.dnodeEps;

  dDebug("print dnode ep list, num:%d", list->num);
  for (int32_t idx = 0; idx < list->num; idx++) {
    const SDnodeEp *ep = &list->eps[idx];
    dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", ep->id, ep->fqdn, ep->port, ep->isMnode);
  }
}

S
Shengliang Guan 已提交
126 127
/*
 * Installs pDnodeEps as the cached dnode endpoint list. pDnodeEps may be
 * pMgmt->dnodeEps itself.
 *
 *  - The cached list is copied from pDnodeEps. The allocation grows when the new
 *    list is longer.
 *  - mnodeEpSet is rebuilt from the entries flagged isMnode, keeping at most
 *    TSDB_MAX_REPLICA of them, with inUse reset to 0.
 *  - Every entry is inserted into dnodeHash, keyed by id.
 *
 * The function does not take the latch; callers serialize as needed. If the list
 * cannot be grown, it logs an error and leaves all state unchanged.
 *
 * NOTE(review): dnodeHash is not cleared first, so ids that disappear from the list
 * remain in the hash.
 */
static void dndResetDnodes(SDnode *pDnode, SDnodeEps *pDnodeEps) {
  SDnodeMgmt *pMgmt = &pDnode->dmgmt;

  int32_t size = sizeof(SDnodeEps) + pDnodeEps->num * sizeof(SDnodeEp);
  if (pDnodeEps->num > pMgmt->dnodeEps->num) {
    SDnodeEps *tmp = calloc(1, size);
    if (tmp == NULL) {
      dError("failed to reset dnode ep list since out of memory, num:%d", pDnodeEps->num);
      return;
    }

    tfree(pMgmt->dnodeEps);
    pMgmt->dnodeEps = tmp;
  }

  if (pMgmt->dnodeEps != pDnodeEps) {
    memcpy(pMgmt->dnodeEps, pDnodeEps, size);
  }

  pMgmt->mnodeEpSet.inUse = 0;
  pMgmt->mnodeEpSet.numOfEps = 0;

  int32_t mIndex = 0;
  for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) {
    SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
    if (!pDnodeEp->isMnode) continue;
    if (mIndex >= TSDB_MAX_REPLICA) continue;

    pMgmt->mnodeEpSet.numOfEps++;
    // The list may come straight from a network status response: bound the copy by
    // the destination size instead of trusting the source to be NUL-terminated.
    tstrncpy(pMgmt->mnodeEpSet.fqdn[mIndex], pDnodeEp->fqdn, sizeof(pMgmt->mnodeEpSet.fqdn[mIndex]));
    pMgmt->mnodeEpSet.port[mIndex] = pDnodeEp->port;
    mIndex++;
  }

  for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) {
    SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
    taosHashPut(pMgmt->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp));
  }

  dndPrintDnodes(pDnode);
}

S
Shengliang Guan 已提交
164
/*
 * Reports whether the endpoint recorded in dnodeHash for dnodeId, formatted as
 * "fqdn:port", differs from pEp.
 *
 * Returns false when the two match, and also when the hash has no entry for the
 * id. The lookup is done under the management read latch.
 */
static bool dndIsEpChanged(SDnode *pDnode, int32_t dnodeId, char *pEp) {
  SDnodeMgmt *mgmt = &pDnode->dmgmt;
  bool changed = false;

  taosRLockLatch(&mgmt->latch);

  SDnodeEp *known = taosHashGet(mgmt->dnodeHash, &dnodeId, sizeof(int32_t));
  if (known != NULL) {
    char stored[TSDB_EP_LEN + 1];
    snprintf(stored, TSDB_EP_LEN, "%s:%u", known->fqdn, known->port);
    changed = (strcmp(pEp, stored) != 0);
  }

  taosRUnLockLatch(&mgmt->latch);
  return changed;
}

S
Shengliang Guan 已提交
181 182
/*
 * Loads pMgmt->file (dnode.json) into pMgmt.
 *
 * The file supplies dnodeId, clusterId, the dropped flag and the dnode endpoint
 * list. A missing file is not an error: the dnode keeps its current (zero) identity
 * and gets a one-entry list built from opt.firstEp, flagged as mnode.
 *
 * The loaded list is installed with dndResetDnodes. If the dnode already has an id,
 * its recorded endpoint is then compared with opt.localEp.
 *
 * Returns 0 on success. Returns -1 with terrno set when:
 *  - the file exists but cannot be read or parsed,
 *  - an allocation fails,
 *  - the recorded local endpoint differs from opt.localEp.
 */
static int32_t dndReadDnodes(SDnode *pDnode) {
  SDnodeMgmt *pMgmt = &pDnode->dmgmt;

  int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR;
  int32_t len = 0;
  int32_t maxLen = 256 * 1024;
  char   *content = calloc(1, maxLen + 1);
  cJSON  *root = NULL;
  FILE   *fp = NULL;

  if (content == NULL) {
    dError("failed to read %s since out of memory", pMgmt->file);
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto PRASE_DNODE_OVER;
  }

  fp = fopen(pMgmt->file, "r");
  if (fp == NULL) {
    // No file yet: this dnode has no persisted identity, which is not an error.
    dDebug("file %s not exist", pMgmt->file);
    code = 0;
    goto PRASE_DNODE_OVER;
  }

  len = (int32_t)fread(content, 1, maxLen, fp);
  if (len <= 0) {
    dError("failed to read %s since content is null", pMgmt->file);
    goto PRASE_DNODE_OVER;
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("failed to read %s since invalid json format", pMgmt->file);
    goto PRASE_DNODE_OVER;
  }

  cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
  if (!dnodeId || dnodeId->type != cJSON_Number) {
    dError("failed to read %s since dnodeId not found", pMgmt->file);
    goto PRASE_DNODE_OVER;
  }
  pMgmt->dnodeId = dnodeId->valueint;

  cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
  if (!clusterId || clusterId->type != cJSON_String) {
    dError("failed to read %s since clusterId not found", pMgmt->file);
    goto PRASE_DNODE_OVER;
  }
  pMgmt->clusterId = atoll(clusterId->valuestring);

  cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
  if (!dropped || dropped->type != cJSON_Number) {
    dError("failed to read %s since dropped not found", pMgmt->file);
    goto PRASE_DNODE_OVER;
  }
  pMgmt->dropped = dropped->valueint;

  cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes");
  if (!dnodes || dnodes->type != cJSON_Array) {
    dError("failed to read %s since dnodes not found", pMgmt->file);
    goto PRASE_DNODE_OVER;
  }

  int32_t numOfDnodes = cJSON_GetArraySize(dnodes);
  if (numOfDnodes <= 0) {
    dError("failed to read %s since numOfDnodes:%d invalid", pMgmt->file, numOfDnodes);
    goto PRASE_DNODE_OVER;
  }

  pMgmt->dnodeEps = calloc(1, numOfDnodes * sizeof(SDnodeEp) + sizeof(SDnodeEps));
  if (pMgmt->dnodeEps == NULL) {
    dError("failed to calloc dnodeEpList since %s", strerror(errno));
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto PRASE_DNODE_OVER;
  }
  pMgmt->dnodeEps->num = numOfDnodes;

  for (int32_t i = 0; i < numOfDnodes; ++i) {
    cJSON *node = cJSON_GetArrayItem(dnodes, i);
    if (node == NULL) break;

    SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];

    cJSON *dnodeId = cJSON_GetObjectItem(node, "id");
    if (!dnodeId || dnodeId->type != cJSON_Number) {
      dError("failed to read %s since dnodeId not found", pMgmt->file);
      goto PRASE_DNODE_OVER;
    }
    pDnodeEp->id = dnodeId->valueint;

    cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
    if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
      dError("failed to read %s since dnodeFqdn not found", pMgmt->file);
      goto PRASE_DNODE_OVER;
    }
    tstrncpy(pDnodeEp->fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);

    cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
    if (!dnodePort || dnodePort->type != cJSON_Number) {
      dError("failed to read %s since dnodePort not found", pMgmt->file);
      goto PRASE_DNODE_OVER;
    }
    pDnodeEp->port = dnodePort->valueint;

    cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
    if (!isMnode || isMnode->type != cJSON_Number) {
      dError("failed to read %s since isMnode not found", pMgmt->file);
      goto PRASE_DNODE_OVER;
    }
    pDnodeEp->isMnode = isMnode->valueint;
  }

  code = 0;
  dInfo("succcessed to read file %s", pMgmt->file);
  dndPrintDnodes(pDnode);

PRASE_DNODE_OVER:
  free(content);
  if (root != NULL) cJSON_Delete(root);
  if (fp != NULL) fclose(fp);

  // A partially parsed file leaves dnodeId/clusterId/dnodeEps inconsistent. Do not
  // start on top of it.
  if (code != 0) {
    terrno = code;
    return -1;
  }

  if (pMgmt->dnodeEps == NULL) {
    pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp));
    if (pMgmt->dnodeEps == NULL) {
      dError("failed to calloc dnodeEpList since %s", strerror(errno));
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    pMgmt->dnodeEps->num = 1;
    pMgmt->dnodeEps->eps[0].isMnode = 1;
    taosGetFqdnPortFromEp(pDnode->opt.firstEp, pMgmt->dnodeEps->eps[0].fqdn, &pMgmt->dnodeEps->eps[0].port);
  }

  dndResetDnodes(pDnode, pMgmt->dnodeEps);

  // dndIsEpChanged consults dnodeHash, which only dndResetDnodes fills, so the
  // check must come after the reset. A dnode without an id has no recorded endpoint
  // of its own; an id-0 entry is the firstEp placeholder created above and must not
  // be compared against localEp.
  if (pMgmt->dnodeId > 0 && dndIsEpChanged(pDnode, pMgmt->dnodeId, pDnode->opt.localEp)) {
    dError("localEp %s different with %s and need reconfigured", pDnode->opt.localEp, pMgmt->file);
    terrno = TSDB_CODE_DND_DNODE_READ_FILE_ERROR;
    return -1;
  }

  terrno = 0;
  return 0;
}

S
Shengliang Guan 已提交
313 314
/*
 * Writes the dnode identity and the endpoint list to pMgmt->file as JSON, in the
 * layout dndReadDnodes parses. The identity is dnodeId, clusterId and the dropped
 * flag.
 *
 * On success the data is flushed and fsync'ed, pMgmt->updateTime is refreshed,
 * terrno is cleared and 0 is returned. On failure terrno is set and -1 is returned.
 *
 * The function does not take pMgmt->latch itself; some callers (dndUpdateDnodeCfg,
 * dndUpdateDnodeEps) hold it for writing.
 */
static int32_t dndWriteDnodes(SDnode *pDnode) {
  SDnodeMgmt *pMgmt = &pDnode->dmgmt;

  FILE *fp = fopen(pMgmt->file, "w");
  if (fp == NULL) {
    dError("failed to write %s since %s", pMgmt->file, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // Stream straight to the file. Output size is then unbounded by any scratch
  // buffer, so a long endpoint list cannot overrun memory.
  fprintf(fp, "{\n");
  fprintf(fp, "  \"dnodeId\": %d,\n", pMgmt->dnodeId);
  fprintf(fp, "  \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId);
  fprintf(fp, "  \"dropped\": %d,\n", pMgmt->dropped);

  int32_t numOfDnodes = (pMgmt->dnodeEps != NULL) ? pMgmt->dnodeEps->num : 0;
  if (numOfDnodes <= 0) {
    // Emit an empty array so the document stays valid JSON.
    fprintf(fp, "  \"dnodes\": []\n");
  } else {
    fprintf(fp, "  \"dnodes\": [{\n");
    for (int32_t i = 0; i < numOfDnodes; ++i) {
      SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
      fprintf(fp, "    \"id\": %d,\n", pDnodeEp->id);
      fprintf(fp, "    \"fqdn\": \"%s\",\n", pDnodeEp->fqdn);
      fprintf(fp, "    \"port\": %u,\n", pDnodeEp->port);
      fprintf(fp, "    \"isMnode\": %d\n", pDnodeEp->isMnode);
      fputs((i < numOfDnodes - 1) ? "  },{\n" : "  }]\n", fp);
    }
  }
  fprintf(fp, "}\n");

  // fsync only persists what the kernel already has, so stdio's buffer must be
  // flushed before syncing the descriptor.
  int32_t code = 0;
  if (ferror(fp) || fflush(fp) != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
  }
  taosFsyncFile(fileno(fp));
  if (fclose(fp) != 0 && code == 0) {
    code = TAOS_SYSTEM_ERROR(errno);
  }

  if (code != 0) {
    terrno = code;
    dError("failed to write %s since %s", pMgmt->file, terrstr());
    return -1;
  }

  terrno = 0;
  pMgmt->updateTime = taosGetTimestampMs();
  dDebug("successed to write %s", pMgmt->file);
  return 0;
}

S
Shengliang Guan 已提交
357
/*
 * Builds a status request and sends it to the mnode. The request carries:
 *  - the dnode identity and timestamps,
 *  - cluster-level configuration,
 *  - per-vnode loads from dndGetVnodeLoads.
 *
 * All multi-byte integers are converted to network byte order. Identity fields are
 * read under the management read latch. statusSent is set so the status thread
 * does not send another request before the response arrives. If the message
 * buffer cannot be allocated, the error is logged and nothing is sent.
 */
void dndSendStatusReq(SDnode *pDnode) {
  int32_t contLen = sizeof(SStatusReq) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);

  SStatusReq *pStatus = rpcMallocCont(contLen);
  if (pStatus == NULL) {
    dError("failed to malloc status message");
    return;
  }

  SDnodeMgmt *pMgmt = &pDnode->dmgmt;
  taosRLockLatch(&pMgmt->latch);
  pStatus->sver = htonl(pDnode->opt.sver);
  pStatus->dver = htobe64(pMgmt->dver);
  pStatus->dnodeId = htonl(pMgmt->dnodeId);
  pStatus->clusterId = htobe64(pMgmt->clusterId);
  pStatus->rebootTime = htobe64(pMgmt->rebootTime);
  pStatus->updateTime = htobe64(pMgmt->updateTime);
  pStatus->numOfCores = htonl(pDnode->opt.numOfCores);
  pStatus->numOfSupportVnodes = htonl(pDnode->opt.numOfSupportVnodes);
  tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN);

  pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval);
  pStatus->clusterCfg.checkTime = 0;
  char timestr[32] = "1970-01-01 00:00:00.00";
  (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
  // checkTime is filled through taosParseTime's int64_t*, so it is a 64-bit field.
  // Like the other 64-bit fields it needs htobe64; htonl would truncate it to 32 bits.
  pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime);
  tstrncpy(pStatus->clusterCfg.timezone, pDnode->opt.timezone, TSDB_TIMEZONE_LEN);
  tstrncpy(pStatus->clusterCfg.locale, pDnode->opt.locale, TSDB_LOCALE_LEN);
  tstrncpy(pStatus->clusterCfg.charset, pDnode->opt.charset, TSDB_LOCALE_LEN);
  taosRUnLockLatch(&pMgmt->latch);

  dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads);
  // Only send as many vnode-load entries as were actually filled in.
  contLen = sizeof(SStatusReq) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad);

  SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527};
  pMgmt->statusSent = 1;

  dTrace("pDnode:%p, send status req to mnode", pDnode);
  dndSendReqToMnode(pDnode, &rpcMsg);
}

S
Shengliang Guan 已提交
398 399
/*
 * Adopts the dnode id and cluster id assigned by the mnode and persists them.
 *
 * This happens only while this dnode has no id yet (dnodeId == 0); otherwise the
 * call is a no-op. The update and the file write are done under the management
 * write latch.
 */
static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
  SDnodeMgmt *mgmt = &pDnode->dmgmt;
  if (mgmt->dnodeId != 0) return;

  dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);

  taosWLockLatch(&mgmt->latch);
  mgmt->dnodeId = pCfg->dnodeId;
  mgmt->clusterId = pCfg->clusterId;
  dndWriteDnodes(pDnode);
  taosWUnLockLatch(&mgmt->latch);
}

S
Shengliang Guan 已提交
410
/*
 * Replaces and persists the cached dnode endpoint list when pDnodeEps differs from
 * it. Lists differ when the entry count or the raw bytes differ.
 *
 * NULL or empty lists are ignored. The work is done under the management write
 * latch.
 */
static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) {
  if (pDnodeEps == NULL || pDnodeEps->num <= 0) return;

  SDnodeMgmt *mgmt = &pDnode->dmgmt;
  taosWLockLatch(&mgmt->latch);

  bool differs = (pDnodeEps->num != mgmt->dnodeEps->num);
  if (!differs) {
    int32_t bytes = pDnodeEps->num * sizeof(SDnodeEp) + sizeof(SDnodeEps);
    differs = (memcmp(mgmt->dnodeEps, pDnodeEps, bytes) != 0);
  }

  if (differs) {
    dndResetDnodes(pDnode, pDnodeEps);
    dndWriteDnodes(pDnode);
  }

  taosWUnLockLatch(&mgmt->latch);
}

S
Shengliang Guan 已提交
430
/*
 * Handles the mnode's reply to dndSendStatusReq.
 *
 * On an error reply, statusSent is cleared. If the mnode also reports this
 * (already identified) dnode as non-existent, the dnode is marked dropped and that
 * state is persisted.
 *
 * On success the payload is validated against contLen and converted from network
 * byte order in place. The dnode id / cluster id are then adopted
 * (dndUpdateDnodeCfg), followed by the endpoint list (dndUpdateDnodeEps). Finally
 * statusSent is cleared so the status thread may send again. A malformed payload
 * is logged and ignored.
 */
static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
  SDnodeMgmt *pMgmt = &pDnode->dmgmt;

  if (pRsp->code != TSDB_CODE_SUCCESS) {
    pMgmt->statusSent = 0;
    if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) {
      dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId);
      pMgmt->dropped = 1;
      dndWriteDnodes(pDnode);
    }
    return;
  }

  if (pRsp->pCont != NULL && pRsp->contLen != 0) {
    SStatusRsp *pStatus = pRsp->pCont;

    // The payload comes off the network. Check that the fixed header and the
    // advertised number of trailing SDnodeEp entries fit inside contLen before
    // reading or modifying any of it.
    if ((int64_t)pRsp->contLen < (int64_t)sizeof(SStatusRsp)) {
      dError("invalid status rsp since contLen:%d too short", pRsp->contLen);
      pMgmt->statusSent = 0;
      return;
    }
    int32_t numOfEps = htonl(pStatus->dnodeEps.num);
    int64_t needLen = (int64_t)sizeof(SStatusRsp) + (int64_t)numOfEps * (int64_t)sizeof(SDnodeEp);
    if (numOfEps < 0 || needLen > (int64_t)pRsp->contLen) {
      dError("invalid status rsp since numOfEps:%d not match contLen:%d", numOfEps, pRsp->contLen);
      pMgmt->statusSent = 0;
      return;
    }

    pMgmt->dver = htobe64(pStatus->dver);

    SDnodeCfg *pCfg = &pStatus->dnodeCfg;
    pCfg->dnodeId = htonl(pCfg->dnodeId);
    pCfg->clusterId = htobe64(pCfg->clusterId);
    dndUpdateDnodeCfg(pDnode, pCfg);

    SDnodeEps *pDnodeEps = &pStatus->dnodeEps;
    pDnodeEps->num = numOfEps;
    for (int32_t i = 0; i < pDnodeEps->num; ++i) {
      SDnodeEp *pEp = &pDnodeEps->eps[i];
      pEp->id = htonl(pEp->id);
      pEp->port = htons(pEp->port);
      // fqdn is later logged with %s and copied as a string; never rely on the
      // peer having NUL-terminated it.
      pEp->fqdn[sizeof(pEp->fqdn) - 1] = 0;
    }

    dndUpdateDnodeEps(pDnode, pDnodeEps);
  }

  pMgmt->statusSent = 0;
}
S
Shengliang Guan 已提交
463

S
Shengliang Guan 已提交
464
/* Auth responses are not handled yet; the arrival is only logged as an error. */
static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pReq) {
  dError("auth rsp is received, but not supported yet");
}
S
Shengliang Guan 已提交
465

S
Shengliang Guan 已提交
466 467 468
/* Grant responses are not handled yet; the arrival is only logged as an error. */
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pReq) { dError("grant rsp is received, but not supported yet"); }
S
Shengliang Guan 已提交
469

S
Shengliang Guan 已提交
470 471 472
/*
 * Config-dnode requests are not supported yet. The request is logged and rejected
 * with TSDB_CODE_OPS_NOT_SUPPORT.
 *
 * dndProcessMgmtQueue builds its reply from terrno whenever a handler returns
 * non-zero. The code is therefore published in terrno as well as returned;
 * returning it alone would reply with whatever stale value terrno held, possibly
 * success.
 */
static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  dError("config req is received, but not supported yet");
  terrno = TSDB_CODE_OPS_NOT_SUPPORT;
  return TSDB_CODE_OPS_NOT_SUPPORT;
}

S
Shengliang Guan 已提交
476 477
/*
 * Replies to a startup-progress request with an SStartupMsg filled by
 * dndGetStartup. If the reply buffer cannot be allocated, an empty reply carrying
 * TSDB_CODE_OUT_OF_MEMORY is sent instead, so the requester is not left waiting.
 */
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
  dDebug("startup req is received");

  SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg));
  if (pStartup == NULL) {
    dError("failed to malloc startup rsp");
    SRpcMsg rpcRsp = {.handle = pReq->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
    rpcSendResponse(&rpcRsp);
    return;
  }

  dndGetStartup(pDnode, pStartup);

  dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);

  SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupMsg)};
  rpcSendResponse(&rpcRsp);
}

static void *dnodeThreadRoutine(void *param) {
489 490 491
  SDnode     *pDnode = param;
  SDnodeMgmt *pMgmt = &pDnode->dmgmt;
  int32_t     ms = pDnode->opt.statusInterval * 1000;
S
Shengliang Guan 已提交
492

S
Shengliang Guan 已提交
493 494
  while (true) {
    pthread_testcancel();
S
Shengliang Guan 已提交
495
    taosMsleep(ms);
S
Shengliang Guan 已提交
496

S
Shengliang Guan 已提交
497
    if (dndGetStat(pDnode) == DND_STAT_RUNNING && !pMgmt->statusSent && !pMgmt->dropped) {
S
Shengliang Guan 已提交
498
      dndSendStatusReq(pDnode);
S
Shengliang Guan 已提交
499
    }
S
Shengliang Guan 已提交
500 501 502
  }
}

S
Shengliang Guan 已提交
503 504
/*
 * Sets up dnode-level management state. In order, it:
 *  1. resets the identity fields and records the reboot time,
 *  2. builds the dnode.json path,
 *  3. creates dnodeHash,
 *  4. loads dnode.json,
 *  5. refuses to continue for a dropped dnode,
 *  6. starts the management worker and queue,
 *  7. starts the status thread.
 *
 * Returns 0 on success and -1 on failure (terrno is set on most failure paths).
 * Resources acquired before a failure are not released here; presumably the caller
 * runs dndCleanupDnode — TODO confirm.
 */
int32_t dndInitDnode(SDnode *pDnode) {
  SDnodeMgmt *mgmt = &pDnode->dmgmt;

  mgmt->dnodeId = 0;
  mgmt->rebootTime = taosGetTimestampMs();
  mgmt->dropped = 0;
  mgmt->clusterId = 0;
  taosInitRWLatch(&mgmt->latch);

  char path[PATH_MAX];
  snprintf(path, PATH_MAX, "%s/dnode.json", pDnode->dir.dnode);
  mgmt->file = strdup(path);
  if (mgmt->file == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  mgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (mgmt->dnodeHash == NULL) {
    dError("failed to init dnode hash");
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (dndReadDnodes(pDnode) != 0) {
    dError("failed to read file:%s since %s", mgmt->file, terrstr());
    return -1;
  }

  if (mgmt->dropped) {
    dError("dnode will not start for its already dropped");
    return -1;
  }

  // The queue is only allocated once the worker pool exists (short-circuit order).
  if (dndInitMgmtWorker(pDnode) != 0 || dndAllocMgmtQueue(pDnode) != 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  mgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode);
  if (mgmt->threadId == NULL) {
    dError("failed to init dnode thread");
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  dInfo("dnode-dnode is initialized");
  return 0;
}

S
Shengliang Guan 已提交
558 559
/*
 * Tears down what dndInitDnode created, in this order:
 *  1. the management worker, then its queue,
 *  2. the status thread,
 *  3. under the write latch: the endpoint list, dnodeHash and the file path.
 *
 * Every released pointer is reset to NULL.
 */
void dndCleanupDnode(SDnode *pDnode) {
  SDnodeMgmt *mgmt = &pDnode->dmgmt;

  dndCleanupMgmtWorker(pDnode);
  dndFreeMgmtQueue(pDnode);

  if (mgmt->threadId != NULL) {
    taosDestoryThread(mgmt->threadId);
    mgmt->threadId = NULL;
  }

  taosWLockLatch(&mgmt->latch);

  // free(NULL) is a no-op, so these two need no guard.
  free(mgmt->dnodeEps);
  mgmt->dnodeEps = NULL;

  if (mgmt->dnodeHash != NULL) {
    taosHashCleanup(mgmt->dnodeHash);
    mgmt->dnodeHash = NULL;
  }

  free(mgmt->file);
  mgmt->file = NULL;

  taosWUnLockLatch(&mgmt->latch);
  dInfo("dnode-dnode is cleaned up");
}

S
Shengliang Guan 已提交
590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649
/*
 * Configures and starts the single-thread "dnode-mgmt" worker pool.
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t dndInitMgmtWorker(SDnode *pDnode) {
  SWorkerPool *pool = &pDnode->dmgmt.mgmtPool;
  pool->name = "dnode-mgmt";
  pool->min = 1;
  pool->max = 1;

  if (tWorkerInit(pool) == 0) {
    dDebug("dnode mgmt worker is initialized");
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

/* Stops the "dnode-mgmt" worker pool. */
static void dndCleanupMgmtWorker(SDnode *pDnode) {
  tWorkerCleanup(&pDnode->dmgmt.mgmtPool);
  dDebug("dnode mgmt worker is closed");
}

/*
 * Creates the management queue on the mgmt pool. Items on the queue are consumed
 * by dndProcessMgmtQueue, with pDnode as context.
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t dndAllocMgmtQueue(SDnode *pDnode) {
  SDnodeMgmt *mgmt = &pDnode->dmgmt;

  mgmt->pMgmtQ = tWorkerAllocQueue(&mgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMgmtQueue);
  if (mgmt->pMgmtQ != NULL) return 0;

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

/* Releases the management queue back to the mgmt pool and clears the handle. */
static void dndFreeMgmtQueue(SDnode *pDnode) {
  SWorkerPool *pool = &pDnode->dmgmt.mgmtPool;
  tWorkerFreeQueue(pool, pDnode->dmgmt.pMgmtQ);
  pDnode->dmgmt.pMgmtQ = NULL;
}

/*
 * Entry point for management messages arriving from RPC.
 *
 * A status response that carries a non-empty mnode ep set first refreshes the
 * cached mnode ep set. The message is then copied into a queue item and handed to
 * the management worker.
 *
 * If it cannot be queued:
 *  - a message whose msgType has the low bit set gets an immediate
 *    TSDB_CODE_OUT_OF_MEMORY reply,
 *  - the payload and any allocated item are freed here.
 */
void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
  SDnodeMgmt *mgmt = &pDnode->dmgmt;

  bool carriesMnodeEps = pEpSet != NULL && pEpSet->numOfEps > 0 && pRpcMsg->msgType == TDMT_MND_STATUS_RSP;
  if (carriesMnodeEps) {
    dndUpdateMnodeEpSet(pDnode, pEpSet);
  }

  SRpcMsg *pItem = taosAllocateQitem(sizeof(SRpcMsg));
  if (pItem != NULL) {
    *pItem = *pRpcMsg;
    if (taosWriteQitem(mgmt->pMgmtQ, pItem) == 0) return;
  }

  // Could not enqueue: answer if a reply is expected, then release everything.
  if (pRpcMsg->msgType & 1u) {
    SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
    rpcSendResponse(&rsp);
  }
  rpcFreeCont(pRpcMsg->pCont);
  taosFreeQitem(pItem);
}

/*
 * Worker-side handler for items queued by dndProcessMgmtMsg.
 *
 * Dispatches on msgType to the matching handler:
 *  - mnode/qnode/snode/bnode/vnode lifecycle requests,
 *  - config-dnode requests,
 *  - mnode responses (status/auth/grant).
 * Unknown types are logged and fail with terrno = TSDB_CODE_MSG_NOT_PROCESSED.
 *
 * When the low bit of msgType is set, a reply is sent. Its code is 0 on success;
 * otherwise it is taken from terrno, so handlers are expected to set terrno when
 * they fail. The payload and the queue item are always freed here.
 */
static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
  int32_t code = 0;

  switch (pMsg->msgType) {
    // mnode replicas
    case TDMT_DND_CREATE_MNODE:
      code = dndProcessCreateMnodeReq(pDnode, pMsg);
      break;
    case TDMT_DND_ALTER_MNODE:
      code = dndProcessAlterMnodeReq(pDnode, pMsg);
      break;
    case TDMT_DND_DROP_MNODE:
      code = dndProcessDropMnodeReq(pDnode, pMsg);
      break;

    // qnode / snode / bnode
    case TDMT_DND_CREATE_QNODE:
      code = dndProcessCreateQnodeReq(pDnode, pMsg);
      break;
    case TDMT_DND_DROP_QNODE:
      code = dndProcessDropQnodeReq(pDnode, pMsg);
      break;
    case TDMT_DND_CREATE_SNODE:
      code = dndProcessCreateSnodeReq(pDnode, pMsg);
      break;
    case TDMT_DND_DROP_SNODE:
      code = dndProcessDropSnodeReq(pDnode, pMsg);
      break;
    case TDMT_DND_CREATE_BNODE:
      code = dndProcessCreateBnodeReq(pDnode, pMsg);
      break;
    case TDMT_DND_DROP_BNODE:
      code = dndProcessDropBnodeReq(pDnode, pMsg);
      break;

    // vnodes
    case TDMT_DND_CREATE_VNODE:
      code = dndProcessCreateVnodeReq(pDnode, pMsg);
      break;
    case TDMT_DND_ALTER_VNODE:
      code = dndProcessAlterVnodeReq(pDnode, pMsg);
      break;
    case TDMT_DND_DROP_VNODE:
      code = dndProcessDropVnodeReq(pDnode, pMsg);
      break;
    case TDMT_DND_AUTH_VNODE:
      code = dndProcessAuthVnodeReq(pDnode, pMsg);
      break;
    case TDMT_DND_SYNC_VNODE:
      code = dndProcessSyncVnodeReq(pDnode, pMsg);
      break;
    case TDMT_DND_COMPACT_VNODE:
      code = dndProcessCompactVnodeReq(pDnode, pMsg);
      break;

    // dnode itself
    case TDMT_DND_CONFIG_DNODE:
      code = dndProcessConfigDnodeReq(pDnode, pMsg);
      break;

    // responses from the mnode: handled only for their side effects
    case TDMT_MND_STATUS_RSP:
      dndProcessStatusRsp(pDnode, pMsg);
      break;
    case TDMT_MND_AUTH_RSP:
      dndProcessAuthRsp(pDnode, pMsg);
      break;
    case TDMT_MND_GRANT_RSP:
      dndProcessGrantRsp(pDnode, pMsg);
      break;

    default:
      terrno = TSDB_CODE_MSG_NOT_PROCESSED;
      code = -1;
      dError("RPC %p, dnode msg:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType));
      break;
  }

  // Same "expects a reply" test as dndProcessMgmtMsg uses.
  bool needRsp = (pMsg->msgType & 1u) != 0;
  if (needRsp) {
    SRpcMsg rsp = {.code = (code == 0) ? 0 : terrno, .handle = pMsg->handle, .ahandle = pMsg->ahandle};
    rpcSendResponse(&rsp);
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  taosFreeQitem(pMsg);
}