vnodeMain.c 28.4 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
S
Shengliang Guan 已提交
18
#include "hash.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
19 20
#include "taoserror.h"
#include "taosmsg.h"
S
slguan 已提交
21
#include "tutil.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
22 23 24 25
#include "trpc.h"
#include "tsdb.h"
#include "ttime.h"
#include "ttimer.h"
S
slguan 已提交
26
#include "cJSON.h"
S
slguan 已提交
27
#include "tglobal.h"
J
Jeff Tao 已提交
28 29
#include "dnode.h"
#include "vnode.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
30 31
#include "vnodeInt.h"

32 33
/* Byte size reserved for the serialized content of a vnode version file. */
#define TSDB_VNODE_VERSION_CONTENT_LEN 31

/* Module-wide state. */
static int32_t         tsOpennedVnodes;                     /* number of currently opened vnodes */
static void           *tsDnodeVnodesHash;                   /* vgId -> SVnodeObj* lookup table */
static pthread_once_t  vnodeModuleInit = PTHREAD_ONCE_INIT; /* guards vnodeInit() */

/* Lifecycle and on-disk persistence helpers defined later in this file. */
static void     vnodeCleanUp(SVnodeObj *pVnode);
static int32_t  vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
static int32_t  vnodeReadCfg(SVnodeObj *pVnode);
static int32_t  vnodeSaveVersion(SVnodeObj *pVnode);
static int32_t  vnodeReadVersion(SVnodeObj *pVnode);

/* Callbacks handed to the tsdb and sync modules. */
static int      vnodeProcessTsdbStatus(void *arg, int status);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion);
static int      vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void     vnodeNotifyRole(void *ahandle, int8_t role);
static void     vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);

S
slguan 已提交
49
#ifndef _SYNC
J
Jeff Tao 已提交
50
tsync_h syncStart(const SSyncInfo *info) { return NULL; }
51
int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; }
S
slguan 已提交
52
void    syncStop(tsync_h shandle) {}
53
int32_t syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; }
S
slguan 已提交
54
int     syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; }
S
slguan 已提交
55
void    syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {}
56 57
#endif

58
static void vnodeInit() {
J
Jeff Tao 已提交
59
  vnodeInitWriteFp();
S
slguan 已提交
60
  vnodeInitReadFp();
J
Jeff Tao 已提交
61

S
Shengliang Guan 已提交
62
  tsDnodeVnodesHash = taosHashInit(TSDB_MAX_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true);
J
Jeff Tao 已提交
63
  if (tsDnodeVnodesHash == NULL) {
64
    vError("failed to init vnode list");
J
Jeff Tao 已提交
65 66
  }
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
67 68 69

int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
  int32_t code;
70
  pthread_once(&vnodeModuleInit, vnodeInit);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
71

S
Shengliang Guan 已提交
72
  SVnodeObj *pTemp = (SVnodeObj *)taosHashGet(tsDnodeVnodesHash, (const char *)&pVnodeCfg->cfg.vgId, sizeof(int32_t));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
73
  if (pTemp != NULL) {
74
    vPrint("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pTemp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
75
    return TSDB_CODE_SUCCESS;
76
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
77

78 79 80 81 82 83 84 85 86 87 88 89
  if (mkdir(tsVnodeDir, 0755) != 0 && errno != EEXIST) {
    vError("vgId:%d, failed to create vnode, reason:%s dir:%s", pVnodeCfg->cfg.vgId, strerror(errno), tsVnodeDir);
    if (errno == EACCES) {
      return TSDB_CODE_VND_NO_DISK_PERMISSIONS;
    } else if (errno == ENOSPC) {
      return TSDB_CODE_VND_NO_DISKSPACE;
    } else if (errno == ENOENT) {
      return TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR;
    } else {
      return TSDB_CODE_VND_INIT_FAILED;
    }
  }
90

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
91 92
  char rootDir[TSDB_FILENAME_LEN] = {0};
  sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId);
93 94
  if (mkdir(rootDir, 0755) != 0 && errno != EEXIST) {
    vError("vgId:%d, failed to create vnode, reason:%s dir:%s", pVnodeCfg->cfg.vgId, strerror(errno), rootDir);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
95
    if (errno == EACCES) {
96
      return TSDB_CODE_VND_NO_DISK_PERMISSIONS;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
97
    } else if (errno == ENOSPC) {
98
      return TSDB_CODE_VND_NO_DISKSPACE;
99
    } else if (errno == ENOENT) {
100
      return TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
101
    } else {
102
      return TSDB_CODE_VND_INIT_FAILED;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
103 104 105
    }
  }

106 107
  code = vnodeSaveCfg(pVnodeCfg);
  if (code != TSDB_CODE_SUCCESS) {
108
    vError("vgId:%d, failed to save vnode cfg, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code));
109 110 111
    return code;
  }

S
slguan 已提交
112 113
  STsdbCfg tsdbCfg = {0};
  tsdbCfg.tsdbId              = pVnodeCfg->cfg.vgId;
H
TD-183  
hzcheng 已提交
114 115
  tsdbCfg.cacheBlockSize      = pVnodeCfg->cfg.cacheBlockSize;
  tsdbCfg.totalBlocks         = pVnodeCfg->cfg.totalBlocks;
S
slguan 已提交
116
  tsdbCfg.maxTables           = pVnodeCfg->cfg.maxTables;
S
slguan 已提交
117
  tsdbCfg.daysPerFile         = pVnodeCfg->cfg.daysPerFile;
H
TD-183  
hzcheng 已提交
118
  tsdbCfg.keep                = pVnodeCfg->cfg.daysToKeep;
S
slguan 已提交
119 120
  tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock;
  tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock;
H
TD-183  
hzcheng 已提交
121 122
  tsdbCfg.precision           = pVnodeCfg->cfg.precision;
  tsdbCfg.compression         = pVnodeCfg->cfg.compression;;
Y
yifan hao 已提交
123

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
124 125 126
  char tsdbDir[TSDB_FILENAME_LEN] = {0};
  sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
  code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL);
127
  if (code != TSDB_CODE_SUCCESS) {
128
    vError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code));
129
    return TSDB_CODE_VND_INIT_FAILED;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
130 131
  }

132
  vPrint("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
133 134 135 136 137 138
  code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir);

  return code;
}

int32_t vnodeDrop(int32_t vgId) {
S
Shengliang Guan 已提交
139 140
  if (tsDnodeVnodesHash == NULL) {
    vTrace("vgId:%d, failed to drop, vgId not exist", vgId);
141
    return TSDB_CODE_VND_INVALID_VGROUP_ID;
S
Shengliang Guan 已提交
142 143
  }

S
Shengliang Guan 已提交
144
  SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
145
  if (ppVnode == NULL || *ppVnode == NULL) {
S
Shengliang Guan 已提交
146
    vTrace("vgId:%d, failed to drop, vgId not find", vgId);
147
    return TSDB_CODE_VND_INVALID_VGROUP_ID;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
148 149
  }

150
  SVnodeObj *pVnode = *ppVnode;
151
  vTrace("vgId:%d, vnode will be dropped", pVnode->vgId);
S
slguan 已提交
152
  pVnode->status = TAOS_VN_STATUS_DELETING;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
153
  vnodeCleanUp(pVnode);
Y
yifan hao 已提交
154

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
155 156 157
  return TSDB_CODE_SUCCESS;
}

S
slguan 已提交
158 159
/*
 * Apply a new configuration to an opened vnode.
 *
 * param is the SVnodeObj to alter. The configuration is written to disk,
 * read back into the vnode object, and then pushed to the sync module and
 * the tsdb repository, in that order. The first failing step aborts the
 * sequence and its code is returned; in that case the status is left at
 * TAOS_VN_STATUS_UPDATING.
 */
int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
  SVnodeObj *pVnode = param;
  pVnode->status = TAOS_VN_STATUS_UPDATING;

  // every step runs only if all previous steps succeeded
  int32_t code = vnodeSaveCfg(pVnodeCfg);
  if (code == TSDB_CODE_SUCCESS) code = vnodeReadCfg(pVnode);
  if (code == TSDB_CODE_SUCCESS) code = syncReconfig(pVnode->sync, &pVnode->syncCfg);
  if (code == TSDB_CODE_SUCCESS) code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg);
  if (code != TSDB_CODE_SUCCESS) return code;

  pVnode->status = TAOS_VN_STATUS_READY;
  vTrace("vgId:%d, vnode is altered", pVnode->vgId);

  return TSDB_CODE_SUCCESS;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
180 181
int32_t vnodeOpen(int32_t vnode, char *rootDir) {
  char temp[TSDB_FILENAME_LEN];
182
  pthread_once(&vnodeModuleInit, vnodeInit);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
183

184
  SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1);
185 186 187 188 189 190 191 192
  if (pVnode == NULL) {
    vError("vgId:%d, failed to open vnode since no enough memory", vnode);
    return TAOS_SYSTEM_ERROR(errno);
  }

  atomic_add_fetch_32(&tsOpennedVnodes, 1);
  atomic_add_fetch_32(&pVnode->refCount, 1);

193
  pVnode->vgId     = vnode;
S
slguan 已提交
194
  pVnode->status   = TAOS_VN_STATUS_INIT;
195
  pVnode->version  = 0;  
S
slguan 已提交
196
  pVnode->tsdbCfg.tsdbId = pVnode->vgId;
H
hzcheng 已提交
197
  pVnode->rootDir = strdup(rootDir);
S
Shengliang Guan 已提交
198

199 200
  int32_t code = vnodeReadCfg(pVnode);
  if (code != TSDB_CODE_SUCCESS) {
201 202 203 204 205 206 207
    vnodeCleanUp(pVnode);
    return code;
  } 

  code = vnodeReadVersion(pVnode);
  if (code != TSDB_CODE_SUCCESS) {
    vnodeCleanUp(pVnode);
208 209 210
    return code;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
211
  pVnode->fversion = pVnode->version;
S
slguan 已提交
212
  
S
Shengliang Guan 已提交
213 214
  pVnode->wqueue = dnodeAllocateVnodeWqueue(pVnode);
  pVnode->rqueue = dnodeAllocateVnodeRqueue(pVnode);
215 216 217 218
  if (pVnode->wqueue == NULL || pVnode->rqueue == NULL) {
    vnodeCleanUp(pVnode);
    return terrno;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
219

J
jtao1735 已提交
220
  SCqCfg cqCfg = {0};
B
Bomin Zhang 已提交
221
  sprintf(cqCfg.user, "_root");
J
jtao1735 已提交
222
  strcpy(cqCfg.pass, tsInternalPass);
B
Bomin Zhang 已提交
223
  strcpy(cqCfg.db, pVnode->db);
J
jtao1735 已提交
224
  cqCfg.vgId = vnode;
J
jtao1735 已提交
225 226
  cqCfg.cqWrite = vnodeWriteToQueue;
  pVnode->cq = cqOpen(pVnode, &cqCfg);
227 228 229 230
  if (pVnode->cq == NULL) {
    vnodeCleanUp(pVnode);
    return terrno;
  }
J
jtao1735 已提交
231

J
jtao1735 已提交
232 233
  STsdbAppH appH = {0};
  appH.appH = (void *)pVnode;
234
  appH.notifyStatus = vnodeProcessTsdbStatus;
J
jtao1735 已提交
235
  appH.cqH = pVnode->cq;
H
TD-354  
Hongze Cheng 已提交
236 237
  appH.cqCreateFunc = cqCreate;
  appH.cqDropFunc = cqDrop;
H
TD-90  
Hongze Cheng 已提交
238
  appH.configFunc = dnodeSendCfgTableToRecv;
J
jtao1735 已提交
239 240 241
  sprintf(temp, "%s/tsdb", rootDir);
  pVnode->tsdb = tsdbOpenRepo(temp, &appH);
  if (pVnode->tsdb == NULL) {
242
    vnodeCleanUp(pVnode);
J
jtao1735 已提交
243 244 245
    return terrno;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
246
  sprintf(temp, "%s/wal", rootDir);
S
slguan 已提交
247
  pVnode->wal = walOpen(temp, &pVnode->walCfg);
248 249 250 251 252
  if (pVnode->wal == NULL) { 
    vnodeCleanUp(pVnode);
    return terrno;
  }

J
jtao1735 已提交
253 254
  walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);

J
Jeff Tao 已提交
255 256
  SSyncInfo syncInfo;
  syncInfo.vgId = pVnode->vgId;
J
Jeff Tao 已提交
257
  syncInfo.version = pVnode->version;
J
Jeff Tao 已提交
258
  syncInfo.syncCfg = pVnode->syncCfg;
259
  sprintf(syncInfo.path, "%s", rootDir);
J
Jeff Tao 已提交
260 261 262 263
  syncInfo.ahandle = pVnode;
  syncInfo.getWalInfo = vnodeGetWalInfo;
  syncInfo.getFileInfo = vnodeGetFileInfo;
  syncInfo.writeToCache = vnodeWriteToQueue;
S
Shengliang Guan 已提交
264
  syncInfo.confirmForward = dnodeSendRpcVnodeWriteRsp; 
J
Jeff Tao 已提交
265
  syncInfo.notifyRole = vnodeNotifyRole;
266
  syncInfo.notifyFileSynced = vnodeNotifyFileSynced;
S
slguan 已提交
267
  pVnode->sync = syncStart(&syncInfo);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
268 269 270

#ifndef _SYNC
  pVnode->role = TAOS_SYNC_ROLE_MASTER;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
271
#else
272 273 274 275
  if (pVnode->sync == NULL) {
    vnodeCleanUp(pVnode);
    return terrno;
  }
S
Shengliang Guan 已提交
276 277
#endif

J
jtao1735 已提交
278
  // start continuous query
Y
yifan hao 已提交
279
  if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
J
jtao1735 已提交
280
    cqStart(pVnode->cq);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
281

J
jtao1735 已提交
282
  pVnode->events = NULL;
S
slguan 已提交
283
  pVnode->status = TAOS_VN_STATUS_READY;
284
  vTrace("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
285

286 287
  taosHashPut(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *));

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
288 289 290
  return TSDB_CODE_SUCCESS;
}

B
Bomin Zhang 已提交
291 292 293 294 295 296 297 298 299
/*
 * Start stream processing on the tsdb repository of the given vnode.
 * A vnode that cannot be acquired is silently ignored; the function
 * always returns TSDB_CODE_SUCCESS.
 */
int32_t vnodeStartStream(int32_t vnode) {
  SVnodeObj *pVnode = vnodeAccquireVnode(vnode);
  if (pVnode == NULL) return TSDB_CODE_SUCCESS;

  tsdbStartStream(pVnode->tsdb);
  vnodeRelease(pVnode);  // give back the reference taken above

  return TSDB_CODE_SUCCESS;
}

300
int32_t vnodeClose(int32_t vgId) {
S
Shengliang Guan 已提交
301
  SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
302
  if (ppVnode == NULL || *ppVnode == NULL) return 0;
303

304
  SVnodeObj *pVnode = *ppVnode;
305
  vTrace("vgId:%d, vnode will be closed", pVnode->vgId);
S
slguan 已提交
306
  pVnode->status = TAOS_VN_STATUS_CLOSING;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
307 308 309 310 311 312 313
  vnodeCleanUp(pVnode);

  return 0;
}

void vnodeRelease(void *pVnodeRaw) {
  SVnodeObj *pVnode = pVnodeRaw;
314
  int32_t    vgId = pVnode->vgId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
315 316

  int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
317
  assert(refCount >= 0);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
318

J
Jeff Tao 已提交
319
  if (refCount > 0) {
guanshengliang's avatar
guanshengliang 已提交
320
    vTrace("vgId:%d, release vnode, refCount:%d", vgId, refCount);
J
Jeff Tao 已提交
321 322
    return;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
323

324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
  if (pVnode->tsdb)
    tsdbCloseRepo(pVnode->tsdb, 1);
  pVnode->tsdb = NULL;

  if (pVnode->wal) 
    walClose(pVnode->wal);
  pVnode->wal = NULL;

  if (pVnode->wqueue) 
    dnodeFreeVnodeWqueue(pVnode->wqueue);
  pVnode->wqueue = NULL;

  if (pVnode->rqueue) 
    dnodeFreeVnodeRqueue(pVnode->rqueue);
  pVnode->rqueue = NULL;
 
H
hzcheng 已提交
340
  tfree(pVnode->rootDir);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
341

S
slguan 已提交
342
  if (pVnode->status == TAOS_VN_STATUS_DELETING) {
S
slguan 已提交
343 344 345
    char rootDir[TSDB_FILENAME_LEN] = {0};
    sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId);
    taosRemoveDir(rootDir);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
346 347
  }

348
  free(pVnode);
349

J
Jeff Tao 已提交
350
  int32_t count = atomic_sub_fetch_32(&tsOpennedVnodes, 1);
351
  vTrace("vgId:%d, vnode is released, vnodes:%d", vgId, count);
J
Jeff Tao 已提交
352

J
Jeff Tao 已提交
353
  if (count <= 0) {
S
Shengliang Guan 已提交
354
    taosHashCleanup(tsDnodeVnodesHash);
355
    vnodeModuleInit = PTHREAD_ONCE_INIT;
J
Jeff Tao 已提交
356
    tsDnodeVnodesHash = NULL;
357
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
358 359 360
}

void *vnodeGetVnode(int32_t vgId) {
361 362
  if (tsDnodeVnodesHash == NULL) return NULL;

S
Shengliang Guan 已提交
363
  SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
364
  if (ppVnode == NULL || *ppVnode == NULL) {
365
    terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
guanshengliang's avatar
guanshengliang 已提交
366
    vPrint("vgId:%d, not exist", vgId);
S
slguan 已提交
367
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
368 369
  }

370
  return *ppVnode;
S
slguan 已提交
371 372 373 374 375 376
}

void *vnodeAccquireVnode(int32_t vgId) {
  SVnodeObj *pVnode = vnodeGetVnode(vgId);
  if (pVnode == NULL) return pVnode;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
377
  atomic_add_fetch_32(&pVnode->refCount, 1);
378
  vTrace("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
379 380 381 382 383

  return pVnode;
}

void *vnodeGetRqueue(void *pVnode) {
Y
yifan hao 已提交
384
  return ((SVnodeObj *)pVnode)->rqueue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
385 386 387
}

void *vnodeGetWqueue(int32_t vgId) {
S
slguan 已提交
388
  SVnodeObj *pVnode = vnodeAccquireVnode(vgId);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
389 390
  if (pVnode == NULL) return NULL;
  return pVnode->wqueue;
S
slguan 已提交
391
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
392 393

void *vnodeGetWal(void *pVnode) {
Y
yifan hao 已提交
394
  return ((SVnodeObj *)pVnode)->wal;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
395 396
}

S
Shengliang Guan 已提交
397
/*
 * Append the load record of one vnode to a status message.
 *
 * Vnodes being deleted are skipped, and nothing is added once the message
 * already carries TSDB_MAX_VNODES entries. Multi-byte fields are stored in
 * network byte order.
 */
static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) {
  if (pVnode->status == TAOS_VN_STATUS_DELETING) return;
  if (pStatus->openVnodes >= TSDB_MAX_VNODES) return;

  // all three start at zero so that a statistic tsdbReportStat() leaves
  // untouched is reported as 0 instead of an indeterminate stack value
  int64_t totalStorage = 0;
  int64_t compStorage = 0;
  int64_t pointsWritten = 0;
  tsdbReportStat(pVnode->tsdb, &pointsWritten, &totalStorage, &compStorage);

  SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
  pLoad->vgId = htonl(pVnode->vgId);
  pLoad->cfgVersion = htonl(pVnode->cfgVersion);
  pLoad->totalStorage = htobe64(totalStorage);
  pLoad->compStorage = htobe64(compStorage);
  pLoad->pointsWritten = htobe64(pointsWritten);
  pLoad->status = pVnode->status;
  pLoad->role = pVnode->role;
  pLoad->replica = pVnode->syncCfg.replica;
}

S
Shengliang Guan 已提交
414 415 416 417 418 419 420 421 422 423 424 425 426 427 428
/*
 * Fill a status message (param is an SDMStatusMsg) with the load record of
 * every vnode currently registered in the vnode table. When the table does
 * not exist the message is left untouched.
 */
void vnodeBuildStatusMsg(void *param) {
  SDMStatusMsg *pStatus = param;

  // vnodeRelease() destroys the table and resets it to NULL after the last vnode
  if (tsDnodeVnodesHash == NULL) return;

  SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesHash);
  if (pIter == NULL) return;

  while (taosHashIterNext(pIter)) {
    SVnodeObj **pVnode = taosHashIterGet(pIter);
    if (pVnode == NULL) continue;
    if (*pVnode == NULL) continue;

    vnodeBuildVloadMsg(*pVnode, pStatus);
  }

  taosHashDestroyIter(pIter);
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
429
/*
 * Detach a vnode from the outside world and drop one reference to it.
 * The order matters: unpublish first, then stop the modules that can feed
 * it work, and only then release local resources.
 */
static void vnodeCleanUp(SVnodeObj *pVnode) {
  // unpublish the vnode so that no new message can reach it
  taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));

  // shut down replication
  if (pVnode->sync) {
    syncStop(pVnode->sync);
    pVnode->sync = NULL;
  }

  // shut down continuous query
  if (pVnode->cq) {
    cqClose(pVnode->cq);
  }
  pVnode->cq = NULL;

  // outside connections are cut off, local resources can go now
  vnodeRelease(pVnode);
}
H
hzcheng 已提交
447

448
// TODO: this is a simple implement
449
static int vnodeProcessTsdbStatus(void *arg, int status) {
H
hzcheng 已提交
450
  SVnodeObj *pVnode = arg;
451

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
452
  if (status == TSDB_STATUS_COMMIT_START) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
453
    pVnode->fversion = pVnode->version; 
454
    return walRenew(pVnode->wal);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
455
  }
456 457 458 459 460

  if (status == TSDB_STATUS_COMMIT_OVER)
    return vnodeSaveVersion(pVnode);

  return 0; 
461 462
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
463
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) {
464
  SVnodeObj *pVnode = ahandle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
465
  *fversion = pVnode->fversion;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
466
  return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size);
J
Jeff Tao 已提交
467 468 469 470 471 472 473 474 475
}

/*
 * Sync callback (ahandle is the SVnodeObj): forward the WAL file query to
 * walGetWalFile() and return its result.
 */
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) {
  SVnodeObj *pVnode = ahandle;
  int        result = walGetWalFile(pVnode->wal, name, index);
  return result;
}

static void vnodeNotifyRole(void *ahandle, int8_t role) {
  SVnodeObj *pVnode = ahandle;
S
Shengliang Guan 已提交
476
  vPrint("vgId:%d, sync role changed from %d to %d", pVnode->vgId, pVnode->role, role);
J
Jeff Tao 已提交
477
  pVnode->role = role;
J
jtao1735 已提交
478

Y
yifan hao 已提交
479
  if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
J
jtao1735 已提交
480
    cqStart(pVnode->cq);
Y
yifan hao 已提交
481
  else
J
jtao1735 已提交
482
    cqStop(pVnode->cq);
J
Jeff Tao 已提交
483 484
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
485
static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
486
  SVnodeObj *pVnode = ahandle;
487
  vTrace("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion);
488

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
489 490 491 492
  pVnode->fversion = fversion;
  pVnode->version = fversion;
  vnodeSaveVersion(pVnode);

H
hzcheng 已提交
493 494
  char rootDir[128] = "\0";
  sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
495
  // clsoe tsdb, then open tsdb
496
  tsdbCloseRepo(pVnode->tsdb, 0);
H
hzcheng 已提交
497 498
  STsdbAppH appH = {0};
  appH.appH = (void *)pVnode;
499
  appH.notifyStatus = vnodeProcessTsdbStatus;
H
hzcheng 已提交
500
  appH.cqH = pVnode->cq;
H
TD-354  
Hongze Cheng 已提交
501 502
  appH.cqCreateFunc = cqCreate;
  appH.cqDropFunc = cqDrop;
H
TD-90  
Hongze Cheng 已提交
503
  appH.configFunc = dnodeSendCfgTableToRecv;
H
hzcheng 已提交
504
  pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
505 506
}

507
/*
 * Write the vnode configuration to <tsVnodeDir>/vnode<vgId>/config.json.
 *
 * The JSON text is streamed straight into the file, so its size is not
 * limited by a fixed buffer. Note that vnodeReadCfg() reads at most 1000
 * bytes of this file.
 *
 * Returns TSDB_CODE_SUCCESS, or a system error code (also stored in terrno)
 * when the file cannot be opened or written.
 */
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
  char cfgFile[TSDB_FILENAME_LEN + 30] = {0};
  snprintf(cfgFile, sizeof(cfgFile), "%s/vnode%d/config.json", tsVnodeDir, pVnodeCfg->cfg.vgId);

  FILE *fp = fopen(cfgFile, "w");
  if (!fp) {
    int err = errno;  // keep the fopen() errno: logging may overwrite it
    vError("vgId:%d, failed to open vnode cfg file for write, file:%s error:%s", pVnodeCfg->cfg.vgId, cfgFile,
           strerror(err));
    terrno = TAOS_SYSTEM_ERROR(err);
    return terrno;
  }

  fprintf(fp, "{\n");
  fprintf(fp, "  \"db\": \"%s\",\n", pVnodeCfg->db);
  fprintf(fp, "  \"cfgVersion\": %d,\n", pVnodeCfg->cfg.cfgVersion);
  fprintf(fp, "  \"cacheBlockSize\": %d,\n", pVnodeCfg->cfg.cacheBlockSize);
  fprintf(fp, "  \"totalBlocks\": %d,\n", pVnodeCfg->cfg.totalBlocks);
  fprintf(fp, "  \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables);
  fprintf(fp, "  \"daysPerFile\": %d,\n", pVnodeCfg->cfg.daysPerFile);
  fprintf(fp, "  \"daysToKeep\": %d,\n", pVnodeCfg->cfg.daysToKeep);
  fprintf(fp, "  \"daysToKeep1\": %d,\n", pVnodeCfg->cfg.daysToKeep1);
  fprintf(fp, "  \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2);
  fprintf(fp, "  \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock);
  fprintf(fp, "  \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock);
  fprintf(fp, "  \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
  fprintf(fp, "  \"precision\": %d,\n", pVnodeCfg->cfg.precision);
  fprintf(fp, "  \"compression\": %d,\n", pVnodeCfg->cfg.compression);
  fprintf(fp, "  \"walLevel\": %d,\n", pVnodeCfg->cfg.walLevel);
  fprintf(fp, "  \"replica\": %d,\n", pVnodeCfg->cfg.replications);
  fprintf(fp, "  \"wals\": %d,\n", pVnodeCfg->cfg.wals);
  fprintf(fp, "  \"quorum\": %d,\n", pVnodeCfg->cfg.quorum);

  // one object per replica; the last one also closes the array
  fprintf(fp, "  \"nodeInfos\": [{\n");
  for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) {
    fprintf(fp, "    \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId);
    fprintf(fp, "    \"nodeEp\": \"%s\"\n", pVnodeCfg->nodes[i].nodeEp);

    if (i < pVnodeCfg->cfg.replications - 1) {
      fprintf(fp, "  },{\n");
    } else {
      fprintf(fp, "  }]\n");
    }
  }
  fprintf(fp, "}\n");

  // a failed fprintf() sets the stream error flag, so one check covers them all
  int failed = (fflush(fp) != 0) || ferror(fp);
  int err = errno;
  if (fclose(fp) != 0) {
    if (!failed) err = errno;
    failed = 1;
  }

  if (failed) {
    if (err == 0) err = EIO;
    vError("vgId:%d, failed to write vnode cfg file:%s error:%s", pVnodeCfg->cfg.vgId, cfgFile, strerror(err));
    terrno = TAOS_SYSTEM_ERROR(err);
    return terrno;
  }

  vPrint("vgId:%d, save vnode cfg successed", pVnodeCfg->cfg.vgId);

  return TSDB_CODE_SUCCESS;
}

static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
570 571 572 573 574
  cJSON  *root = NULL;
  char   *content = NULL;
  char    cfgFile[TSDB_FILENAME_LEN + 30] = {0};
  int     maxLen = 1000;

575
  terrno = TSDB_CODE_VND_APP_ERROR;
S
slguan 已提交
576
  sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnode->vgId);
577
  FILE *fp = fopen(cfgFile, "r");
S
slguan 已提交
578
  if (!fp) {
579
    vError("vgId:%d, failed to open vnode cfg file:%s to read, error:%s", pVnode->vgId,
S
slguan 已提交
580
           cfgFile, strerror(errno));
581
    terrno = TAOS_SYSTEM_ERROR(errno);
582
    goto PARSE_OVER;
S
slguan 已提交
583 584
  }

585 586
  content = calloc(1, maxLen + 1);
  if (content == NULL) goto PARSE_OVER;
S
slguan 已提交
587 588
  int   len = fread(content, 1, maxLen, fp);
  if (len <= 0) {
589
    vError("vgId:%d, failed to read vnode cfg, content is null", pVnode->vgId);
S
Shuduo Sang 已提交
590
    free(content);
591
    return errno;
S
slguan 已提交
592 593
  }

594
  root = cJSON_Parse(content);
S
slguan 已提交
595
  if (root == NULL) {
596
    vError("vgId:%d, failed to read vnode cfg, invalid json format", pVnode->vgId);
S
slguan 已提交
597 598 599
    goto PARSE_OVER;
  }

600 601 602 603 604 605 606
  cJSON *db = cJSON_GetObjectItem(root, "db");
  if (!db || db->type != cJSON_String || db->valuestring == NULL) {
    vError("vgId:%d, failed to read vnode cfg, db not found", pVnode->vgId);
    goto PARSE_OVER;
  }
  strcpy(pVnode->db, db->valuestring);
  
S
slguan 已提交
607 608
  cJSON *cfgVersion = cJSON_GetObjectItem(root, "cfgVersion");
  if (!cfgVersion || cfgVersion->type != cJSON_Number) {
609
    vError("vgId:%d, failed to read vnode cfg, cfgVersion not found", pVnode->vgId);
S
slguan 已提交
610 611 612 613
    goto PARSE_OVER;
  }
  pVnode->cfgVersion = cfgVersion->valueint;

S
slguan 已提交
614 615
  cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize");
  if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) {
616
    vError("vgId:%d, failed to read vnode cfg, cacheBlockSize not found", pVnode->vgId);
S
slguan 已提交
617 618
    goto PARSE_OVER;
  }
S
slguan 已提交
619
  pVnode->tsdbCfg.cacheBlockSize = cacheBlockSize->valueint;
S
slguan 已提交
620

S
slguan 已提交
621 622
  cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks");
  if (!totalBlocks || totalBlocks->type != cJSON_Number) {
623
    vError("vgId:%d, failed to read vnode cfg, totalBlocks not found", pVnode->vgId);
S
slguan 已提交
624 625
    goto PARSE_OVER;
  }
S
slguan 已提交
626
  pVnode->tsdbCfg.totalBlocks = totalBlocks->valueint;
S
slguan 已提交
627 628 629

  cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
  if (!maxTables || maxTables->type != cJSON_Number) {
630
    vError("vgId:%d, failed to read vnode cfg, maxTables not found", pVnode->vgId);
S
slguan 已提交
631 632 633 634
    goto PARSE_OVER;
  }
  pVnode->tsdbCfg.maxTables = maxTables->valueint;

S
slguan 已提交
635
   cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
S
slguan 已提交
636
  if (!daysPerFile || daysPerFile->type != cJSON_Number) {
637
    vError("vgId:%d, failed to read vnode cfg, daysPerFile not found", pVnode->vgId);
S
slguan 已提交
638 639 640 641
    goto PARSE_OVER;
  }
  pVnode->tsdbCfg.daysPerFile = daysPerFile->valueint;

S
slguan 已提交
642 643
  cJSON *daysToKeep = cJSON_GetObjectItem(root, "daysToKeep");
  if (!daysToKeep || daysToKeep->type != cJSON_Number) {
644
    vError("vgId:%d, failed to read vnode cfg, daysToKeep not found", pVnode->vgId);
S
slguan 已提交
645 646 647 648 649 650
    goto PARSE_OVER;
  }
  pVnode->tsdbCfg.keep = daysToKeep->valueint;

  cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1");
  if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) {
651
    vError("vgId:%d, failed to read vnode cfg, daysToKeep1 not found", pVnode->vgId);
S
slguan 已提交
652 653 654 655 656 657
    goto PARSE_OVER;
  }
  pVnode->tsdbCfg.keep1 = daysToKeep1->valueint;

  cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2");
  if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) {
658
    vError("vgId:%d, failed to read vnode cfg, daysToKeep2 not found", pVnode->vgId);
S
slguan 已提交
659 660 661 662
    goto PARSE_OVER;
  }
  pVnode->tsdbCfg.keep2 = daysToKeep2->valueint;

S
slguan 已提交
663 664
  cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock");
  if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) {
665
    vError("vgId:%d, failed to read vnode cfg, minRowsPerFileBlock not found", pVnode->vgId);
S
slguan 已提交
666 667 668 669 670 671
    goto PARSE_OVER;
  }
  pVnode->tsdbCfg.minRowsPerFileBlock = minRowsPerFileBlock->valueint;

  cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock");
  if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) {
672
    vError("vgId:%d, failed to read vnode cfg, maxRowsPerFileBlock not found", pVnode->vgId);
S
slguan 已提交
673 674 675 676
    goto PARSE_OVER;
  }
  pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint;

S
slguan 已提交
677 678
  cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
  if (!commitTime || commitTime->type != cJSON_Number) {
679
    vError("vgId:%d, failed to read vnode cfg, commitTime not found", pVnode->vgId);
S
slguan 已提交
680 681
    goto PARSE_OVER;
  }
S
slguan 已提交
682
  pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
S
slguan 已提交
683

S
slguan 已提交
684 685
  cJSON *precision = cJSON_GetObjectItem(root, "precision");
  if (!precision || precision->type != cJSON_Number) {
686
    vError("vgId:%d, failed to read vnode cfg, precision not found", pVnode->vgId);
S
slguan 已提交
687 688 689 690 691 692
    goto PARSE_OVER;
  }
  pVnode->tsdbCfg.precision = (int8_t)precision->valueint;

  cJSON *compression = cJSON_GetObjectItem(root, "compression");
  if (!compression || compression->type != cJSON_Number) {
693
    vError("vgId:%d, failed to read vnode cfg, compression not found", pVnode->vgId);
S
slguan 已提交
694 695
    goto PARSE_OVER;
  }
S
slguan 已提交
696
  pVnode->tsdbCfg.compression = (int8_t)compression->valueint;
S
slguan 已提交
697

H
hjxilinx 已提交
698 699
  cJSON *walLevel = cJSON_GetObjectItem(root, "walLevel");
  if (!walLevel || walLevel->type != cJSON_Number) {
700
    vError("vgId:%d, failed to read vnode cfg, walLevel not found", pVnode->vgId);
S
slguan 已提交
701 702
    goto PARSE_OVER;
  }
H
hjxilinx 已提交
703
  pVnode->walCfg.walLevel = (int8_t) walLevel->valueint;
S
slguan 已提交
704 705 706

  cJSON *wals = cJSON_GetObjectItem(root, "wals");
  if (!wals || wals->type != cJSON_Number) {
707
    vError("vgId:%d, failed to read vnode cfg, wals not found", pVnode->vgId);
S
slguan 已提交
708 709 710
    goto PARSE_OVER;
  }
  pVnode->walCfg.wals = (int8_t)wals->valueint;
J
Jeff Tao 已提交
711
  pVnode->walCfg.keep = 0;
712

S
slguan 已提交
713 714
  cJSON *replica = cJSON_GetObjectItem(root, "replica");
  if (!replica || replica->type != cJSON_Number) {
715
    vError("vgId:%d, failed to read vnode cfg, replica not found", pVnode->vgId);
S
slguan 已提交
716
    goto PARSE_OVER;
717
  }
S
slguan 已提交
718
  pVnode->syncCfg.replica = (int8_t)replica->valueint;
719

S
slguan 已提交
720 721
  cJSON *quorum = cJSON_GetObjectItem(root, "quorum");
  if (!quorum || quorum->type != cJSON_Number) {
722
    vError("failed to read vnode cfg, quorum not found", pVnode->vgId);
S
slguan 已提交
723 724 725
    goto PARSE_OVER;
  }
  pVnode->syncCfg.quorum = (int8_t)quorum->valueint;
726

S
slguan 已提交
727 728
  cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
  if (!nodeInfos || nodeInfos->type != cJSON_Array) {
729
    vError("vgId:%d, failed to read vnode cfg, nodeInfos not found", pVnode->vgId);
S
slguan 已提交
730 731 732 733 734
    goto PARSE_OVER;
  }

  int size = cJSON_GetArraySize(nodeInfos);
  if (size != pVnode->syncCfg.replica) {
735
    vError("vgId:%d, failed to read vnode cfg, nodeInfos size not matched", pVnode->vgId);
S
slguan 已提交
736 737 738 739 740 741 742 743 744
    goto PARSE_OVER;
  }

  for (int i = 0; i < size; ++i) {
    cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
    if (nodeInfo == NULL) continue;

    cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
    if (!nodeId || nodeId->type != cJSON_Number) {
745
      vError("vgId:%d, failed to read vnode cfg, nodeId not found", pVnode->vgId);
S
slguan 已提交
746 747 748 749
      goto PARSE_OVER;
    }
    pVnode->syncCfg.nodeInfo[i].nodeId = nodeId->valueint;

J
jtao1735 已提交
750 751
    cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
    if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
752
      vError("vgId:%d, failed to read vnode cfg, nodeFqdn not found", pVnode->vgId);
S
slguan 已提交
753 754 755
      goto PARSE_OVER;
    }

J
jtao1735 已提交
756 757
    taosGetFqdnPortFromEp(nodeEp->valuestring, pVnode->syncCfg.nodeInfo[i].nodeFqdn, &pVnode->syncCfg.nodeInfo[i].nodePort);
    pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC;
S
slguan 已提交
758 759
  }

760
  terrno = TSDB_CODE_SUCCESS;
S
slguan 已提交
761

762
  vPrint("vgId:%d, read vnode cfg successfully, replcia:%d", pVnode->vgId, pVnode->syncCfg.replica);
S
slguan 已提交
763
  for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) {
764
    vPrint("vgId:%d, dnode:%d, %s:%d", pVnode->vgId, pVnode->syncCfg.nodeInfo[i].nodeId,
J
jtao1735 已提交
765
           pVnode->syncCfg.nodeInfo[i].nodeFqdn, pVnode->syncCfg.nodeInfo[i].nodePort);
S
slguan 已提交
766 767 768
  }

PARSE_OVER:
769
  tfree(content);
S
slguan 已提交
770
  cJSON_Delete(root);
771
  if (fp) fclose(fp);
772
  return terrno;
S
slguan 已提交
773
}
S
slguan 已提交
774 775 776 777 778 779

/*
 * Write the vnode's fversion to "<tsVnodeDir>/vnode<vgId>/version.json".
 *
 * The file content is a small JSON object:
 *   {
 *     "version": <fversion>
 *   }
 *
 * Returns TSDB_CODE_SUCCESS when the file was written and closed without error,
 * TSDB_CODE_VND_APP_ERROR when the path or the content does not fit its buffer,
 * and TAOS_SYSTEM_ERROR(errno) when opening, writing or closing the file fails.
 */
static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
  char versionFile[TSDB_FILENAME_LEN + 30] = {0};
  int  pathLen = snprintf(versionFile, sizeof(versionFile), "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
  if (pathLen < 0 || (size_t)pathLen >= sizeof(versionFile)) {
    vError("vgId:%d, failed to save vnode version, file path is too long", pVnode->vgId);
    return TSDB_CODE_VND_APP_ERROR;
  }

  FILE *fp = fopen(versionFile, "w");
  if (!fp) {
    // capture errno before logging, the log call may overwrite it
    int savedErrno = errno;
    vError("vgId:%d, failed to open vnode version file for write, file:%s error:%s", pVnode->vgId,
           versionFile, strerror(savedErrno));
    return TAOS_SYSTEM_ERROR(savedErrno);
  }

  // Worst case is 38 characters: 18 of fixed JSON text plus a 20 character int64.
  // A 31 byte buffer is too small for that, so a local, larger buffer is used and
  // the snprintf result is validated before it is trusted as a length.
  char    content[64] = {0};
  int32_t len = snprintf(content, sizeof(content), "{\n  \"version\": %" PRId64 "\n}\n", pVnode->fversion);
  if (len < 0 || (size_t)len >= sizeof(content)) {
    vError("vgId:%d, failed to save vnode version, content is too long", pVnode->vgId);
    fclose(fp);
    return TSDB_CODE_VND_APP_ERROR;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  if (fwrite(content, 1, (size_t)len, fp) != (size_t)len) {
    int savedErrno = errno;
    vError("vgId:%d, failed to write vnode version file:%s error:%s", pVnode->vgId, versionFile,
           strerror(savedErrno));
    code = TAOS_SYSTEM_ERROR(savedErrno);
  }

  // fclose flushes the stream, so a write error may only surface here
  if (fclose(fp) != 0 && code == TSDB_CODE_SUCCESS) {
    int savedErrno = errno;
    vError("vgId:%d, failed to close vnode version file:%s error:%s", pVnode->vgId, versionFile,
           strerror(savedErrno));
    code = TAOS_SYSTEM_ERROR(savedErrno);
  }

  if (code != TSDB_CODE_SUCCESS) return code;

  vPrint("vgId:%d, save vnode version:%" PRId64 " succeed", pVnode->vgId, pVnode->fversion);

  return TSDB_CODE_SUCCESS;
}


801 802 803 804 805 806
/*
 * Load pVnode->version from "<tsVnodeDir>/vnode<vgId>/version.json".
 *
 * A missing file (ENOENT) is not an error: terrno is set to TSDB_CODE_SUCCESS and
 * pVnode->version is left untouched. Any other open failure yields
 * TAOS_SYSTEM_ERROR(errno). An over-long path, an allocation failure, an empty file,
 * invalid JSON or a missing/non-numeric "version" item yield TSDB_CODE_VND_APP_ERROR.
 *
 * The result is stored in terrno and also returned.
 */
static int32_t vnodeReadVersion(SVnodeObj *pVnode) {
  char    versionFile[TSDB_FILENAME_LEN + 30] = {0};
  char   *content = NULL;
  cJSON  *root = NULL;
  FILE   *fp = NULL;
  int     maxLen = 100;

  // every early exit below reports this code unless it sets a more specific one
  terrno = TSDB_CODE_VND_APP_ERROR;

  int pathLen = snprintf(versionFile, sizeof(versionFile), "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
  if (pathLen < 0 || (size_t)pathLen >= sizeof(versionFile)) {
    vError("vgId:%d, failed to read vnode version, file path is too long", pVnode->vgId);
    goto PARSE_OVER;
  }

  fp = fopen(versionFile, "r");
  if (!fp) {
    // capture errno before logging, the log call may overwrite it
    int savedErrno = errno;
    if (savedErrno != ENOENT) {
      vError("vgId:%d, failed to open version file:%s error:%s", pVnode->vgId, versionFile, strerror(savedErrno));
      terrno = TAOS_SYSTEM_ERROR(savedErrno);
    } else {
      terrno = TSDB_CODE_SUCCESS;
    }
    goto PARSE_OVER;
  }

  // one extra zeroed byte keeps the buffer NUL terminated for cJSON_Parse
  content = calloc(1, maxLen + 1);
  if (content == NULL) {
    vError("vgId:%d, failed to read vnode version, out of memory", pVnode->vgId);
    goto PARSE_OVER;
  }

  size_t len = fread(content, 1, maxLen, fp);
  if (len == 0) {
    vError("vgId:%d, failed to read vnode version, content is null", pVnode->vgId);
    goto PARSE_OVER;
  }

  root = cJSON_Parse(content);
  if (root == NULL) {
    vError("vgId:%d, failed to read vnode version, invalid json format", pVnode->vgId);
    goto PARSE_OVER;
  }

  cJSON *version = cJSON_GetObjectItem(root, "version");
  if (!version || version->type != cJSON_Number) {
    vError("vgId:%d, failed to read vnode version, version not found", pVnode->vgId);
    goto PARSE_OVER;
  }
  pVnode->version = version->valueint;

  terrno = TSDB_CODE_SUCCESS;
  vPrint("vgId:%d, read vnode version successfully, version:%" PRId64, pVnode->vgId, pVnode->version);

PARSE_OVER:
  tfree(content);
  cJSON_Delete(root);
  if (fp) fclose(fp);
  return terrno;
}