vnodeCommit.c 15.7 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17
#include "vnodeInt.h"
H
Hongze Cheng 已提交
18

H
Hongze Cheng 已提交
19 20
#define VND_INFO_FNAME_TMP "vnode_tmp.json"

H
Hongze Cheng 已提交
21 22
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
static int vnodeCommitImpl(SCommitInfo *pInfo);
H
refact  
Hongze Cheng 已提交
23

H
Hongze Cheng 已提交
24 25
#define WAIT_TIME_MILI_SEC 10  // miliseconds

H
Hongze Cheng 已提交
26 27
/*
 * Try to recycle one buffer pool.
 *
 * When no pool is currently marked as being recycled (pVnode->onRecycle == NULL), the head of the
 * recycle queue is detached and becomes the on-recycle pool. If the queue is empty there is nothing
 * to do and 0 is returned. The on-recycle pool is then passed to vnodeBufPoolRecycle().
 *
 * The caller visible in this file, vnodeGetBufPoolToUse(), invokes this with pVnode->mutex held.
 *
 * Returns 0 on success (including "nothing to recycle"), otherwise the code returned by
 * vnodeBufPoolRecycle().
 */
static int32_t vnodeTryRecycleBufPool(SVnode *pVnode) {
  int32_t code = 0;

  if (pVnode->onRecycle == NULL) {
    if (pVnode->recycleHead == NULL) {
      vDebug("vgId:%d, no recyclable buffer pool", TD_VID(pVnode));
      return code;
    }

    vDebug("vgId:%d, buffer pool %p of id %d on recycle queue, try to recycle", TD_VID(pVnode), pVnode->recycleHead,
           pVnode->recycleHead->id);

    // detach the queue head and mark it as the pool being recycled
    SVBufPool *pHead = pVnode->recycleHead;
    pVnode->onRecycle = pHead;
    if (pHead == pVnode->recycleTail) {
      // it was the only element: the queue becomes empty
      pVnode->recycleHead = NULL;
      pVnode->recycleTail = NULL;
    } else {
      pVnode->recycleHead = pHead->recycleNext;
      pVnode->recycleHead->recyclePrev = NULL;
    }
    pHead->recyclePrev = NULL;
    pHead->recycleNext = NULL;
  }

  code = vnodeBufPoolRecycle(pVnode->onRecycle);
  if (code) {
    vError("vgId:%d, %s failed since %s", TD_VID(pVnode), __func__, tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
57 58
/*
 * Pick a buffer pool for the vnode to write into and store it in pVnode->inUse.
 *
 * Runs with pVnode->mutex held for the whole loop. On each try:
 *   1. if the free list is non-empty, its head is taken (nRef set to 1) and the loop ends;
 *   2. otherwise a recycle is attempted via vnodeTryRecycleBufPool();
 *   3. if the free list is still empty, wait on pVnode->poolNotEmpty with a deadline of
 *      now + WAIT_TIME_MILI_SEC ms, then retry. A timeout is not an error.
 *
 * Returns 0 on success, or the error code of the failing step.
 */
static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
  int32_t code = 0;
  int32_t lino = 0;

  taosThreadMutexLock(&pVnode->mutex);

  for (int32_t nTry = 1;; ++nTry) {
    if (pVnode->freeList != NULL) {
      vDebug("vgId:%d, allocate free buffer pool on %d try, pPool:%p id:%d", TD_VID(pVnode), nTry, pVnode->freeList,
             pVnode->freeList->id);

      // pop the head of the free list and make it the in-use pool
      SVBufPool *pPool = pVnode->freeList;
      pVnode->inUse = pPool;
      pPool->nRef = 1;
      pVnode->freeList = pPool->freeNext;
      pPool->freeNext = NULL;
      break;
    }

    vDebug("vgId:%d, no free buffer pool on %d try, try to recycle...", TD_VID(pVnode), nTry);

    code = vnodeTryRecycleBufPool(pVnode);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pVnode->freeList != NULL) {
      // something showed up on the free list: take it on the next iteration
      continue;
    }

    vDebug("vgId:%d, no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), nTry, WAIT_TIME_MILI_SEC);

    // build the deadline: current wall-clock time + WAIT_TIME_MILI_SEC, normalising the nanoseconds
    struct timeval  tv;
    struct timespec ts;
    taosGetTimeOfDay(&tv);
    ts.tv_sec = tv.tv_sec;
    ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000;
    if (ts.tv_nsec > 999999999l) {
      ts.tv_sec += 1;
      ts.tv_nsec -= 1000000000l;
    }

    int32_t rc = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &ts);
    if (rc && rc != ETIMEDOUT) {
      code = TAOS_SYSTEM_ERROR(rc);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  taosThreadMutexUnlock(&pVnode->mutex);
  if (code) {
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}
/*
 * Begin a new write round on the vnode.
 *
 * Steps, in order: acquire a buffer pool (vnodeGetBufPoolToUse), then call metaBegin(),
 * tsdbBegin() and, only when VND_IS_RSMA(pVnode) holds, smaBegin(). A sub-system reporting
 * failure (< 0) has its error taken from terrno.
 *
 * Returns 0 on success; on failure returns the error code and also stores it in terrno.
 */
int vnodeBegin(SVnode *pVnode) {
  int32_t code = 0;
  int32_t lino = 0;

  // buffer pool first
  code = vnodeGetBufPoolToUse(pVnode);
  TSDB_CHECK_CODE(code, lino, _exit);

  // meta
  if (metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // tsdb
  if (tsdbBegin(pVnode->pTsdb) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // sma (rsma vnodes only; smaBegin is not called otherwise)
  if (VND_IS_RSMA(pVnode) && smaBegin(pVnode->pSma) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    terrno = code;
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

146 147 148
/*
 * Refresh the commit schedule of the vnode: record the current monotonic time as the commit
 * time and pick a new maximum wait of tsVndCommitMaxIntervalMs plus a random offset in
 * [0, tsVndCommitMaxIntervalMs).
 */
void vnodeUpdCommitSched(SVnode *pVnode) {
  SVCommitSched *pSched = &pVnode->commitSched;
  int64_t        jitter = taosRand();

  pSched->commitMs = taosGetMonoTimestampMs();
  pSched->maxWaitMs = tsVndCommitMaxIntervalMs + (jitter % tsVndCommitMaxIntervalMs);
}

152
/*
 * Decide whether the vnode should commit now.
 *
 * Never commits when there is no in-use buffer pool or osDataSpaceAvailable() is false.
 * Otherwise commits when either:
 *   - the in-use pool's size exceeds its node.size AND more than SYNC_VND_COMMIT_MIN_MS have
 *     passed since the recorded commit time, or
 *   - the in-use pool holds any data AND the scheduled maximum wait has elapsed.
 *
 * Returns non-zero when a commit should be triggered, 0 otherwise.
 */
int vnodeShouldCommit(SVnode *pVnode) {
  if (pVnode->inUse == NULL) {
    return false;
  }
  if (!osDataSpaceAvailable()) {
    return false;
  }

  SVCommitSched *pSched = &pVnode->commitSched;
  int64_t        nowMs = taosGetMonoTimestampMs();

  bool poolOverSize = pVnode->inUse->size > pVnode->inUse->node.size;
  bool minIntervalPassed = pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs;
  if (poolOverSize && minIntervalPassed) {
    return true;
  }

  bool poolHasData = pVnode->inUse->size > 0;
  bool maxWaitPassed = pSched->commitMs + pSched->maxWaitMs < nowMs;
  return poolHasData && maxWaitPassed;
}

/*
 * Size-only commit check: returns non-zero when there is an in-use buffer pool, data space is
 * available, and the pool's size exceeds its node.size; 0 otherwise.
 */
int vnodeShouldCommitOld(SVnode *pVnode) {
  if (pVnode->inUse == NULL) {
    return false;
  }

  return osDataSpaceAvailable() && (pVnode->inUse->size > pVnode->inUse->node.size);
}
H
Hongze Cheng 已提交
170

H
Hongze Cheng 已提交
171 172 173
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
  char      fname[TSDB_FILENAME_LEN];
  TdFilePtr pFile;
H
Hongze Cheng 已提交
174
  char     *data;
H
Hongze Cheng 已提交
175 176 177 178 179 180

  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);

  // encode info
  data = NULL;

H
Hongze Cheng 已提交
181
  if (vnodeEncodeInfo(pInfo, &data) < 0) {
182
    vError("failed to encode json info.");
H
Hongze Cheng 已提交
183 184 185 186 187 188
    return -1;
  }

  // save info to a vnode_tmp.json
  pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
189
    vError("failed to open info file:%s for write:%s", fname, terrstr());
H
Hongze Cheng 已提交
190 191 192 193
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

H
Hongze Cheng 已提交
194
  if (taosWriteFile(pFile, data, strlen(data)) < 0) {
195
    vError("failed to write info file:%s error:%s", fname, terrstr());
H
Hongze Cheng 已提交
196 197 198 199 200
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  if (taosFsyncFile(pFile) < 0) {
201
    vError("failed to fsync info file:%s error:%s", fname, terrstr());
H
Hongze Cheng 已提交
202 203 204 205 206 207 208 209 210
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  taosCloseFile(&pFile);

  // free info binary
  taosMemoryFree(data);

211 212
  vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d", pInfo->config.vgId, fname,
        pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex);
H
Hongze Cheng 已提交
213 214 215 216 217 218 219 220 221

  return 0;

_err:
  taosCloseFile(&pFile);
  taosMemoryFree(data);
  return -1;
}

H
Hongze Cheng 已提交
222
/*
 * Promote the temporary info file to the real one by renaming
 * "<dir><TD_DIRSEP>" VND_INFO_FNAME_TMP to "<dir><TD_DIRSEP>" VND_INFO_FNAME.
 *
 * Returns 0 on success; on failure sets terrno from errno and returns -1.
 */
int vnodeCommitInfo(const char *dir) {
  char dstName[TSDB_FILENAME_LEN];
  char srcName[TSDB_FILENAME_LEN];

  snprintf(dstName, sizeof(dstName), "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);
  snprintf(srcName, sizeof(srcName), "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);

  if (taosRenameFile(srcName, dstName) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  vInfo("vnode info is committed, dir:%s", dir);
  return 0;
}

H
Hongze Cheng 已提交
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
/*
 * Load the vnode info from "<dir><TD_DIRSEP>" VND_INFO_FNAME.
 *
 * The whole file is read into a heap buffer (sized from taosFStatFile, plus one byte for the
 * terminating NUL), the file is closed, and the text is decoded by vnodeDecodeInfo().
 *
 * dir   - directory containing the info file
 * pInfo - output; filled by vnodeDecodeInfo()
 *
 * Returns 0 on success, -1 on failure. Open/stat/read failures set terrno from errno;
 * allocation failure sets terrno to TSDB_CODE_OUT_OF_MEMORY.
 */
int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
  char      fname[TSDB_FILENAME_LEN];
  TdFilePtr pFile = NULL;
  char     *pData = NULL;
  int64_t   size;
  int       ret;

  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);

  // open and size the file
  pFile = taosOpenFile(fname, TD_FILE_READ);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  if (taosFStatFile(pFile, &size, NULL) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // read the content, leaving room for the terminator
  pData = taosMemoryMalloc(size + 1);
  if (pData == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  if (taosReadFile(pFile, pData, size) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
  pData[size] = '\0';

  taosCloseFile(&pFile);

  // decode, then drop the text buffer whatever the outcome
  ret = (vnodeDecodeInfo(pData, pInfo) < 0) ? -1 : 0;
  taosMemoryFree(pData);
  return ret;

_err:
  taosCloseFile(&pFile);
  taosMemoryFree(pData);
  return -1;
}

H
Hongze Cheng 已提交
289
/*
 * Prepare a commit: take the commit semaphore, snapshot the state to commit into pInfo,
 * persist that snapshot to the temporary info file, let tsdb/meta/sma prepare, and finally
 * move the in-use buffer pool to pVnode->onCommit.
 *
 * pVnode - vnode being committed
 * pInfo  - output; receives the config/state snapshot, the vnode pointer and the meta txn
 *
 * The function always waits on pVnode->canCommit first and never posts it; on both success and
 * failure the semaphore is left held, and vnodeAsyncCommit()/vnodeCommitTask() post it.
 * pVnode->state.commitID is incremented before any step that can fail.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  char    dir[TSDB_FILENAME_LEN] = {0};

  tsem_wait(&pVnode->canCommit);

  pVnode->state.commitTerm = pVnode->state.applyTerm;

  pInfo->info.config = pVnode->config;
  pInfo->info.state.committed = pVnode->state.applied;
  pInfo->info.state.commitTerm = pVnode->state.applyTerm;
  pInfo->info.state.commitID = ++pVnode->state.commitID;
  pInfo->pVnode = pVnode;
  pInfo->txn = metaGetTxn(pVnode->pMeta);

  // save info
  if (pVnode->pTfs) {
    snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
  } else {
    snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
  }

  vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode));
  if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  tsdbPrepareCommit(pVnode->pTsdb);

  metaPrepareAsyncCommit(pVnode->pMeta);

  // use TSDB_CHECK_CODE (as the other steps do) so that the failing line is recorded in lino
  // and the error log below does not report line 0
  code = smaPrepareAsyncCommit(pVnode->pSma);
  TSDB_CHECK_CODE(code, lino, _exit);

  // hand the in-use pool over to the commit; a previous commit must have released its pool
  taosThreadMutexLock(&pVnode->mutex);
  ASSERT(pVnode->onCommit == NULL);
  pVnode->onCommit = pVnode->inUse;
  pVnode->inUse = NULL;
  taosThreadMutexUnlock(&pVnode->mutex);

_exit:
  if (code) {
    vError("vgId:%d, %s failed at line %d since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, lino,
           tstrerror(code), pVnode->state.commitID);
  } else {
    vDebug("vgId:%d, %s done, commit id:%" PRId64, TD_VID(pVnode), __func__, pInfo->info.state.commitID);
  }

  return code;
}
H
Hongze Cheng 已提交
341
/*
 * Release the buffer pool that was being committed (pVnode->onCommit), under pVnode->mutex.
 *
 * The pool's reference count is decremented. If it drops to zero the pool goes straight back
 * to the free list; if references remain the pool is appended to the tail of the recycle
 * queue. A negative count is treated as a programming error.
 */
static void vnodeReturnBufPool(SVnode *pVnode) {
  taosThreadMutexLock(&pVnode->mutex);

  SVBufPool *pPool = pVnode->onCommit;
  pVnode->onCommit = NULL;

  int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1);

  if (nRef == 0) {
    vnodeBufPoolAddToFreeList(pPool);
  } else if (nRef > 0) {
    vDebug("vgId:%d, buffer pool %p of id %d is added to recycle queue", TD_VID(pVnode), pPool, pPool->id);

    // append to the tail of the doubly-linked recycle queue
    SVBufPool *pTail = pVnode->recycleTail;
    pPool->recycleNext = NULL;
    pPool->recyclePrev = pTail;
    if (pTail == NULL) {
      pVnode->recycleHead = pPool;
    } else {
      pTail->recycleNext = pPool;
    }
    pVnode->recycleTail = pPool;
  } else {
    ASSERT(0);
  }

  taosThreadMutexUnlock(&pVnode->mutex);
}
H
Hongze Cheng 已提交
368 369 370
static int32_t vnodeCommitTask(void *arg) {
  int32_t code = 0;

H
Hongze Cheng 已提交
371
  SCommitInfo *pInfo = (SCommitInfo *)arg;
H
Hongze Cheng 已提交
372
  SVnode      *pVnode = pInfo->pVnode;
H
Hongze Cheng 已提交
373

H
Hongze Cheng 已提交
374 375
  // commit
  code = vnodeCommitImpl(pInfo);
H
Hongze Cheng 已提交
376 377
  if (code) goto _exit;

H
Hongze Cheng 已提交
378
  vnodeReturnBufPool(pVnode);
H
Hongze Cheng 已提交
379

380
_exit:
H
Hongze Cheng 已提交
381
  // end commit
H
Hongze Cheng 已提交
382
  tsem_post(&pVnode->canCommit);
H
Hongze Cheng 已提交
383
  taosMemoryFree(pInfo);
H
Hongze Cheng 已提交
384 385
  return code;
}
386

H
refact  
Hongze Cheng 已提交
387
/*
 * Start an asynchronous commit of the vnode.
 *
 * Allocates the commit info, prepares the commit (vnodePrepareCommit, which takes the
 * canCommit semaphore) and schedules vnodeCommitTask, which owns the info and posts the
 * semaphore once it runs.
 *
 * On failure after vnodePrepareCommit() has been called, the info is freed and the semaphore
 * is posted here, because the task will never run. If the info allocation itself fails the
 * semaphore has not been taken yet, so it is NOT posted: posting it there would raise the
 * semaphore count without a matching wait.
 *
 * Returns 0 when the commit task has been scheduled, otherwise an error code.
 */
int vnodeAsyncCommit(SVnode *pVnode) {
  int32_t code = 0;

  SCommitInfo *pInfo = (SCommitInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
  if (NULL == pInfo) {
    // nothing acquired yet: report and leave without touching canCommit
    code = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
           pVnode->state.commitID);
    return code;
  }

  // prepare to commit (waits on canCommit)
  code = vnodePrepareCommit(pVnode, pInfo);
  if (TSDB_CODE_SUCCESS != code) {
    goto _exit;
  }

  // schedule the task
  code = vnodeScheduleTask(vnodeCommitTask, pInfo);

_exit:
  if (code) {
    taosMemoryFree(pInfo);
    tsem_post(&pVnode->canCommit);
    vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
           pVnode->state.commitID);
  } else {
    vInfo("vgId:%d, vnode async commit done, commitId:%" PRId64 " term:%" PRId64 " applied:%" PRId64, TD_VID(pVnode),
          pVnode->state.commitID, pVnode->state.applyTerm, pVnode->state.applied);
  }
  return code;
}
H
refact  
Hongze Cheng 已提交
419

H
Hongze Cheng 已提交
420 421
/*
 * Commit the vnode and wait for the commit round to finish.
 *
 * Starts an asynchronous commit, then takes and immediately releases the canCommit semaphore,
 * which blocks until whoever holds it (the commit task, if one was scheduled) has posted it.
 *
 * NOTE(review): the result of vnodeAsyncCommit() is ignored and 0 is always returned —
 * confirm callers do not need the failure.
 */
int vnodeSyncCommit(SVnode *pVnode) {
  (void)vnodeAsyncCommit(pVnode);

  // wait for the in-flight commit, then give the semaphore straight back
  tsem_wait(&pVnode->canCommit);
  tsem_post(&pVnode->canCommit);

  return 0;
}

H
Hongze Cheng 已提交
427 428 429 430 431 432
/*
 * Perform the commit described by pInfo.
 *
 * Sequence: refresh the commit schedule, persist the WAL, begin a sync snapshot, commit
 * tsdb / sma (rsma vnodes only) / tq, rename the temporary info file into place, run the
 * finish-commit steps of tsdb / sma (rsma vnodes only) / meta, record the committed version,
 * run the sma post-commit and end the sync snapshot.
 *
 * Returns -1 when walPersist() or smaPostCommit() fails.
 * NOTE(review): every other path, including failures routed through _exit, returns 0 (the
 * error is only logged). The caller vnodeCommitTask() returns the buffer pool only when this
 * function returns 0 — confirm that swallowing these errors is intended.
 */
static int vnodeCommitImpl(SCommitInfo *pInfo) {
  SVnode *pVnode = pInfo->pVnode;
  char    dir[TSDB_FILENAME_LEN] = {0};
  int32_t code = 0;
  int32_t lino = 0;

  vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
        pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm);

  vnodeUpdCommitSched(pVnode);

  // the WAL must be durable before anything is committed
  if (walPersist(pVnode->pWal) < 0) {
    vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr());
    return -1;
  }

  // directory holding the vnode info files
  if (pVnode->pTfs) {
    snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
  } else {
    snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
  }

  syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed);

  // phase 1: commit each sub-system
  code = tsdbCommit(pVnode->pTsdb, pInfo);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (VND_IS_RSMA(pVnode)) {
    code = smaCommit(pVnode->pSma, pInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (tqCommit(pVnode->pTq) < 0) {
    code = TSDB_CODE_FAILED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // phase 2: make the new vnode info visible (tmp file -> info file)
  if (vnodeCommitInfo(dir) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // phase 3: finish-commit of each sub-system
  code = tsdbFinishCommit(pVnode->pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (VND_IS_RSMA(pVnode)) {
    code = smaFinishCommit(pVnode->pSma);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (metaFinishCommit(pVnode->pMeta, pInfo->txn) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pVnode->state.committed = pInfo->info.state.committed;

  if (smaPostCommit(pVnode->pSma) < 0) {
    vError("vgId:%d, failed to post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  syncEndSnapshot(pVnode->sync);

_exit:
  if (code) {
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  } else {
    vInfo("vgId:%d, commit end", TD_VID(pVnode));
  }
  return 0;
}

H
Hongze Cheng 已提交
504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519
/*
 * Tell whether a previous commit was left unfinished, i.e. whether the temporary info file
 * VND_INFO_FNAME_TMP still exists in the vnode directory.
 *
 * The directory is resolved the same way vnodePrepareCommit()/vnodeCommitImpl() resolve it:
 * below the primary TFS path when pVnode->pTfs is set, otherwise pVnode->path itself. The
 * original called tfsGetPrimaryPath() unconditionally, unlike those functions.
 *
 * Returns the result of taosCheckExistFile() on that file name.
 */
bool vnodeShouldRollback(SVnode *pVnode) {
  char tFName[TSDB_FILENAME_LEN] = {0};

  if (pVnode->pTfs) {
    snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path,
             TD_DIRSEP, VND_INFO_FNAME_TMP);
  } else {
    snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s", pVnode->path, TD_DIRSEP, VND_INFO_FNAME_TMP);
  }

  return taosCheckExistFile(tFName);
}

/*
 * Roll back an unfinished commit by removing the temporary info file VND_INFO_FNAME_TMP from
 * the vnode directory. The result of the removal is deliberately ignored.
 *
 * The directory is resolved the same way vnodePrepareCommit()/vnodeCommitImpl() resolve it:
 * below the primary TFS path when pVnode->pTfs is set, otherwise pVnode->path itself. The
 * original called tfsGetPrimaryPath() unconditionally, unlike those functions.
 */
void vnodeRollback(SVnode *pVnode) {
  char tFName[TSDB_FILENAME_LEN] = {0};

  if (pVnode->pTfs) {
    snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path,
             TD_DIRSEP, VND_INFO_FNAME_TMP);
  } else {
    snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s", pVnode->path, TD_DIRSEP, VND_INFO_FNAME_TMP);
  }

  (void)taosRemoveFile(tFName);
}

H
Hongze Cheng 已提交
520 521 522 523
/*
 * JSON encoder callback for SVState: writes the fields "commit version", "commit ID" and
 * "commit term" (in that order) into pJson.
 *
 * pObj  - points to the SVState to encode
 * pJson - JSON object receiving the fields
 *
 * Returns 0 on success, -1 as soon as one field cannot be added.
 */
static int vnodeEncodeState(const void *pObj, SJson *pJson) {
  const SVState *pState = (const SVState *)pObj;

  // || short-circuits, so encoding stops at the first failing field
  if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0 ||
      tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID) < 0 ||
      tjsonAddIntegerToObject(pJson, "commit term", pState->commitTerm) < 0) {
    return -1;
  }

  return 0;
}

static int vnodeDecodeState(const SJson *pJson, void *pObj) {
  SVState *pState = (SVState *)pObj;

533 534
  int32_t code;
  tjsonGetNumberValue(pJson, "commit version", pState->committed, code);
H
Hongze Cheng 已提交
535
  if (code < 0) return -1;
H
Hongze Cheng 已提交
536 537
  tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code);
  if (code < 0) return -1;
H
Hongze Cheng 已提交
538 539
  tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code);
  if (code < 0) return -1;
H
Hongze Cheng 已提交
540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570

  return 0;
}

/*
 * Encode a SVnodeInfo as a JSON string with two members, "config" and "state".
 *
 * pInfo  - info to encode
 * ppData - output; set to NULL first, then to the string returned by tjsonToString() on
 *          success. The visible caller, vnodeSaveInfo(), releases it with taosMemoryFree().
 *
 * Returns 0 on success, -1 on failure. The intermediate JSON object is always deleted.
 */
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
  int ret = -1;

  *ppData = NULL;

  SJson *pJson = tjsonCreateObject();
  if (pJson == NULL) {
    return ret;
  }

  // && short-circuits: "state" is only added once "config" succeeded
  if (tjsonAddObject(pJson, "config", vnodeEncodeConfig, (void *)&pInfo->config) >= 0 &&
      tjsonAddObject(pJson, "state", vnodeEncodeState, (void *)&pInfo->state) >= 0) {
    char *pText = tjsonToString(pJson);
    if (pText != NULL) {
      *ppData = pText;
      ret = 0;
    }
  }

  tjsonDelete(pJson);
  return ret;
}

H
Hongze Cheng 已提交
578
/*
 * Decode a SVnodeInfo from JSON text: parses pData and fills pInfo->config from the "config"
 * member and pInfo->state from the "state" member.
 *
 * pData - JSON text to parse
 * pInfo - output structure
 *
 * Returns 0 on success, -1 when parsing or decoding either member fails. The parsed JSON
 * object is always deleted.
 */
int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
  SJson *pJson = tjsonParse(pData);
  if (pJson == NULL) {
    return -1;
  }

  int ret = 0;
  if (tjsonToObject(pJson, "config", vnodeDecodeConfig, (void *)&pInfo->config) < 0) {
    ret = -1;
  } else if (tjsonToObject(pJson, "state", vnodeDecodeState, (void *)&pInfo->state) < 0) {
    ret = -1;
  }

  tjsonDelete(pJson);
  return ret;
}
新手
引导
客服 返回
顶部