vnodeCommit.c 12.7 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
Hongze Cheng 已提交
17
#include "vnodeInt.h"
H
Hongze Cheng 已提交
18

H
Hongze Cheng 已提交
19 20
#define VND_INFO_FNAME_TMP "vnode_tmp.json"

H
Hongze Cheng 已提交
21 22
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
static int vnodeCommitImpl(SCommitInfo *pInfo);
H
refact  
Hongze Cheng 已提交
23

H
Hongze Cheng 已提交
24 25
#define WAIT_TIME_MILI_SEC 10  // milliseconds

H
Hongze Cheng 已提交
26
// Begin a new write cycle on the vnode.
//
// Takes a buffer pool off pVnode->freeList (waiting on pVnode->poolNotEmpty in
// WAIT_TIME_MILI_SEC slices while the list is empty), makes it the in-use pool,
// increments state.commitID and then begins the meta, tsdb and (for RSMA vnodes)
// sma sub-systems.
//
// Returns 0 on success and -1 on failure. A failed timed wait sets terrno from
// the wait's return code; sub-system failures are logged with tstrerror(terrno).
int vnodeBegin(SVnode *pVnode) {
  // alloc buffer pool
  taosThreadMutexLock(&pVnode->mutex);

  int32_t nTry = 0;
  for (;;) {
    while (pVnode->freeList == NULL) {
      vDebug("vgId:%d no free buffer pool, try to wait %d...", TD_VID(pVnode), ++nTry);

      struct timeval  tv;
      struct timespec ts;

      // deadline = now + WAIT_TIME_MILI_SEC
      taosGetTimeOfDay(&tv);
      ts.tv_sec = tv.tv_sec;
      ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000;
      // Adding the wait interval can push tv_nsec to one second or more, which
      // is not a valid timespec; carry the overflow into tv_sec.
      if (ts.tv_nsec >= 1000000000L) {
        ts.tv_sec += 1;
        ts.tv_nsec -= 1000000000L;
      }

      int32_t rc = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &ts);
      if (rc == ETIMEDOUT) {  // time out, break
        break;
      } else if (rc != 0) {  // error occurred
        terrno = TAOS_SYSTEM_ERROR(rc);
        vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(terrno));
        taosThreadMutexUnlock(&pVnode->mutex);
        return -1;
      }
    }

    if (pVnode->freeList) {
      // allocate from free list
      pVnode->inUse = pVnode->freeList;
      pVnode->inUse->nRef = 1;
      pVnode->freeList = pVnode->inUse->freeNext;
      pVnode->inUse->freeNext = NULL;
      break;
    } else if (nTry == 1) {
      // try to recycle a buffer pool
      vInfo("vgId:%d no free buffer pool, try to recycle...", TD_VID(pVnode));
      ASSERT(false);  // TODO: condition of nTry == 1 may not be reasonable
    }
  }

  taosThreadMutexUnlock(&pVnode->mutex);

  pVnode->state.commitID++;

  // begin meta
  if (metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL) < 0) {
    vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // begin tsdb
  if (tsdbBegin(pVnode->pTsdb) < 0) {
    vError("vgId:%d, failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // begin sma
  if (VND_IS_RSMA(pVnode) && smaBegin(pVnode->pSma) < 0) {
    vError("vgId:%d, failed to begin sma since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  return 0;
}

C
Cary Xu 已提交
91 92
// Tell whether the vnode should commit now.
// Returns false when there is no in-use buffer pool; otherwise returns true only
// if osDataSpaceAvailable() holds and the in-use pool's `size` exceeds its
// `node.size`.
int vnodeShouldCommit(SVnode *pVnode) {
  if (pVnode->inUse == NULL) {
    return false;
  }

  // the size comparison is only evaluated when data space is available
  return osDataSpaceAvailable() && (pVnode->inUse->size > pVnode->inUse->node.size);
}
H
Hongze Cheng 已提交
97

H
Hongze Cheng 已提交
98 99 100
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
  char      fname[TSDB_FILENAME_LEN];
  TdFilePtr pFile;
H
Hongze Cheng 已提交
101
  char     *data;
H
Hongze Cheng 已提交
102 103 104 105 106 107

  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);

  // encode info
  data = NULL;

H
Hongze Cheng 已提交
108
  if (vnodeEncodeInfo(pInfo, &data) < 0) {
109
    vError("failed to encode json info.");
H
Hongze Cheng 已提交
110 111 112 113 114 115
    return -1;
  }

  // save info to a vnode_tmp.json
  pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
S
Shengliang Guan 已提交
116
    vError("failed to open info file:%s for write:%s", fname, terrstr());
H
Hongze Cheng 已提交
117 118 119 120
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

H
Hongze Cheng 已提交
121
  if (taosWriteFile(pFile, data, strlen(data)) < 0) {
122
    vError("failed to write info file:%s error:%s", fname, terrstr());
H
Hongze Cheng 已提交
123 124 125 126 127
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  if (taosFsyncFile(pFile) < 0) {
S
Shengliang Guan 已提交
128
    vError("failed to fsync info file:%s error:%s", fname, terrstr());
H
Hongze Cheng 已提交
129 130 131 132 133 134 135 136 137
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  taosCloseFile(&pFile);

  // free info binary
  taosMemoryFree(data);

138 139
  vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d", pInfo->config.vgId, fname,
        pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex);
H
Hongze Cheng 已提交
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160

  return 0;

_err:
  taosCloseFile(&pFile);
  taosMemoryFree(data);
  return -1;
}

int vnodeCommitInfo(const char *dir, const SVnodeInfo *pInfo) {
  char fname[TSDB_FILENAME_LEN];
  char tfname[TSDB_FILENAME_LEN];

  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);
  snprintf(tfname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);

  if (taosRenameFile(tfname, fname) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

S
Shengliang Guan 已提交
161
  vInfo("vgId:%d, vnode info is committed", pInfo->config.vgId);
H
Hongze Cheng 已提交
162 163 164 165

  return 0;
}

H
Hongze Cheng 已提交
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
// Read "<dir><TD_DIRSEP>" VND_INFO_FNAME into memory and decode it into `pInfo`.
//
// The file size is taken from taosFStatFile(); the buffer is allocated one byte
// larger so the content can be NUL-terminated before it is handed to
// vnodeDecodeInfo().
//
// Returns 0 on success, -1 on failure. Open/stat/read failures set terrno from
// errno; an allocation failure sets TSDB_CODE_OUT_OF_MEMORY.
int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
  char      path[TSDB_FILENAME_LEN];
  TdFilePtr fp = NULL;
  char     *buf = NULL;
  int64_t   nBytes;
  int       rc;

  snprintf(path, sizeof(path), "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);

  // read info
  fp = taosOpenFile(path, TD_FILE_READ);
  if (fp == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  if (taosFStatFile(fp, &nBytes, NULL) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  buf = taosMemoryMalloc(nBytes + 1);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  if (taosReadFile(fp, buf, nBytes) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
  buf[nBytes] = '\0';

  taosCloseFile(&fp);

  // decode info; the buffer is no longer needed whatever the outcome
  rc = vnodeDecodeInfo(buf, pInfo);
  taosMemoryFree(buf);
  return (rc < 0) ? -1 : 0;

_err:
  taosCloseFile(&fp);
  taosMemoryFree(buf);
  return -1;
}

217 218 219 220 221
// Prepare a commit: wait on pVnode->canCommit, snapshot the vnode's config and
// state into `pInfo`, write the temporary info file, run the prepare step of
// tsdb / meta / sma and finally drop the reference on the in-use buffer pool.
//
// Returns 0 on success or an error code. canCommit is NOT posted here on
// failure; vnodeAsyncCommit() posts it when this function returns an error.
static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  char    dir[TSDB_FILENAME_LEN] = {0};

  tsem_wait(&pVnode->canCommit);

  pVnode->state.commitTerm = pVnode->state.applyTerm;

  pInfo->info.config = pVnode->config;
  pInfo->info.state.committed = pVnode->state.applied;
  pInfo->info.state.commitTerm = pVnode->state.applyTerm;
  pInfo->info.state.commitID = pVnode->state.commitID;
  pInfo->pVnode = pVnode;
  pInfo->txn = metaGetTxn(pVnode->pMeta);

  // save info
  if (pVnode->pTfs) {
    snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
  } else {
    snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
  }

  vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode));
  if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
    // vnodeSaveInfo() can fail without setting terrno (e.g. when encoding
    // fails); never let such a failure be mistaken for success.
    code = terrno ? terrno : TSDB_CODE_FAILED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  tsdbPrepareCommit(pVnode->pTsdb);

  metaPrepareAsyncCommit(pVnode->pMeta);

  code = smaPrepareAsyncCommit(pVnode->pSma);
  // go through TSDB_CHECK_CODE, as the vnodeSaveInfo() path above does, so the
  // error log below receives the line number set by the macro
  TSDB_CHECK_CODE(code, lino, _exit);

  vnodeBufPoolUnRef(pVnode->inUse);
  pVnode->inUse = NULL;

_exit:
  if (code) {
    vError("vgId:%d, %s failed at line %d since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, lino,
           tstrerror(code), pVnode->state.commitID);
  } else {
    vDebug("vgId:%d, %s done", TD_VID(pVnode), __func__);
  }

  return code;
}
266

H
Hongze Cheng 已提交
267 268 269
static int32_t vnodeCommitTask(void *arg) {
  int32_t code = 0;

H
Hongze Cheng 已提交
270
  SCommitInfo *pInfo = (SCommitInfo *)arg;
H
Hongze Cheng 已提交
271

H
Hongze Cheng 已提交
272 273
  // commit
  code = vnodeCommitImpl(pInfo);
H
Hongze Cheng 已提交
274 275
  if (code) goto _exit;

276
_exit:
H
Hongze Cheng 已提交
277 278 279
  // end commit
  tsem_post(&pInfo->pVnode->canCommit);
  taosMemoryFree(pInfo);
H
Hongze Cheng 已提交
280 281
  return code;
}
H
refact  
Hongze Cheng 已提交
282
// Prepare a commit and hand it to vnodeScheduleTask() to be executed by
// vnodeCommitTask().
//
// On success ownership of the commit info passes to the scheduled task, which
// frees it and posts canCommit when it finishes. If preparing or scheduling
// fails, the commit info is freed here and canCommit (acquired inside
// vnodePrepareCommit()) is posted again.
//
// Returns 0 on success or an error code.
int vnodeAsyncCommit(SVnode *pVnode) {
  int32_t code = 0;

  SCommitInfo *pInfo = (SCommitInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
  if (NULL == pInfo) {
    // canCommit has not been waited on yet at this point, so posting it here
    // would leave the semaphore unbalanced; just report the failure.
    code = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
           pVnode->state.commitID);
    return code;
  }

  // prepare to commit
  code = vnodePrepareCommit(pVnode, pInfo);
  if (TSDB_CODE_SUCCESS != code) {
    goto _exit;
  }

  // schedule the task
  code = vnodeScheduleTask(vnodeCommitTask, pInfo);

_exit:
  if (code) {
    taosMemoryFree(pInfo);
    tsem_post(&pVnode->canCommit);
    vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
           pVnode->state.commitID);
  } else {
    vInfo("vgId:%d, vnode async commit done, commitId:%" PRId64 " term:%" PRId64 " applied:%" PRId64, TD_VID(pVnode),
          pVnode->state.commitID, pVnode->state.applyTerm, pVnode->state.applied);
  }
  return code;
}
H
refact  
Hongze Cheng 已提交
314

H
Hongze Cheng 已提交
315 316
// Start a commit through vnodeAsyncCommit() and then block until canCommit can
// be acquired, releasing it again immediately afterwards.
// Always returns 0; the return value of vnodeAsyncCommit() is not inspected.
int vnodeSyncCommit(SVnode *pVnode) {
  (void)vnodeAsyncCommit(pVnode);

  // acquire and immediately release the commit semaphore
  tsem_wait(&pVnode->canCommit);
  tsem_post(&pVnode->canCommit);

  return 0;
}

H
Hongze Cheng 已提交
322 323 324 325 326 327
// Execute the commit described by `pInfo`.
//
// Steps, in order: persist the WAL, begin a sync snapshot at state.applied,
// commit tsdb / sma (RSMA vnodes only) / tq, rename the temporary info file to
// the final one, run the finish-commit step of tsdb / sma / meta, publish the
// committed version in pVnode->state, post-commit sma and end the sync snapshot.
//
// Returns 0 on success. WAL-persist and sma post-commit failures return -1;
// every other failure returns the error code that is logged at _exit.
static int vnodeCommitImpl(SCommitInfo *pInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  char    dir[TSDB_FILENAME_LEN] = {0};
  SVnode *pVnode = pInfo->pVnode;

  vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
        pVnode->state.commitID, pVnode->state.applied, pVnode->state.applyTerm);

  // persist wal before starting
  if (walPersist(pVnode->pWal) < 0) {
    vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr());
    return -1;
  }

  if (pVnode->pTfs) {
    snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
  } else {
    snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
  }

  // walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
  syncBeginSnapshot(pVnode->sync, pVnode->state.applied);

  // commit each sub-system
  code = tsdbCommit(pVnode->pTsdb, pInfo);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (VND_IS_RSMA(pVnode)) {
    code = smaCommit(pVnode->pSma, pInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (tqCommit(pVnode->pTq) < 0) {
    code = TSDB_CODE_FAILED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // commit info
  if (vnodeCommitInfo(dir, &pInfo->info) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbFinishCommit(pVnode->pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (VND_IS_RSMA(pVnode)) {
    code = smaFinishCommit(pVnode->pSma);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (metaFinishCommit(pVnode->pMeta, pInfo->txn) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pVnode->state.committed = pInfo->info.state.committed;

  if (smaPostCommit(pVnode->pSma) < 0) {
    vError("vgId:%d, failed to post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // walEndSnapshot(pVnode->pWal);
  syncEndSnapshot(pVnode->sync);

_exit:
  if (code) {
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  } else {
    vInfo("vgId:%d, commit end", TD_VID(pVnode));
  }
  // propagate the failure instead of always reporting success
  return code;
}

H
Hongze Cheng 已提交
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
// Tell whether a temporary info file ("vnode_tmp.json") exists in the vnode
// directory.
//
// The directory is resolved the same way vnodePrepareCommit() resolves it when
// writing that file: under the tfs primary path when pVnode->pTfs is set,
// otherwise pVnode->path on its own.
bool vnodeShouldRollback(SVnode *pVnode) {
  char tFName[TSDB_FILENAME_LEN] = {0};

  if (pVnode->pTfs) {
    snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path,
             TD_DIRSEP, VND_INFO_FNAME_TMP);
  } else {
    snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s", pVnode->path, TD_DIRSEP, VND_INFO_FNAME_TMP);
  }

  return taosCheckExistFile(tFName);
}

// Remove the temporary info file ("vnode_tmp.json") from the vnode directory.
// The result of the removal is deliberately discarded.
//
// The directory is resolved the same way vnodePrepareCommit() resolves it when
// writing that file: under the tfs primary path when pVnode->pTfs is set,
// otherwise pVnode->path on its own.
void vnodeRollback(SVnode *pVnode) {
  char tFName[TSDB_FILENAME_LEN] = {0};

  if (pVnode->pTfs) {
    snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path,
             TD_DIRSEP, VND_INFO_FNAME_TMP);
  } else {
    snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s", pVnode->path, TD_DIRSEP, VND_INFO_FNAME_TMP);
  }

  (void)taosRemoveFile(tFName);
}

H
Hongze Cheng 已提交
415 416 417 418
// Write the fields of an SVState (`pObj`) into `pJson` as the integer members
// "commit version", "commit ID" and "commit term", in that order.
// Returns 0 on success, -1 as soon as one member cannot be added.
static int vnodeEncodeState(const void *pObj, SJson *pJson) {
  const SVState *pSt = (SVState *)pObj;

  // the || chain stops at the first failing add, so later members are skipped
  if (tjsonAddIntegerToObject(pJson, "commit version", pSt->committed) < 0 ||
      tjsonAddIntegerToObject(pJson, "commit ID", pSt->commitID) < 0 ||
      tjsonAddIntegerToObject(pJson, "commit term", pSt->commitTerm) < 0) {
    return -1;
  }

  return 0;
}

static int vnodeDecodeState(const SJson *pJson, void *pObj) {
  SVState *pState = (SVState *)pObj;

428 429
  int32_t code;
  tjsonGetNumberValue(pJson, "commit version", pState->committed, code);
H
Hongze Cheng 已提交
430
  if (code < 0) return -1;
H
Hongze Cheng 已提交
431 432
  tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code);
  if (code < 0) return -1;
H
Hongze Cheng 已提交
433 434
  tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code);
  if (code < 0) return -1;
H
Hongze Cheng 已提交
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465

  return 0;
}

// Serialize `pInfo` to a JSON string holding a "config" object and a "state"
// object.
//
// *ppData is set to NULL first and, on success, to the string produced by
// tjsonToString(); vnodeSaveInfo() releases it with taosMemoryFree().
// Returns 0 on success, -1 on failure (with *ppData left NULL).
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
  *ppData = NULL;

  SJson *pRoot = tjsonCreateObject();
  if (pRoot == NULL) {
    return -1;
  }

  // only render the string when both sub-objects were added
  char *pText = NULL;
  if (tjsonAddObject(pRoot, "config", vnodeEncodeConfig, (void *)&pInfo->config) >= 0 &&
      tjsonAddObject(pRoot, "state", vnodeEncodeState, (void *)&pInfo->state) >= 0) {
    pText = tjsonToString(pRoot);
  }

  // the JSON tree is released on success and failure alike
  tjsonDelete(pRoot);

  if (pText == NULL) {
    return -1;
  }

  *ppData = pText;
  return 0;
}

H
Hongze Cheng 已提交
473
// Parse the JSON text in `pData` and decode its "config" and "state" objects
// into `pInfo`.
// Returns 0 on success, -1 when parsing or either decode step fails.
int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
  SJson *pRoot = tjsonParse(pData);
  if (pRoot == NULL) {
    return -1;
  }

  // "state" is only decoded when "config" decoded successfully
  int ret = -1;
  if (tjsonToObject(pRoot, "config", vnodeDecodeConfig, (void *)&pInfo->config) >= 0 &&
      tjsonToObject(pRoot, "state", vnodeDecodeState, (void *)&pInfo->state) >= 0) {
    ret = 0;
  }

  tjsonDelete(pRoot);
  return ret;
}