vnodeMain.c 25.6 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "ihash.h"
#include "taoserror.h"
#include "taosmsg.h"
S
slguan 已提交
21
#include "tutil.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
22 23 24 25
#include "trpc.h"
#include "tsdb.h"
#include "ttime.h"
#include "ttimer.h"
S
slguan 已提交
26
#include "cJSON.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
27
#include "twal.h"
S
slguan 已提交
28
#include "tglobal.h"
J
Jeff Tao 已提交
29 30
#include "dnode.h"
#include "vnode.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
31
#include "vnodeInt.h"
S
slguan 已提交
32
#include "vnodeLog.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
33

34
/* Module state: hash of open vnodes keyed by vgId, and how many are open. */
static void    *tsDnodeVnodesHash;
static int32_t  tsOpennedVnodes;

/* Guard handed to pthread_once() so vnodeInit() runs a single time. */
static pthread_once_t  vnodeModuleInit = PTHREAD_ONCE_INIT;

/* Lifecycle and status-report helpers. */
static void     vnodeCleanUp(SVnodeObj *pVnode);
static void     vnodeBuildVloadMsg(char *pNode, void * param);

/* Readers/writers for the per-vnode config.json and version.json files. */
static int32_t  vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
static int32_t  vnodeReadCfg(SVnodeObj *pVnode);
static int32_t  vnodeSaveVersion(SVnodeObj *pVnode);
static bool     vnodeReadVersion(SVnodeObj *pVnode);

/* Callbacks registered with tsdb (walCallBack) and with sync (SSyncInfo). */
static int      vnodeWalCallback(void *arg);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size);
static int      vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void     vnodeNotifyRole(void *ahandle, int8_t role);

S
slguan 已提交
50
#ifndef _SYNC
J
Jeff Tao 已提交
51
tsync_h syncStart(const SSyncInfo *info) { return NULL; }
52
int     syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle) { return 0; }
S
slguan 已提交
53 54 55
void    syncStop(tsync_h shandle) {}
int     syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; }
int     syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; }
S
slguan 已提交
56
void    syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {}
57 58
#endif

59
static void vnodeInit() {
J
Jeff Tao 已提交
60
  vnodeInitWriteFp();
S
slguan 已提交
61
  vnodeInitReadFp();
J
Jeff Tao 已提交
62

63
  tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj *), taosHashInt);
J
Jeff Tao 已提交
64 65 66 67
  if (tsDnodeVnodesHash == NULL) {
    dError("failed to init vnode list");
  }
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
68 69 70

/*
 * Create a vnode on disk from a create-vnode message and open it.
 *
 * Steps: make <tsVnodeDir>/vnode<vgId>, write config.json, create the tsdb
 * repository under .../tsdb, then call vnodeOpen().
 *
 * Returns TSDB_CODE_SUCCESS when the vgId is already present in the vnode hash
 * or when everything succeeds; otherwise the code of the failing step
 * (directory errors are mapped from errno, a tsdb creation failure is
 * reported as TSDB_CODE_VG_INIT_FAILED).
 */
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
  int32_t code;
  pthread_once(&vnodeModuleInit, vnodeInit);

  // The hash stores SVnodeObj* values, so a lookup yields a pointer to the slot.
  SVnodeObj **ppVnode = (SVnodeObj **)taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId);
  if (ppVnode != NULL) {
    dPrint("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, *ppVnode);
    return TSDB_CODE_SUCCESS;
  }

  char rootDir[TSDB_FILENAME_LEN] = {0};
  int  n = snprintf(rootDir, sizeof(rootDir), "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId);
  if (n < 0 || n >= (int)sizeof(rootDir)) {
    dError("vgId:%d, failed to create vnode, root dir path is too long", pVnodeCfg->cfg.vgId);
    return TSDB_CODE_VG_INIT_FAILED;
  }

  if (mkdir(rootDir, 0755) != 0) {
    if (errno == EACCES) {
      return TSDB_CODE_NO_DISK_PERMISSIONS;
    } else if (errno == ENOSPC) {
      return TSDB_CODE_SERV_NO_DISKSPACE;
    } else if (errno != EEXIST) {
      return TSDB_CODE_VG_INIT_FAILED;
    }
    // EEXIST: the directory is already there, carry on.
  }

  code = vnodeSaveCfg(pVnodeCfg);
  if (code != TSDB_CODE_SUCCESS) {
    dError("vgId:%d, failed to save vnode cfg, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code));
    return code;
  }

  STsdbCfg tsdbCfg = {0};
  tsdbCfg.precision           = pVnodeCfg->cfg.precision;
  tsdbCfg.compression         = pVnodeCfg->cfg.compression;
  tsdbCfg.tsdbId              = pVnodeCfg->cfg.vgId;
  tsdbCfg.maxTables           = pVnodeCfg->cfg.maxTables;
  tsdbCfg.daysPerFile         = pVnodeCfg->cfg.daysPerFile;
  tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock;
  tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock;
  tsdbCfg.keep                = pVnodeCfg->cfg.daysToKeep;

  char tsdbDir[TSDB_FILENAME_LEN] = {0};
  n = snprintf(tsdbDir, sizeof(tsdbDir), "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
  if (n < 0 || n >= (int)sizeof(tsdbDir)) {
    dError("vgId:%d, failed to create vnode, tsdb dir path is too long", pVnodeCfg->cfg.vgId);
    return TSDB_CODE_VG_INIT_FAILED;
  }

  code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    dError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code));
    return TSDB_CODE_VG_INIT_FAILED;
  }

  dPrint("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.commitLog);
  code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir);

  return code;
}

int32_t vnodeDrop(int32_t vgId) {
123 124
  SVnodeObj **ppVnode = (SVnodeObj **)taosGetIntHashData(tsDnodeVnodesHash, vgId);
  if (ppVnode == NULL || *ppVnode == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
125 126 127 128
    dTrace("vgId:%d, failed to drop, vgId not exist", vgId);
    return TSDB_CODE_INVALID_VGROUP_ID;
  }

129
  SVnodeObj *pVnode = *ppVnode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
130
  dTrace("pVnode:%p vgId:%d, vnode will be dropped", pVnode, pVnode->vgId);
S
slguan 已提交
131
  pVnode->status = TAOS_VN_STATUS_DELETING;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
132 133 134 135 136
  vnodeCleanUp(pVnode);
 
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
/*
 * Apply a new configuration to an existing vnode object.
 *
 * param     - the SVnodeObj to alter
 * pVnodeCfg - the new configuration message
 *
 * The configuration is written to config.json, read back into the vnode
 * object, and then handed to syncReconfig() and tsdbConfigRepo().  The code of
 * the first failing step is returned, TSDB_CODE_SUCCESS if none fails.
 *
 * NOTE(review): when reading the configuration back fails, the vnode is
 * removed from the vnode hash and nothing else is released here - confirm
 * that this is intended.
 */
int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
  SVnodeObj *pVnode = param;
  int32_t    rc;

  if ((rc = vnodeSaveCfg(pVnodeCfg)) != TSDB_CODE_SUCCESS) {
    dError("vgId:%d, failed to save vnode cfg, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(rc));
    return rc;
  }

  if ((rc = vnodeReadCfg(pVnode)) != TSDB_CODE_SUCCESS) {
    dError("pVnode:%p vgId:%d, failed to read cfg file", pVnode, pVnode->vgId);
    taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
    return rc;
  }

  if ((rc = syncReconfig(pVnode->sync, &pVnode->syncCfg)) != TSDB_CODE_SUCCESS) {
    dTrace("pVnode:%p vgId:%d, failed to alter vnode, canot reconfig sync, result:%s", pVnode, pVnode->vgId,
           tstrerror(rc));
    return rc;
  }

  if ((rc = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg)) != TSDB_CODE_SUCCESS) {
    dTrace("pVnode:%p vgId:%d, failed to alter vnode, canot reconfig tsdb, result:%s", pVnode, pVnode->vgId,
           tstrerror(rc));
    return rc;
  }

  dTrace("pVnode:%p vgId:%d, vnode is altered", pVnode, pVnode->vgId);
  return TSDB_CODE_SUCCESS;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
170 171
int32_t vnodeOpen(int32_t vnode, char *rootDir) {
  char temp[TSDB_FILENAME_LEN];
172
  pthread_once(&vnodeModuleInit, vnodeInit);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
173

174 175
  SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1);
  pVnode->vgId     = vnode;
S
slguan 已提交
176
  pVnode->status   = TAOS_VN_STATUS_INIT;
177 178 179
  pVnode->refCount = 1;
  pVnode->version  = 0;  
  taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
180

181 182 183 184 185 186 187
  int32_t code = vnodeReadCfg(pVnode);
  if (code != TSDB_CODE_SUCCESS) {
    dError("pVnode:%p vgId:%d, failed to read cfg file", pVnode, pVnode->vgId);
    taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
    return code;
  }

S
slguan 已提交
188 189
  vnodeReadVersion(pVnode);
  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
190 191 192 193
  pVnode->wqueue = dnodeAllocateWqueue(pVnode);
  pVnode->rqueue = dnodeAllocateRqueue(pVnode);

  sprintf(temp, "%s/wal", rootDir);
S
slguan 已提交
194
  pVnode->wal = walOpen(temp, &pVnode->walCfg);
J
Jeff Tao 已提交
195 196 197

  SSyncInfo syncInfo;
  syncInfo.vgId = pVnode->vgId;
J
Jeff Tao 已提交
198
  syncInfo.version = pVnode->version;
J
Jeff Tao 已提交
199 200 201 202 203 204 205 206
  syncInfo.syncCfg = pVnode->syncCfg;
  sprintf(syncInfo.path, "%s/tsdb/", rootDir);
  syncInfo.ahandle = pVnode;
  syncInfo.getWalInfo = vnodeGetWalInfo;
  syncInfo.getFileInfo = vnodeGetFileInfo;
  syncInfo.writeToCache = vnodeWriteToQueue;
  syncInfo.confirmForward = dnodeSendRpcWriteRsp; 
  syncInfo.notifyRole = vnodeNotifyRole;
S
slguan 已提交
207
  pVnode->sync = syncStart(&syncInfo);
J
Jeff Tao 已提交
208

S
slguan 已提交
209 210
  pVnode->events = NULL;
  pVnode->cq     = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
211

H
hzcheng 已提交
212 213
  STsdbAppH appH = {0};
  appH.appH = (void *)pVnode;
J
Jeff Tao 已提交
214
  appH.walCallBack = vnodeWalCallback;
H
hzcheng 已提交
215 216 217 218 219 220 221 222 223 224 225

  sprintf(temp, "%s/tsdb", rootDir);
  void *pTsdb = tsdbOpenRepo(temp, &appH);
  if (pTsdb == NULL) {
    dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno));
    taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
    return terrno;
  }

  pVnode->tsdb = pTsdb;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
226 227
  walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);

S
slguan 已提交
228
  pVnode->status = TAOS_VN_STATUS_READY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
229 230
  dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);

J
Jeff Tao 已提交
231
  atomic_add_fetch_32(&tsOpennedVnodes, 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
232 233 234
  return TSDB_CODE_SUCCESS;
}

235
int32_t vnodeClose(int32_t vgId) {
236 237
  SVnodeObj **ppVnode = (SVnodeObj **)taosGetIntHashData(tsDnodeVnodesHash, vgId);
  if (ppVnode == NULL || *ppVnode == NULL) return 0;
238

239
  SVnodeObj *pVnode = *ppVnode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
240
  dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId);
S
slguan 已提交
241
  pVnode->status = TAOS_VN_STATUS_CLOSING;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
242 243 244 245 246 247 248
  vnodeCleanUp(pVnode);

  return 0;
}

void vnodeRelease(void *pVnodeRaw) {
  SVnodeObj *pVnode = pVnodeRaw;
249
  int32_t    vgId = pVnode->vgId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
250 251

  int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
252
  assert(refCount >= 0);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
253

J
Jeff Tao 已提交
254
  if (refCount > 0) {
255
    dTrace("pVnode:%p vgId:%d, release vnode, refCount:%d", pVnode, vgId, refCount);
J
Jeff Tao 已提交
256 257
    return;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
258 259 260 261 262 263 264 265 266

  // remove read queue
  dnodeFreeRqueue(pVnode->rqueue);
  pVnode->rqueue = NULL;

  // remove write queue
  dnodeFreeWqueue(pVnode->wqueue);
  pVnode->wqueue = NULL;

S
slguan 已提交
267
  if (pVnode->status == TAOS_VN_STATUS_DELETING) {
S
slguan 已提交
268 269 270
    char rootDir[TSDB_FILENAME_LEN] = {0};
    sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId);
    taosRemoveDir(rootDir);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
271 272
  }

273
  free(pVnode);
274

J
Jeff Tao 已提交
275
  int32_t count = atomic_sub_fetch_32(&tsOpennedVnodes, 1);
J
Jeff Tao 已提交
276 277
  dTrace("pVnode:%p vgId:%d, vnode is released, vnodes:%d", pVnode, vgId, count);

J
Jeff Tao 已提交
278
  if (count <= 0) {
279 280
    taosCleanUpIntHash(tsDnodeVnodesHash);
    vnodeModuleInit = PTHREAD_ONCE_INIT;
J
Jeff Tao 已提交
281
    tsDnodeVnodesHash = NULL;
282
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
283 284 285
}

void *vnodeGetVnode(int32_t vgId) {
286 287
  SVnodeObj **ppVnode = (SVnodeObj **)taosGetIntHashData(tsDnodeVnodesHash, vgId);
  if (ppVnode == NULL || *ppVnode == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
288
    terrno = TSDB_CODE_INVALID_VGROUP_ID;
S
slguan 已提交
289
    dError("vgId:%d not exist", vgId);
S
slguan 已提交
290
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
291 292
  }

293
  return *ppVnode;
S
slguan 已提交
294 295 296 297 298 299
}

/*
 * Look up a vnode by vgId and add one reference to it.  Returns the vnode
 * object, or NULL when vnodeGetVnode() does not find it.
 */
void *vnodeAccquireVnode(int32_t vgId) {
  SVnodeObj *found = vnodeGetVnode(vgId);

  if (found != NULL) {
    atomic_add_fetch_32(&found->refCount, 1);
    dTrace("pVnode:%p vgId:%d, get vnode, refCount:%d", found, found->vgId, found->refCount);
  }

  return found;
}

void *vnodeGetRqueue(void *pVnode) {
  return ((SVnodeObj *)pVnode)->rqueue; 
}

void *vnodeGetWqueue(int32_t vgId) {
S
slguan 已提交
311
  SVnodeObj *pVnode = vnodeAccquireVnode(vgId);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
312 313 314 315 316 317 318 319
  if (pVnode == NULL) return NULL;
  return pVnode->wqueue;
} 

void *vnodeGetWal(void *pVnode) {
  return ((SVnodeObj *)pVnode)->wal; 
}

J
Jeff Tao 已提交
320 321 322 323 324 325
/*
 * Fill the vnode load entries of a status message: param is an SDMStatusMsg,
 * and vnodeBuildVloadMsg() is run on every entry of the vnode hash.
 */
void vnodeBuildStatusMsg(void *param) {
  SDMStatusMsg *statusMsg = param;

  taosVisitIntHashWithFp(tsDnodeVnodesHash, vnodeBuildVloadMsg, statusMsg);
}

static void vnodeBuildVloadMsg(char *pNode, void * param) {
326
  SVnodeObj *pVnode = *(SVnodeObj **) pNode;
S
slguan 已提交
327
  if (pVnode->status == TAOS_VN_STATUS_DELETING) return;
J
Jeff Tao 已提交
328 329 330 331 332 333

  SDMStatusMsg *pStatus = param;
  if (pStatus->openVnodes >= TSDB_MAX_VNODES) return;

  SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
  pLoad->vgId = htonl(pVnode->vgId);
S
slguan 已提交
334
  pLoad->cfgVersion = htonl(pVnode->cfgVersion);
S
slguan 已提交
335 336 337
  pLoad->totalStorage = htobe64(pLoad->totalStorage);
  pLoad->compStorage = htobe64(pLoad->compStorage);
  pLoad->pointsWritten = htobe64(pLoad->pointsWritten);
J
Jeff Tao 已提交
338
  pLoad->status = pVnode->status;
S
slguan 已提交
339
  pLoad->role = pVnode->role;
S
slguan 已提交
340
  pLoad->replica = pVnode->syncCfg.replica;
J
Jeff Tao 已提交
341 342
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
343
/*
 * Shared teardown for vnodeDrop() and vnodeClose(): removes the vnode from the
 * vnode hash, closes the tsdb repository and the wal, writes version.json and
 * finally drops one reference through vnodeRelease().
 *
 * The sync handle is not stopped here.
 */
static void vnodeCleanUp(SVnodeObj *pVnode) {
  int32_t vgId = pVnode->vgId;
  taosDeleteIntHash(tsDnodeVnodesHash, vgId);

  tsdbCloseRepo(pVnode->tsdb);
  walClose(pVnode->wal);
  vnodeSaveVersion(pVnode);

  vnodeRelease(pVnode);
}
H
hzcheng 已提交
354

355
// TODO: this is a simple implement
J
Jeff Tao 已提交
356
static int vnodeWalCallback(void *arg) {
H
hzcheng 已提交
357 358
  SVnodeObj *pVnode = arg;
  return walRenew(pVnode->wal);
359 360
}

J
Jeff Tao 已提交
361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376
/*
 * getFileInfo callback registered with the sync module.  Currently a stub:
 * every argument is ignored and 0 is returned.  The disabled code it replaces
 * forwarded the call to tsdbGetFileInfo() on the vnode's tsdb handle.
 */
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) {
  (void)ahandle;
  (void)name;
  (void)index;
  (void)size;

  return 0;
}

static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) {
  SVnodeObj *pVnode = ahandle;
  return walGetWalFile(pVnode->wal, name, index);
}

static void vnodeNotifyRole(void *ahandle, int8_t role) {
  SVnodeObj *pVnode = ahandle;
  pVnode->role = role;
}

377
/*
 * Write the vnode configuration carried by pVnodeCfg to
 * <tsVnodeDir>/vnode<vgId>/config.json as a JSON object.
 *
 * The text is streamed straight to the file, so its length is not limited by
 * a fixed-size buffer.  For one or more replicas the output is identical to
 * what the buffer-based version produced; for zero replicas "nodeInfos" is
 * written as an empty array so the file stays valid JSON.
 *
 * Returns 0 (TSDB_CODE_SUCCESS) on success, the errno of a failed fopen(), or
 * TSDB_CODE_OTHERS when writing or closing the file fails.
 *
 * NOTE(review): nodeName is written without JSON escaping - confirm that node
 * names can never contain quotes or backslashes.
 */
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
  char cfgFile[TSDB_FILENAME_LEN + 30] = {0};
  snprintf(cfgFile, sizeof(cfgFile), "%s/vnode%d/config.json", tsVnodeDir, pVnodeCfg->cfg.vgId);

  FILE *fp = fopen(cfgFile, "w");
  if (!fp) {
    // Keep errno before logging, which may overwrite it.
    int32_t err = errno;
    dError("vgId:%d, failed to open vnode cfg file for write, error:%s", pVnodeCfg->cfg.vgId, strerror(err));
    return err;
  }

  fprintf(fp, "{\n");
  fprintf(fp, "  \"cfgVersion\": %d,\n", pVnodeCfg->cfg.cfgVersion);
  fprintf(fp, "  \"cacheBlockSize\": %d,\n", pVnodeCfg->cfg.cacheBlockSize);
  fprintf(fp, "  \"totalBlocks\": %d,\n", pVnodeCfg->cfg.totalBlocks);
  fprintf(fp, "  \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables);
  fprintf(fp, "  \"daysPerFile\": %d,\n", pVnodeCfg->cfg.daysPerFile);
  fprintf(fp, "  \"daysToKeep\": %d,\n", pVnodeCfg->cfg.daysToKeep);
  fprintf(fp, "  \"daysToKeep1\": %d,\n", pVnodeCfg->cfg.daysToKeep1);
  fprintf(fp, "  \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2);
  fprintf(fp, "  \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock);
  fprintf(fp, "  \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock);
  fprintf(fp, "  \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
  fprintf(fp, "  \"precision\": %d,\n", pVnodeCfg->cfg.precision);
  fprintf(fp, "  \"compression\": %d,\n", pVnodeCfg->cfg.compression);
  fprintf(fp, "  \"commitLog\": %d,\n", pVnodeCfg->cfg.commitLog);
  fprintf(fp, "  \"replica\": %d,\n", pVnodeCfg->cfg.replications);
  fprintf(fp, "  \"wals\": %d,\n", pVnodeCfg->cfg.wals);
  fprintf(fp, "  \"quorum\": %d,\n", pVnodeCfg->cfg.quorum);

  // Addresses are printed lowest byte first, as dotted decimal.
  uint32_t arbIp = pVnodeCfg->cfg.arbitratorIp;
  fprintf(fp, "  \"arbitratorIp\": \"%u.%u.%u.%u\",\n", arbIp & 0xFF, (arbIp >> 8) & 0xFF, (arbIp >> 16) & 0xFF,
          (arbIp >> 24) & 0xFF);

  int32_t replicas = pVnodeCfg->cfg.replications;
  fprintf(fp, "  \"nodeInfos\": [");
  for (int32_t i = 0; i < replicas; i++) {
    // The first element opens right after '['; later ones close the previous element first.
    fprintf(fp, "%s{\n", (i == 0) ? "" : "  },");
    fprintf(fp, "    \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId);

    uint32_t nodeIp = pVnodeCfg->nodes[i].nodeIp;
    fprintf(fp, "    \"nodeIp\": \"%u.%u.%u.%u\",\n", nodeIp & 0xFF, (nodeIp >> 8) & 0xFF, (nodeIp >> 16) & 0xFF,
            (nodeIp >> 24) & 0xFF);

    fprintf(fp, "    \"nodeName\": \"%s\"\n", pVnodeCfg->nodes[i].nodeName);
  }
  fprintf(fp, "%s]\n", (replicas > 0) ? "  }" : "");
  fprintf(fp, "}\n");

  // fclose() flushes, so a failure there is a write failure as well.
  int writeFailed = (ferror(fp) != 0);
  if (fclose(fp) != 0) {
    writeFailed = 1;
  }

  if (writeFailed) {
    dError("vgId:%d, failed to write vnode cfg file", pVnodeCfg->cfg.vgId);
    return TSDB_CODE_OTHERS;
  }

  dPrint("vgId:%d, save vnode cfg successed", pVnodeCfg->cfg.vgId);

  return 0;
}

static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
S
slguan 已提交
443 444
  char cfgFile[TSDB_FILENAME_LEN + 30] = {0};
  sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnode->vgId);
445
  FILE *fp = fopen(cfgFile, "r");
S
slguan 已提交
446 447 448 449 450 451 452 453 454 455 456 457
  if (!fp) {
    dError("pVnode:%p vgId:%d, failed to open vnode cfg file for read, error:%s", pVnode, pVnode->vgId, strerror(errno));
    return errno;
  }

  int   ret = TSDB_CODE_OTHERS;
  int   maxLen = 1000;
  char *content = calloc(1, maxLen + 1);
  int   len = fread(content, 1, maxLen, fp);
  if (len <= 0) {
    free(content);
    fclose(fp);
S
slguan 已提交
458
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, content is null", pVnode, pVnode->vgId);
S
slguan 已提交
459 460 461 462 463
    return false;
  }

  cJSON *root = cJSON_Parse(content);
  if (root == NULL) {
S
slguan 已提交
464
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, invalid json format", pVnode, pVnode->vgId);
S
slguan 已提交
465 466 467
    goto PARSE_OVER;
  }

S
slguan 已提交
468 469 470 471 472 473 474
  cJSON *cfgVersion = cJSON_GetObjectItem(root, "cfgVersion");
  if (!cfgVersion || cfgVersion->type != cJSON_Number) {
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, cfgVersion not found", pVnode, pVnode->vgId);
    goto PARSE_OVER;
  }
  pVnode->cfgVersion = cfgVersion->valueint;

S
slguan 已提交
475 476 477
  cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize");
  if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) {
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, cacheBlockSize not found", pVnode, pVnode->vgId);
S
slguan 已提交
478 479
    goto PARSE_OVER;
  }
S
slguan 已提交
480
  pVnode->tsdbCfg.cacheBlockSize = cacheBlockSize->valueint;
S
slguan 已提交
481

S
slguan 已提交
482 483 484
  cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks");
  if (!totalBlocks || totalBlocks->type != cJSON_Number) {
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, totalBlocks not found", pVnode, pVnode->vgId);
S
slguan 已提交
485 486
    goto PARSE_OVER;
  }
S
slguan 已提交
487
  pVnode->tsdbCfg.totalBlocks = totalBlocks->valueint;
S
slguan 已提交
488 489 490

  cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
  if (!maxTables || maxTables->type != cJSON_Number) {
S
slguan 已提交
491
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, maxTables not found", pVnode, pVnode->vgId);
S
slguan 已提交
492 493 494 495
    goto PARSE_OVER;
  }
  pVnode->tsdbCfg.maxTables = maxTables->valueint;

S
slguan 已提交
496
   cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
S
slguan 已提交
497
  if (!daysPerFile || daysPerFile->type != cJSON_Number) {
S
slguan 已提交
498
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, daysPerFile not found", pVnode, pVnode->vgId);
S
slguan 已提交
499 500 501 502
    goto PARSE_OVER;
  }
  pVnode->tsdbCfg.daysPerFile = daysPerFile->valueint;

S
slguan 已提交
503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523
  cJSON *daysToKeep = cJSON_GetObjectItem(root, "daysToKeep");
  if (!daysToKeep || daysToKeep->type != cJSON_Number) {
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, daysToKeep not found", pVnode, pVnode->vgId);
    goto PARSE_OVER;
  }
  pVnode->tsdbCfg.keep = daysToKeep->valueint;

  cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1");
  if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) {
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, daysToKeep1 not found", pVnode, pVnode->vgId);
    goto PARSE_OVER;
  }
  pVnode->tsdbCfg.keep1 = daysToKeep1->valueint;

  cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2");
  if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) {
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, daysToKeep2 not found", pVnode, pVnode->vgId);
    goto PARSE_OVER;
  }
  pVnode->tsdbCfg.keep2 = daysToKeep2->valueint;

S
slguan 已提交
524 525
  cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock");
  if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) {
S
slguan 已提交
526
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, minRowsPerFileBlock not found", pVnode, pVnode->vgId);
S
slguan 已提交
527 528 529 530 531 532
    goto PARSE_OVER;
  }
  pVnode->tsdbCfg.minRowsPerFileBlock = minRowsPerFileBlock->valueint;

  cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock");
  if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) {
S
slguan 已提交
533
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, maxRowsPerFileBlock not found", pVnode, pVnode->vgId);
S
slguan 已提交
534 535 536 537
    goto PARSE_OVER;
  }
  pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint;

S
slguan 已提交
538 539 540
  cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
  if (!commitTime || commitTime->type != cJSON_Number) {
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, commitTime not found", pVnode, pVnode->vgId);
S
slguan 已提交
541 542
    goto PARSE_OVER;
  }
S
slguan 已提交
543
  pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
S
slguan 已提交
544

S
slguan 已提交
545 546 547 548 549 550 551 552 553 554
  cJSON *precision = cJSON_GetObjectItem(root, "precision");
  if (!precision || precision->type != cJSON_Number) {
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, precision not found", pVnode, pVnode->vgId);
    goto PARSE_OVER;
  }
  pVnode->tsdbCfg.precision = (int8_t)precision->valueint;

  cJSON *compression = cJSON_GetObjectItem(root, "compression");
  if (!compression || compression->type != cJSON_Number) {
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, compression not found", pVnode, pVnode->vgId);
S
slguan 已提交
555 556
    goto PARSE_OVER;
  }
S
slguan 已提交
557
  pVnode->tsdbCfg.compression = (int8_t)compression->valueint;
S
slguan 已提交
558 559 560

  cJSON *commitLog = cJSON_GetObjectItem(root, "commitLog");
  if (!commitLog || commitLog->type != cJSON_Number) {
S
slguan 已提交
561
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, commitLog not found", pVnode, pVnode->vgId);
S
slguan 已提交
562 563 564 565 566 567
    goto PARSE_OVER;
  }
  pVnode->walCfg.commitLog = (int8_t)commitLog->valueint;

  cJSON *wals = cJSON_GetObjectItem(root, "wals");
  if (!wals || wals->type != cJSON_Number) {
S
slguan 已提交
568
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, wals not found", pVnode, pVnode->vgId);
S
slguan 已提交
569 570 571
    goto PARSE_OVER;
  }
  pVnode->walCfg.wals = (int8_t)wals->valueint;
J
Jeff Tao 已提交
572
  pVnode->walCfg.keep = 0;
573

S
slguan 已提交
574 575 576
  cJSON *replica = cJSON_GetObjectItem(root, "replica");
  if (!replica || replica->type != cJSON_Number) {
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, replica not found", pVnode, pVnode->vgId);
S
slguan 已提交
577
    goto PARSE_OVER;
578
  }
S
slguan 已提交
579
  pVnode->syncCfg.replica = (int8_t)replica->valueint;
580

S
slguan 已提交
581 582
  cJSON *quorum = cJSON_GetObjectItem(root, "quorum");
  if (!quorum || quorum->type != cJSON_Number) {
S
slguan 已提交
583
    dError("failed to read vnode cfg, quorum not found", pVnode, pVnode->vgId);
S
slguan 已提交
584 585 586
    goto PARSE_OVER;
  }
  pVnode->syncCfg.quorum = (int8_t)quorum->valueint;
587

S
slguan 已提交
588 589 590
  cJSON *arbitratorIp = cJSON_GetObjectItem(root, "arbitratorIp");
  if (!arbitratorIp || arbitratorIp->type != cJSON_String || arbitratorIp->valuestring == NULL) {
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, arbitratorIp not found", pVnode, pVnode->vgId);
S
slguan 已提交
591 592
    goto PARSE_OVER;
  }
S
slguan 已提交
593
  pVnode->syncCfg.arbitratorIp = inet_addr(arbitratorIp->valuestring);
S
slguan 已提交
594 595 596

  cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
  if (!nodeInfos || nodeInfos->type != cJSON_Array) {
S
slguan 已提交
597
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, nodeInfos not found", pVnode, pVnode->vgId);
S
slguan 已提交
598 599 600 601 602
    goto PARSE_OVER;
  }

  int size = cJSON_GetArraySize(nodeInfos);
  if (size != pVnode->syncCfg.replica) {
S
slguan 已提交
603
    dError("pVnode:%p vgId:%d, failed to read vnode cfg, nodeInfos size not matched", pVnode, pVnode->vgId);
S
slguan 已提交
604 605 606 607 608 609 610 611 612
    goto PARSE_OVER;
  }

  for (int i = 0; i < size; ++i) {
    cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
    if (nodeInfo == NULL) continue;

    cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
    if (!nodeId || nodeId->type != cJSON_Number) {
S
slguan 已提交
613
      dError("pVnode:%p vgId:%d, failed to read vnode cfg, nodeId not found", pVnode, pVnode->vgId);
S
slguan 已提交
614 615 616 617 618 619
      goto PARSE_OVER;
    }
    pVnode->syncCfg.nodeInfo[i].nodeId = nodeId->valueint;

    cJSON *nodeIp = cJSON_GetObjectItem(nodeInfo, "nodeIp");
    if (!nodeIp || nodeIp->type != cJSON_String || nodeIp->valuestring == NULL) {
S
slguan 已提交
620
      dError("pVnode:%p vgId:%d, failed to read vnode cfg, nodeIp not found", pVnode, pVnode->vgId);
S
slguan 已提交
621 622 623 624 625 626
      goto PARSE_OVER;
    }
    pVnode->syncCfg.nodeInfo[i].nodeIp = inet_addr(nodeIp->valuestring);

    cJSON *nodeName = cJSON_GetObjectItem(nodeInfo, "nodeName");
    if (!nodeName || nodeName->type != cJSON_String || nodeName->valuestring == NULL) {
S
slguan 已提交
627
      dError("pVnode:%p vgId:%d, failed to read vnode cfg, nodeName not found", pVnode, pVnode->vgId);
S
slguan 已提交
628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645
      goto PARSE_OVER;
    }
    strncpy(pVnode->syncCfg.nodeInfo[i].name, nodeName->valuestring, TSDB_NODE_NAME_LEN);
  }

  ret = 0;

  dPrint("pVnode:%p vgId:%d, read vnode cfg successed, replcia:%d", pVnode, pVnode->vgId, pVnode->syncCfg.replica);
  for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) {
    dPrint("pVnode:%p vgId:%d, dnode:%d, ip:%s name:%s", pVnode, pVnode->vgId, pVnode->syncCfg.nodeInfo[i].nodeId,
           taosIpStr(pVnode->syncCfg.nodeInfo[i].nodeIp), pVnode->syncCfg.nodeInfo[i].name);
  }

PARSE_OVER:
  free(content);
  cJSON_Delete(root);
  fclose(fp);
  return ret;
S
slguan 已提交
646
}
S
slguan 已提交
647 648 649 650 651 652 653


/*
 * Write pVnode->version to <tsVnodeDir>/vnode<vgId>/version.json as
 * {"version": N}.
 *
 * The text is streamed straight to the file, so a version with many digits
 * cannot overrun or be cut off by a small scratch buffer.
 *
 * Returns 0 on success, the errno of a failed fopen(), or TSDB_CODE_OTHERS
 * when writing or closing the file fails.
 */
static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
  char versionFile[TSDB_FILENAME_LEN + 30] = {0};
  snprintf(versionFile, sizeof(versionFile), "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);

  FILE *fp = fopen(versionFile, "w");
  if (!fp) {
    // Keep errno before logging, which may overwrite it.
    int32_t err = errno;
    dError("pVnode:%p vgId:%d, failed to open vnode version file for write, error:%s", pVnode, pVnode->vgId, strerror(err));
    return err;
  }

  fprintf(fp, "{\n");
  fprintf(fp, "  \"version\": %" PRId64 "\n", pVnode->version);
  fprintf(fp, "}\n");

  // fclose() flushes, so a failure there is a write failure as well.
  int writeFailed = (ferror(fp) != 0);
  if (fclose(fp) != 0) {
    writeFailed = 1;
  }

  if (writeFailed) {
    dError("pVnode:%p vgId:%d, failed to write vnode version file", pVnode, pVnode->vgId);
    return TSDB_CODE_OTHERS;
  }

  dPrint("pVnode:%p vgId:%d, save vnode version successed", pVnode, pVnode->vgId);

  return 0;
}

S
slguan 已提交
675
/*
 * Load pVnode->version from <tsVnodeDir>/vnode<vgId>/version.json.
 *
 * The file is opened read-only; opening it for writing would truncate it and
 * throw away the saved version.  A missing file is only traced, because no
 * version file exists until vnodeSaveVersion() has run for this vnode.
 *
 * Returns true when a numeric "version" member was read, false otherwise (in
 * which case pVnode->version is left unchanged).
 */
static bool vnodeReadVersion(SVnodeObj *pVnode) {
  char versionFile[TSDB_FILENAME_LEN + 30] = {0};
  snprintf(versionFile, sizeof(versionFile), "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);

  FILE *fp = fopen(versionFile, "r");
  if (!fp) {
    // Keep errno before logging, which may overwrite it.
    int32_t err = errno;
    if (err == ENOENT) {
      dTrace("pVnode:%p vgId:%d, vnode version file not exist", pVnode, pVnode->vgId);
    } else {
      dError("pVnode:%p vgId:%d, failed to open vnode version file for read, error:%s", pVnode, pVnode->vgId, strerror(err));
    }
    return false;
  }

  bool   ret = false;
  cJSON *root = NULL;
  cJSON *version = NULL;
  int    maxLen = 100;
  int    len = 0;
  char  *content = calloc(1, maxLen + 1);
  if (content == NULL) {
    dError("pVnode:%p vgId:%d, failed to read vnode version, out of memory", pVnode, pVnode->vgId);
    goto PARSE_OVER;
  }

  len = fread(content, 1, maxLen, fp);
  if (len <= 0) {
    dPrint("pVnode:%p vgId:%d, failed to read vnode version, content is null", pVnode, pVnode->vgId);
    goto PARSE_OVER;
  }

  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("pVnode:%p vgId:%d, failed to read vnode version, invalid json format", pVnode, pVnode->vgId);
    goto PARSE_OVER;
  }

  version = cJSON_GetObjectItem(root, "version");
  if (!version || version->type != cJSON_Number) {
    dError("pVnode:%p vgId:%d, failed to read vnode version, version not found", pVnode, pVnode->vgId);
    goto PARSE_OVER;
  }
  pVnode->version = version->valueint;

  ret = true;

  dPrint("pVnode:%p vgId:%d, read vnode version successed, version:%" PRId64, pVnode, pVnode->vgId, pVnode->version);

PARSE_OVER:
  free(content);
  cJSON_Delete(root);
  fclose(fp);
  return ret;
}