vnodeCommit.c 15.6 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17
#include "vnodeInt.h"
H
Hongze Cheng 已提交
18

H
Hongze Cheng 已提交
19 20
#define VND_INFO_FNAME_TMP "vnode_tmp.json"

H
Hongze Cheng 已提交
21 22
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
static int vnodeCommitImpl(SCommitInfo *pInfo);
H
refact  
Hongze Cheng 已提交
23

H
Hongze Cheng 已提交
24 25
#define WAIT_TIME_MILI_SEC 10  // miliseconds

H
Hongze Cheng 已提交
26 27
/*
 * Advance recycling of a buffer pool.
 *
 * If no pool is currently being recycled (pVnode->onRecycle == NULL), the head of
 * the recycle queue is unlinked and becomes pVnode->onRecycle; if the queue is
 * empty there is nothing to do. The pool in pVnode->onRecycle is then handed to
 * vnodeBufPoolRecycle().
 *
 * Called from vnodeGetBufPoolToUse() with pVnode->mutex held.
 *
 * Returns 0 on success (including "nothing to recycle"), otherwise the error code
 * returned by vnodeBufPoolRecycle().
 */
static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) {
  if (pVnode->onRecycle == NULL) {
    SVBufPool *pHead = pVnode->recycleHead;

    if (pHead == NULL) {
      vDebug("vgId:%d, no recyclable buffer pool", TD_VID(pVnode));
      return 0;
    }

    vDebug("vgId:%d, buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pHead, pHead->id);

    // pop the head of the doubly-linked recycle queue
    pVnode->onRecycle = pHead;
    if (pHead == pVnode->recycleTail) {
      pVnode->recycleHead = NULL;
      pVnode->recycleTail = NULL;
    } else {
      SVBufPool *pNext = pHead->recycleNext;
      pVnode->recycleHead = pNext;
      pNext->recyclePrev = NULL;
    }
    pHead->recyclePrev = NULL;
    pHead->recycleNext = NULL;
  }

  int32_t code = vnodeBufPoolRecycle(pVnode->onRecycle);
  if (code != 0) {
    vError("vgId:%d, %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
57 58
/*
 * Install a buffer pool as pVnode->inUse.
 *
 * Loops under pVnode->mutex: if the free list is non-empty its head is taken and
 * given nRef = 1. Otherwise a recycle step is attempted; if the free list is still
 * empty, the thread waits on pVnode->poolNotEmpty for up to WAIT_TIME_MILI_SEC ms
 * and then retries.
 *
 * Returns 0 on success, or the error from recycling / the timed wait
 * (a timeout is not an error, it just triggers another attempt).
 */
static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t nTry = 0;

  taosThreadMutexLock(&pVnode->mutex);

  while (true) {
    ++nTry;

    SVBufPool *pPool = pVnode->freeList;
    if (pPool != NULL) {
      vDebug("vgId:%d, allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pPool, pPool->id);

      // detach the head of the free list and hand it out with a single reference
      pVnode->inUse = pPool;
      pPool->nRef = 1;
      pVnode->freeList = pPool->freeNext;
      pPool->freeNext = NULL;
      break;
    }

    vDebug("vgId:%d, no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry);

    code = vnodeTryRecycleBufPool(pVnode);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pVnode->freeList != NULL) {
      continue;  // recycling produced a free pool; take it on the next pass
    }

    vDebug("vgId:%d, no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC);

    // absolute deadline = now + WAIT_TIME_MILI_SEC, normalized so that tv_nsec < 1e9
    struct timeval  now;
    struct timespec deadline;
    taosGetTimeOfDay(&now);
    int64_t nsec = (int64_t)now.tv_usec * 1000 + (int64_t)WAIT_TIME_MILI_SEC * 1000000;
    deadline.tv_sec = now.tv_sec + (time_t)(nsec / 1000000000);
    deadline.tv_nsec = (long)(nsec % 1000000000);

    int32_t rc = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &deadline);
    if (rc != 0 && rc != ETIMEDOUT) {
      code = TAOS_SYSTEM_ERROR(rc);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  taosThreadMutexUnlock(&pVnode->mutex);
  if (code != 0) {
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

/*
 * Start a new write round: obtain a buffer pool for pVnode->inUse, then begin
 * meta, tsdb and (for RSMA vnodes) sma.
 *
 * A sub-system "begin" call that returns < 0 is treated as failed, with terrno
 * taken as the error code.
 *
 * Returns 0 on success; on failure stores the code in terrno, logs it and returns it.
 */
int vnodeBegin(SVnode *pVnode) {
  int32_t lino = 0;
  int32_t code = vnodeGetBufPoolToUse(pVnode);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL) < 0) {
    code = terrno;
  }
  TSDB_CHECK_CODE(code, lino, _exit);

  if (tsdbBegin(pVnode->pTsdb) < 0) {
    code = terrno;
  }
  TSDB_CHECK_CODE(code, lino, _exit);

  if (VND_IS_RSMA(pVnode) && smaBegin(pVnode->pSma) < 0) {
    code = terrno;
  }
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    terrno = code;
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

146 147 148
/*
 * Record the current monotonic time as the commit start and choose the next
 * maximum wait: tsVndCommitMaxIntervalMs plus a random jitter of
 * taosRand() % tsVndCommitMaxIntervalMs.
 */
void vnodeUpdCommitSched(SVnode *pVnode) {
  int64_t randNum = taosRand();
  int64_t interval = tsVndCommitMaxIntervalMs;

  pVnode->commitSched.commitMs = taosGetMonoTimestampMs();
  // `x % 0` is undefined behaviour; with a zero interval there is simply no jitter
  pVnode->commitSched.maxWaitMs = interval + ((interval != 0) ? (randNum % interval) : 0);
}

152
/*
 * Decide whether the in-use buffer pool should be committed now.
 *
 * Requires an in-use pool and available data disk space, and then either:
 *   - the pool's size exceeds its node.size and more than SYNC_VND_COMMIT_MIN_MS
 *     have passed since commitSched.commitMs (set when a commit starts), or
 *   - atExit is set and the pool's size is > 0.
 * The pool is inspected while holding pVnode->mutex.
 */
int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
  SVCommitSched *pSched = &pVnode->commitSched;
  int64_t        nowMs = taosGetMonoTimestampMs();
  bool           diskAvail = osDataSpaceAvailable();
  bool           needCommit = false;

  taosThreadMutexLock(&pVnode->mutex);
  SVBufPool *pPool = pVnode->inUse;
  if (pPool != NULL && diskAvail) {
    bool overSize = pPool->size > pPool->node.size;
    bool minIntervalPassed = pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs;
    bool flushOnExit = atExit && (pPool->size > 0);
    needCommit = (overSize && minIntervalPassed) || flushOnExit;
  }
  taosThreadMutexUnlock(&pVnode->mutex);

  return needCommit;
}
H
Hongze Cheng 已提交
167

H
Hongze Cheng 已提交
168 169 170
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
  char      fname[TSDB_FILENAME_LEN];
  TdFilePtr pFile;
H
Hongze Cheng 已提交
171
  char     *data;
H
Hongze Cheng 已提交
172 173 174 175 176 177

  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);

  // encode info
  data = NULL;

H
Hongze Cheng 已提交
178
  if (vnodeEncodeInfo(pInfo, &data) < 0) {
179
    vError("failed to encode json info.");
H
Hongze Cheng 已提交
180 181 182 183 184 185
    return -1;
  }

  // save info to a vnode_tmp.json
  pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
S
Shengliang Guan 已提交
186
    vError("failed to open info file:%s for write:%s", fname, terrstr());
H
Hongze Cheng 已提交
187 188 189 190
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

H
Hongze Cheng 已提交
191
  if (taosWriteFile(pFile, data, strlen(data)) < 0) {
192
    vError("failed to write info file:%s error:%s", fname, terrstr());
H
Hongze Cheng 已提交
193 194 195 196 197
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  if (taosFsyncFile(pFile) < 0) {
S
Shengliang Guan 已提交
198
    vError("failed to fsync info file:%s error:%s", fname, terrstr());
H
Hongze Cheng 已提交
199 200 201 202 203 204 205 206 207
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  taosCloseFile(&pFile);

  // free info binary
  taosMemoryFree(data);

208 209
  vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d", pInfo->config.vgId, fname,
        pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex);
H
Hongze Cheng 已提交
210 211 212 213 214 215 216 217 218

  return 0;

_err:
  taosCloseFile(&pFile);
  taosMemoryFree(data);
  return -1;
}

H
Hongze Cheng 已提交
219
/*
 * Publish the info written by vnodeSaveInfo(): rename VND_INFO_FNAME_TMP to
 * VND_INFO_FNAME inside dir.
 *
 * Returns 0 on success, -1 on failure with terrno derived from errno.
 */
int vnodeCommitInfo(const char *dir) {
  char tmpName[TSDB_FILENAME_LEN];
  char finalName[TSDB_FILENAME_LEN];

  snprintf(tmpName, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);
  snprintf(finalName, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);

  if (taosRenameFile(tmpName, finalName) >= 0) {
    vInfo("vnode info is committed, dir:%s", dir);
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(errno);
  return -1;
}

H
Hongze Cheng 已提交
235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254
/*
 * Read VND_INFO_FNAME from dir and decode it into pInfo.
 *
 * Returns 0 on success, -1 on failure. terrno is set for open/stat/alloc/read
 * failures (a short read is reported as an I/O error); on a decode failure -1 is
 * returned with whatever vnodeDecodeInfo() left in terrno.
 */
int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
  char      fname[TSDB_FILENAME_LEN];
  TdFilePtr pFile = NULL;
  char     *pData = NULL;
  int64_t   size = 0;
  int64_t   nRead = 0;

  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);

  // read info
  pFile = taosOpenFile(fname, TD_FILE_READ);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  if (taosFStatFile(pFile, &size, NULL) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // +1 so the content can be NUL-terminated before parsing
  pData = taosMemoryMalloc(size + 1);
  if (pData == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  nRead = taosReadFile(pFile, pData, size);
  if (nRead < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
  if (nRead != size) {
    // short read (e.g. the file shrank after fstat): never parse a partly uninitialized buffer
    terrno = TAOS_SYSTEM_ERROR(EIO);
    goto _err;
  }

  pData[size] = '\0';

  taosCloseFile(&pFile);

  // decode info
  if (vnodeDecodeInfo(pData, pInfo) < 0) {
    taosMemoryFree(pData);
    return -1;
  }

  taosMemoryFree(pData);

  return 0;

_err:
  taosCloseFile(&pFile);
  taosMemoryFree(pData);
  return -1;
}

H
Hongze Cheng 已提交
286
/*
 * Prepare an asynchronous commit of pVnode, filling pInfo.
 *
 * Steps:
 *   1. Wait on pVnode->canCommit, which serializes commits.
 *   2. Snapshot the config and the applied state into pInfo->info, assigning a new commitID.
 *   3. Write the snapshot to the temporary info file.
 *   4. Prepare tsdb, meta and sma.
 *   5. Move the in-use buffer pool to pVnode->onCommit.
 *
 * canCommit remains acquired when this returns, on success and on failure. In this
 * file it is posted by vnodeCommitTask() when the commit ends, and by
 * vnodeAsyncCommit() when preparation or scheduling fails.
 *
 * Returns 0 on success or an error code.
 */
static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  char    dir[TSDB_FILENAME_LEN] = {0};

  tsem_wait(&pVnode->canCommit);

  pVnode->state.commitTerm = pVnode->state.applyTerm;

  pInfo->info.config = pVnode->config;
  pInfo->info.state.committed = pVnode->state.applied;
  pInfo->info.state.commitTerm = pVnode->state.applyTerm;
  pInfo->info.state.commitID = ++pVnode->state.commitID;
  pInfo->pVnode = pVnode;
  pInfo->txn = metaGetTxn(pVnode->pMeta);

  // save info
  if (pVnode->pTfs) {
    snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
  } else {
    snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
  }

  vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode));
  if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
    // vnodeSaveInfo can fail without setting terrno (e.g. JSON encoding failure);
    // never let such a failure turn into code == 0 and continue the commit.
    code = (terrno != 0) ? terrno : TSDB_CODE_FAILED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  tsdbPrepareCommit(pVnode->pTsdb);

  metaPrepareAsyncCommit(pVnode->pMeta);

  code = smaPrepareAsyncCommit(pVnode->pSma);
  TSDB_CHECK_CODE(code, lino, _exit);  // records the failing line for the log below

  // hand the in-use buffer pool over to the commit
  taosThreadMutexLock(&pVnode->mutex);
  ASSERT(pVnode->onCommit == NULL);
  pVnode->onCommit = pVnode->inUse;
  pVnode->inUse = NULL;
  taosThreadMutexUnlock(&pVnode->mutex);

_exit:
  if (code) {
    vError("vgId:%d, %s failed at line %d since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, lino,
           tstrerror(code), pVnode->state.commitID);
  } else {
    vDebug("vgId:%d, %s done, commit id:%" PRId64, TD_VID(pVnode), __func__, pInfo->info.state.commitID);
  }

  return code;
}
H
Hongze Cheng 已提交
338
/*
 * Drop the commit's reference on pVnode->onCommit and clear onCommit, all under
 * pVnode->mutex.
 *
 * The pool's fate depends on the reference count left after the decrement:
 *   - zero: the pool goes back to the free list;
 *   - positive: the pool is appended to the tail of the recycle queue;
 *   - negative: ASSERT fires.
 */
static void vnodeReturnBufPool(SVnode *pVnode) {
  taosThreadMutexLock(&pVnode->mutex);

  SVBufPool *pPool = pVnode->onCommit;
  int32_t    nRef = atomic_sub_fetch_32(&pPool->nRef, 1);
  pVnode->onCommit = NULL;

  if (nRef < 0) {
    ASSERT(0);
  } else if (nRef == 0) {
    vnodeBufPoolAddToFreeList(pPool);
  } else {
    vDebug("vgId:%d, buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id);

    // append at the tail; recycleTail is NULL exactly when the queue is empty
    pPool->recycleNext = NULL;
    pPool->recyclePrev = pVnode->recycleTail;
    if (pVnode->recycleTail == NULL) {
      pVnode->recycleHead = pPool;
    } else {
      pVnode->recycleTail->recycleNext = pPool;
    }
    pVnode->recycleTail = pPool;
  }

  taosThreadMutexUnlock(&pVnode->mutex);
}
H
Hongze Cheng 已提交
365 366 367
static int32_t vnodeCommitTask(void *arg) {
  int32_t code = 0;

H
Hongze Cheng 已提交
368
  SCommitInfo *pInfo = (SCommitInfo *)arg;
H
Hongze Cheng 已提交
369
  SVnode      *pVnode = pInfo->pVnode;
H
Hongze Cheng 已提交
370

H
Hongze Cheng 已提交
371 372
  // commit
  code = vnodeCommitImpl(pInfo);
H
Hongze Cheng 已提交
373 374
  if (code) goto _exit;

H
Hongze Cheng 已提交
375
  vnodeReturnBufPool(pVnode);
H
Hongze Cheng 已提交
376

377
_exit:
H
Hongze Cheng 已提交
378
  // end commit
H
Hongze Cheng 已提交
379
  tsem_post(&pVnode->canCommit);
H
Hongze Cheng 已提交
380
  taosMemoryFree(pInfo);
H
Hongze Cheng 已提交
381 382
  return code;
}
B
Benguang Zhao 已提交
383

H
refact  
Hongze Cheng 已提交
384
/*
 * Prepare a commit and schedule vnodeCommitTask() to perform it in the background.
 *
 * canCommit is acquired inside vnodePrepareCommit(). On success the scheduled
 * task owns pInfo and posts canCommit when it finishes. If preparation or
 * scheduling fails, this function frees pInfo and posts canCommit itself.
 *
 * Returns 0 on success or an error code.
 */
int vnodeAsyncCommit(SVnode *pVnode) {
  int32_t      code = 0;
  SCommitInfo *pInfo = (SCommitInfo *)taosMemoryCalloc(1, sizeof(*pInfo));

  if (NULL == pInfo) {
    // canCommit has not been acquired yet, so it must NOT be posted here:
    // an extra post would let two commits run concurrently later on.
    code = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
           pVnode->state.commitID);
    return code;
  }

  // prepare to commit (acquires canCommit)
  code = vnodePrepareCommit(pVnode, pInfo);
  if (TSDB_CODE_SUCCESS != code) {
    goto _exit;
  }

  // schedule the task
  code = vnodeScheduleTask(vnodeCommitTask, pInfo);

_exit:
  if (code) {
    taosMemoryFree(pInfo);
    tsem_post(&pVnode->canCommit);
    vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
           pVnode->state.commitID);
  } else {
    vInfo("vgId:%d, vnode async commit done, commitId:%" PRId64 " term:%" PRId64 " applied:%" PRId64, TD_VID(pVnode),
          pVnode->state.commitID, pVnode->state.applyTerm, pVnode->state.applied);
  }
  return code;
}
H
refact  
Hongze Cheng 已提交
416

H
Hongze Cheng 已提交
417 418
/*
 * Commit and wait for completion.
 *
 * Starts an asynchronous commit, then blocks until canCommit is released, i.e.
 * until the commit task (or a failed attempt) has posted it. It re-posts
 * canCommit immediately so that later commits are not blocked.
 *
 * Returns the result of vnodeAsyncCommit() (0 on success), instead of always 0,
 * so callers can see that the commit could not be started.
 */
int vnodeSyncCommit(SVnode *pVnode) {
  int32_t code = vnodeAsyncCommit(pVnode);

  tsem_wait(&pVnode->canCommit);
  tsem_post(&pVnode->canCommit);

  return code;
}

H
Hongze Cheng 已提交
424 425 426 427 428 429
/*
 * Perform the commit prepared by vnodePrepareCommit().
 *
 * Steps, in order:
 *   1. Persist the WAL.
 *   2. Begin a sync snapshot at the committed version.
 *   3. Commit tsdb, sma (RSMA only) and tq.
 *   4. Rename the temporary info file into place.
 *   5. Finish tsdb, sma and meta.
 *   6. Update state.committed and run sma post-commit.
 *   7. End the sync snapshot.
 *
 * Any failure jumps to _exit, which logs it and returns the code. Such a failure
 * skips syncEndSnapshot() and, in vnodeCommitTask(), the return of the buffer pool.
 *
 * Returns 0 on success or an error code. Previously the code was always
 * discarded in favour of 0.
 */
static int vnodeCommitImpl(SCommitInfo *pInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  char    dir[TSDB_FILENAME_LEN] = {0};
  SVnode *pVnode = pInfo->pVnode;

  vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
        pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm);

  vnodeUpdCommitSched(pVnode);

  // persist wal before starting
  if (walPersist(pVnode->pWal) < 0) {
    vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr());
    code = (terrno != 0) ? terrno : TSDB_CODE_FAILED;  // keep it a failure even if terrno was not set
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (pVnode->pTfs) {
    snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
  } else {
    snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
  }

  syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed);

  // commit each sub-system
  code = tsdbCommit(pVnode->pTsdb, pInfo);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (VND_IS_RSMA(pVnode)) {
    code = smaCommit(pVnode->pSma, pInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (tqCommit(pVnode->pTq) < 0) {
    code = TSDB_CODE_FAILED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // commit info: rename the temporary info file written during prepare into place
  if (vnodeCommitInfo(dir) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbFinishCommit(pVnode->pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (VND_IS_RSMA(pVnode)) {
    code = smaFinishCommit(pVnode->pSma);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (metaFinishCommit(pVnode->pMeta, pInfo->txn) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pVnode->state.committed = pInfo->info.state.committed;

  if (smaPostCommit(pVnode->pSma) < 0) {
    vError("vgId:%d, failed to post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
    code = (terrno != 0) ? terrno : TSDB_CODE_FAILED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  syncEndSnapshot(pVnode->sync);

_exit:
  if (code) {
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  } else {
    vInfo("vgId:%d, commit end", TD_VID(pVnode));
  }
  return code;
}

H
Hongze Cheng 已提交
501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516
/*
 * Build the path of pVnode's temporary info file (VND_INFO_FNAME_TMP).
 *
 * Uses the same directory rule as the commit path: "<primary tfs path>/<vnode path>"
 * when pVnode->pTfs is set, otherwise pVnode->path alone. The previous code always
 * called tfsGetPrimaryPath(pVnode->pTfs), even when pTfs is NULL.
 */
static void vnodeGetInfoTmpFName(SVnode *pVnode, char *fname, size_t size) {
  if (pVnode->pTfs) {
    snprintf(fname, size, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
             VND_INFO_FNAME_TMP);
  } else {
    snprintf(fname, size, "%s%s%s", pVnode->path, TD_DIRSEP, VND_INFO_FNAME_TMP);
  }
}

/* True when a temporary info file exists, i.e. a commit was prepared but its info was not committed. */
bool vnodeShouldRollback(SVnode *pVnode) {
  char tFName[TSDB_FILENAME_LEN] = {0};
  vnodeGetInfoTmpFName(pVnode, tFName, sizeof(tFName));

  return taosCheckExistFile(tFName);
}

/* Remove the temporary info file, if any (removal errors are ignored). */
void vnodeRollback(SVnode *pVnode) {
  char tFName[TSDB_FILENAME_LEN] = {0};
  vnodeGetInfoTmpFName(pVnode, tFName, sizeof(tFName));

  (void)taosRemoveFile(tFName);
}

H
Hongze Cheng 已提交
517 518 519 520
/*
 * JSON encoder for SVState: adds "commit version", "commit ID" and "commit term".
 * Returns 0 on success, -1 as soon as one field cannot be added.
 */
static int vnodeEncodeState(const void *pObj, SJson *pJson) {
  const SVState *pState = (const SVState *)pObj;

  if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0 ||
      tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID) < 0 ||
      tjsonAddIntegerToObject(pJson, "commit term", pState->commitTerm) < 0) {
    return -1;
  }

  return 0;
}

static int vnodeDecodeState(const SJson *pJson, void *pObj) {
  SVState *pState = (SVState *)pObj;

530 531
  int32_t code;
  tjsonGetNumberValue(pJson, "commit version", pState->committed, code);
H
Hongze Cheng 已提交
532
  if (code < 0) return -1;
H
Hongze Cheng 已提交
533 534
  tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code);
  if (code < 0) return -1;
H
Hongze Cheng 已提交
535 536
  tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code);
  if (code < 0) return -1;
H
Hongze Cheng 已提交
537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567

  return 0;
}

/*
 * Serialize pInfo as a JSON object with "config" and "state" members.
 *
 * On success *ppData receives the string produced by tjsonToString(). The caller
 * must free it; vnodeSaveInfo uses taosMemoryFree. The function then returns 0.
 * On failure *ppData is NULL and the function returns -1.
 */
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
  *ppData = NULL;

  SJson *pJson = tjsonCreateObject();
  if (pJson == NULL) {
    return -1;
  }

  char *pData = NULL;
  if (tjsonAddObject(pJson, "config", vnodeEncodeConfig, (void *)&pInfo->config) >= 0 &&
      tjsonAddObject(pJson, "state", vnodeEncodeState, (void *)&pInfo->state) >= 0) {
    pData = tjsonToString(pJson);
  }
  tjsonDelete(pJson);

  if (pData == NULL) {
    return -1;
  }

  *ppData = pData;
  return 0;
}

H
Hongze Cheng 已提交
575
/*
 * Parse the NUL-terminated JSON text in pData and fill pInfo->config and
 * pInfo->state from its "config" and "state" members.
 * Returns 0 on success, -1 if parsing or either member fails to decode.
 */
int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
  SJson *pJson = tjsonParse(pData);
  if (pJson == NULL) {
    return -1;
  }

  int ret = 0;
  if (tjsonToObject(pJson, "config", vnodeDecodeConfig, (void *)&pInfo->config) < 0 ||
      tjsonToObject(pJson, "state", vnodeDecodeState, (void *)&pInfo->state) < 0) {
    ret = -1;
  }

  tjsonDelete(pJson);
  return ret;
}