vnodeCommit.c 10.8 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
refact  
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
#define VND_INFO_FNAME     "vnode.json"
H
Hongze Cheng 已提交
19 20
#define VND_INFO_FNAME_TMP "vnode_tmp.json"

H
Hongze Cheng 已提交
21
static int  vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
H
Hongze Cheng 已提交
22
static int  vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo);
H
Hongze Cheng 已提交
23
static int  vnodeCommitImpl(void *arg);
H
Hongze Cheng 已提交
24
static void vnodeWaitCommit(SVnode *pVnode);
H
refact  
Hongze Cheng 已提交
25

H
Hongze Cheng 已提交
26
/**
 * Begin a new write round on a vnode.
 *
 * Waits until the vnode has a buffer pool on its free list, detaches the head of
 * that list as the in-use pool, increments the commit ID and then calls the
 * begin routine of meta, tsdb, the rsma1/rsma2 tsdbs (when present) and sma.
 *
 * @param pVnode vnode to begin on
 * @return 0 on success, -1 when a meta/tsdb/rsma begin call reports failure
 */
int vnodeBegin(SVnode *pVnode) {
  // alloc buffer pool: block on the condition variable while the free list is empty
  taosThreadMutexLock(&pVnode->mutex);
  for (;;) {
    if (pVnode->pPool != NULL) {
      break;
    }
    taosThreadCondWait(&pVnode->poolNotEmpty, &pVnode->mutex);
  }

  // unlink the list head and make it the pool in use, holding one reference
  pVnode->inUse = pVnode->pPool;
  pVnode->inUse->nRef = 1;
  pVnode->pPool = pVnode->inUse->next;
  pVnode->inUse->next = NULL;
  taosThreadMutexUnlock(&pVnode->mutex);

  pVnode->state.commitID++;

  // begin meta
  if (metaBegin(pVnode->pMeta, 0) < 0) {
    vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // begin tsdb
  if (tsdbBegin(pVnode->pTsdb) < 0) {
    vError("vgId:%d, failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // begin the rsma tsdbs, which are only looked at when an sma handle exists
  if (pVnode->pSma != NULL) {
    if (VND_RSMA1(pVnode) && tsdbBegin(VND_RSMA1(pVnode)) < 0) {
      vError("vgId:%d, failed to begin rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
      return -1;
    }

    if (VND_RSMA2(pVnode) && tsdbBegin(VND_RSMA2(pVnode)) < 0) {
      vError("vgId:%d, failed to begin rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
      return -1;
    }
  }

  // begin sma; its result is not checked here
  smaBegin(pVnode->pSma);  // TODO: refactor to include the rsma1/rsma2 tsdbBegin() after tsdb_refact branch merged

  return 0;
}

C
Cary Xu 已提交
72 73
/**
 * Tell whether the vnode's in-use buffer pool calls for a commit.
 *
 * @param pVnode vnode to inspect
 * @return non-zero when a pool is in use, osDataSpaceAvailable() is true and the
 *         pool's size exceeds the size of its embedded node; 0 otherwise
 */
int vnodeShouldCommit(SVnode *pVnode) {
  if (pVnode->inUse == NULL) {
    return false;
  }

  return osDataSpaceAvailable() && (pVnode->inUse->size > pVnode->inUse->node.size);
}
H
Hongze Cheng 已提交
78

H
Hongze Cheng 已提交
79 80 81
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
  char      fname[TSDB_FILENAME_LEN];
  TdFilePtr pFile;
H
Hongze Cheng 已提交
82
  char     *data;
H
Hongze Cheng 已提交
83 84 85 86 87 88

  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);

  // encode info
  data = NULL;

H
Hongze Cheng 已提交
89
  if (vnodeEncodeInfo(pInfo, &data) < 0) {
90
    vError("failed to encode json info.");
H
Hongze Cheng 已提交
91 92 93 94 95 96
    return -1;
  }

  // save info to a vnode_tmp.json
  pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
S
Shengliang Guan 已提交
97
    vError("failed to open info file:%s for write:%s", fname, terrstr());
H
Hongze Cheng 已提交
98 99 100 101
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

H
Hongze Cheng 已提交
102
  if (taosWriteFile(pFile, data, strlen(data)) < 0) {
103
    vError("failed to write info file:%s error:%s", fname, terrstr());
H
Hongze Cheng 已提交
104 105 106 107 108
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  if (taosFsyncFile(pFile) < 0) {
S
Shengliang Guan 已提交
109
    vError("failed to fsync info file:%s error:%s", fname, terrstr());
H
Hongze Cheng 已提交
110 111 112 113 114 115 116 117 118
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  taosCloseFile(&pFile);

  // free info binary
  taosMemoryFree(data);

S
Shengliang Guan 已提交
119
  vInfo("vgId:%d, vnode info is saved, fname:%s", pInfo->config.vgId, fname);
H
Hongze Cheng 已提交
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140

  return 0;

_err:
  taosCloseFile(&pFile);
  taosMemoryFree(data);
  return -1;
}

int vnodeCommitInfo(const char *dir, const SVnodeInfo *pInfo) {
  char fname[TSDB_FILENAME_LEN];
  char tfname[TSDB_FILENAME_LEN];

  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);
  snprintf(tfname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);

  if (taosRenameFile(tfname, fname) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

S
Shengliang Guan 已提交
141
  vInfo("vgId:%d, vnode info is committed", pInfo->config.vgId);
H
Hongze Cheng 已提交
142 143 144 145

  return 0;
}

H
Hongze Cheng 已提交
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
/**
 * Read the vnode info file (VND_INFO_FNAME) from `dir` and decode it into pInfo.
 *
 * @param dir   directory that holds the vnode info file
 * @param pInfo output; filled by vnodeDecodeInfo()
 * @return 0 on success, -1 on failure. terrno is set from errno for
 *         open/stat/read failures and to TSDB_CODE_OUT_OF_MEMORY when the read
 *         buffer cannot be allocated.
 */
int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
  char      fname[TSDB_FILENAME_LEN];
  TdFilePtr pFile = NULL;
  char     *pData = NULL;
  int64_t   size = 0;
  int       ret;

  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);

  // read info
  pFile = taosOpenFile(fname, TD_FILE_READ);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  if (taosFStatFile(pFile, &size, NULL) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  pData = taosMemoryMalloc(size + 1);
  if (pData == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // Zero the whole buffer (terminator included) so that a read returning fewer
  // than `size` bytes never exposes uninitialized memory to the JSON parser.
  memset(pData, 0, size + 1);

  if (taosReadFile(pFile, pData, size) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  taosCloseFile(&pFile);

  // decode info; the decoder is declared with a uint8_t * parameter, so convert explicitly
  ret = (vnodeDecodeInfo((uint8_t *)pData, pInfo) < 0) ? -1 : 0;

  taosMemoryFree(pData);
  return ret;

_err:
  taosCloseFile(&pFile);
  taosMemoryFree(pData);
  return -1;
}

H
refact  
Hongze Cheng 已提交
197
/**
 * Queue a commit of the vnode.
 *
 * Waits on the vnode's canCommit semaphore, then passes vnodeCommitImpl to
 * vnodeScheduleTask(). The buffer-pool switch and tsdb prepare-commit steps that
 * used to sit between the two calls are currently disabled.
 *
 * @param pVnode vnode to commit
 * @return always 0; the result of vnodeScheduleTask() is not inspected
 */
int vnodeAsyncCommit(SVnode *pVnode) {
  vnodeWaitCommit(pVnode);
  vnodeScheduleTask(vnodeCommitImpl, pVnode);
  return 0;
}
H
refact  
Hongze Cheng 已提交
207

H
Hongze Cheng 已提交
208 209 210
/**
 * Commit the vnode and wait for the commit task to finish.
 *
 * Schedules the commit through vnodeAsyncCommit(), waits on the canCommit
 * semaphore again (vnodeCommitImpl posts it when done), and finally posts the
 * semaphore back so a later commit can acquire it.
 *
 * @param pVnode vnode to commit
 * @return always 0
 */
int vnodeSyncCommit(SVnode *pVnode) {
  vnodeAsyncCommit(pVnode);

  // wait for the scheduled task, then hand the semaphore back
  vnodeWaitCommit(pVnode);
  tsem_post(&pVnode->canCommit);

  return 0;
}

H
Hongze Cheng 已提交
215
/**
 * Commit the vnode: save the vnode info to its temporary file, begin a WAL
 * snapshot, run the sma pre-commit, release the in-use buffer pool, commit
 * meta, tsdb (or sma plus the three rsma tsdbs for an rsma vnode) and tq, rename
 * the info file into place, finish the tsdb/meta commits, run the sma
 * post-commit and end the WAL snapshot.
 *
 * @param pVnode vnode to commit
 * @return 0 on success. Failures routed through `_exit` are logged with the
 *         failing line and returned as the error code instead of being
 *         swallowed. The sma/rsma failure paths return -1 directly without
 *         passing through `_exit`.
 */
int vnodeCommit(SVnode *pVnode) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SVnodeInfo info = {0};
  char       dir[TSDB_FILENAME_LEN];

  vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID,
        pVnode->state.applied);

  pVnode->state.commitTerm = pVnode->state.applyTerm;

  // save info
  info.config = pVnode->config;
  info.state.committed = pVnode->state.applied;
  info.state.commitTerm = pVnode->state.applyTerm;
  info.state.commitID = pVnode->state.commitID;
  if (pVnode->pTfs) {
    snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
  } else {
    snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
  }
  if (vnodeSaveInfo(dir, &info) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  walBeginSnapshot(pVnode->pWal, pVnode->state.applied);

  // preCommit
  code = smaAsyncPreCommit(pVnode->pSma);
  TSDB_CHECK_CODE(code, lino, _exit);

  // drop this function's reference on the pool that was in use
  vnodeBufPoolUnRef(pVnode->inUse);
  pVnode->inUse = NULL;

  // commit each sub-system
  if (metaCommit(pVnode->pMeta) < 0) {
    code = TSDB_CODE_FAILED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (VND_IS_RSMA(pVnode)) {
    if (smaAsyncCommit(pVnode->pSma) < 0) {
      vError("vgId:%d, failed to async commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
      return -1;
    }

    if (tsdbCommit(VND_RSMA0(pVnode)) < 0) {
      vError("vgId:%d, failed to commit tsdb rsma0 since %s", TD_VID(pVnode), tstrerror(terrno));
      return -1;
    }
    if (tsdbCommit(VND_RSMA1(pVnode)) < 0) {
      vError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
      return -1;
    }
    if (tsdbCommit(VND_RSMA2(pVnode)) < 0) {
      vError("vgId:%d, failed to commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
      return -1;
    }
  } else {
    code = tsdbCommit(pVnode->pTsdb);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (tqCommit(pVnode->pTq) < 0) {
    code = TSDB_CODE_FAILED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // commit info
  if (vnodeCommitInfo(dir, &info) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  tsdbFinishCommit(pVnode->pTsdb);

  if (metaFinishCommit(pVnode->pMeta) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pVnode->state.committed = info.state.committed;

  // postCommit
  if (smaAsyncPostCommit(pVnode->pSma) < 0) {
    vError("vgId:%d, failed to async post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // apply the commit (TODO)
  walEndSnapshot(pVnode->pWal);

_exit:
  if (code) {
    vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  } else {
    vInfo("vgId:%d, commit end", TD_VID(pVnode));
  }
  // Propagate the failure; returning 0 here would hide it from the caller.
  return code;
}

H
Hongze Cheng 已提交
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333
/**
 * Build the path of the temporary vnode info file for pVnode into `buf`.
 *
 * Uses the same directory rule as vnodeCommit(): the tfs primary path joined
 * with the vnode path when the vnode has a tfs handle, otherwise the vnode path
 * alone. This avoids handing a NULL tfs handle to tfsGetPrimaryPath().
 */
static void vnodeBuildTmpInfoPath(SVnode *pVnode, char *buf, size_t len) {
  if (pVnode->pTfs) {
    snprintf(buf, len, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
             VND_INFO_FNAME_TMP);
  } else {
    snprintf(buf, len, "%s%s%s", pVnode->path, TD_DIRSEP, VND_INFO_FNAME_TMP);
  }
}

/**
 * Tell whether a rollback is needed, i.e. whether the temporary vnode info file
 * still exists (it is written by vnodeSaveInfo() and renamed away by
 * vnodeCommitInfo()).
 *
 * @param pVnode vnode to inspect
 * @return result of taosCheckExistFile() on the temporary info file path
 */
bool vnodeShouldRollback(SVnode *pVnode) {
  char tFName[TSDB_FILENAME_LEN] = {0};

  vnodeBuildTmpInfoPath(pVnode, tFName, sizeof(tFName));

  return taosCheckExistFile(tFName);
}

/**
 * Remove the temporary vnode info file. The removal result is deliberately
 * ignored.
 *
 * @param pVnode vnode to roll back
 */
void vnodeRollback(SVnode *pVnode) {
  char tFName[TSDB_FILENAME_LEN] = {0};

  vnodeBuildTmpInfoPath(pVnode, tFName, sizeof(tFName));

  (void)taosRemoveFile(tFName);
}

H
Hongze Cheng 已提交
334
static int vnodeCommitImpl(void *arg) {
H
Hongze Cheng 已提交
335
  SVnode *pVnode = (SVnode *)arg;
H
refact  
Hongze Cheng 已提交
336

H
Hongze Cheng 已提交
337
  // metaCommit(pVnode->pMeta);
H
more  
Hongze Cheng 已提交
338
  tqCommit(pVnode->pTq);
H
Hongze Cheng 已提交
339
  // tsdbCommit(pVnode->pTsdb, );
H
more  
Hongze Cheng 已提交
340

H
Hongze Cheng 已提交
341
  // vnodeBufPoolRecycle(pVnode);
H
more  
Hongze Cheng 已提交
342
  tsem_post(&(pVnode->canCommit));
H
refact  
Hongze Cheng 已提交
343 344 345
  return 0;
}

H
Hongze Cheng 已提交
346 347
// Block on the vnode's canCommit semaphore.
static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) {
  tsem_wait(&(pVnode->canCommit));
}

H
Hongze Cheng 已提交
348 349 350 351
/**
 * JSON encoder callback for an SVState: adds the "commit version", "commit ID"
 * and "commit term" integer fields to pJson, in that order.
 *
 * @param pObj  the SVState to encode
 * @param pJson JSON object to add the fields to
 * @return 0 on success, -1 as soon as one field cannot be added
 */
static int vnodeEncodeState(const void *pObj, SJson *pJson) {
  const SVState *pState = (const SVState *)pObj;

  // short-circuit evaluation keeps the original order and stops at the first failure
  if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0 ||
      tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID) < 0 ||
      tjsonAddIntegerToObject(pJson, "commit term", pState->commitTerm) < 0) {
    return -1;
  }

  return 0;
}

static int vnodeDecodeState(const SJson *pJson, void *pObj) {
  SVState *pState = (SVState *)pObj;

361 362
  int32_t code;
  tjsonGetNumberValue(pJson, "commit version", pState->committed, code);
H
Hongze Cheng 已提交
363
  if (code < 0) return -1;
H
Hongze Cheng 已提交
364 365
  tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code);
  if (code < 0) return -1;
H
Hongze Cheng 已提交
366 367
  tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code);
  if (code < 0) return -1;
H
Hongze Cheng 已提交
368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398

  return 0;
}

/**
 * Serialize a vnode info structure to a JSON string with a "config" and a
 * "state" member.
 *
 * @param pInfo  info to encode
 * @param ppData output; set to NULL first, then to the string returned by
 *               tjsonToString() on success. vnodeSaveInfo() releases it with
 *               taosMemoryFree().
 * @return 0 on success, -1 on failure (*ppData stays NULL)
 */
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
  SJson *pRoot;
  char  *pStr = NULL;

  *ppData = NULL;

  pRoot = tjsonCreateObject();
  if (pRoot == NULL) {
    return -1;
  }

  // add both members in order; render the string only when both were added
  if (tjsonAddObject(pRoot, "config", vnodeEncodeConfig, (void *)&pInfo->config) >= 0 &&
      tjsonAddObject(pRoot, "state", vnodeEncodeState, (void *)&pInfo->state) >= 0) {
    pStr = tjsonToString(pRoot);
  }

  // the JSON tree is released on every path
  tjsonDelete(pRoot);

  if (pStr == NULL) {
    return -1;
  }

  *ppData = pStr;
  return 0;
}

H
Hongze Cheng 已提交
406
/**
 * Parse a JSON document and decode its "config" and "state" members into pInfo.
 *
 * @param pData JSON text to parse (vnodeLoadInfo() passes a NUL-terminated buffer)
 * @param pInfo output; its config and state members are filled in
 * @return 0 on success, -1 when parsing fails or a member cannot be decoded
 */
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
  int    ret = -1;
  SJson *pRoot = tjsonParse(pData);

  if (pRoot == NULL) {
    return -1;
  }

  // decode "config" first, then "state"; stop at the first failure
  if (tjsonToObject(pRoot, "config", vnodeDecodeConfig, (void *)&pInfo->config) >= 0 &&
      tjsonToObject(pRoot, "state", vnodeDecodeState, (void *)&pInfo->state) >= 0) {
    ret = 0;
  }

  // the JSON tree is released on every path
  tjsonDelete(pRoot);
  return ret;
}