vnodeCommit.c 10.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnd.h"
H
refact  
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
#define VND_INFO_FNAME     "vnode.json"
H
Hongze Cheng 已提交
19 20
#define VND_INFO_FNAME_TMP "vnode_tmp.json"

H
Hongze Cheng 已提交
21
static int  vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
H
Hongze Cheng 已提交
22
static int  vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo);
H
Hongze Cheng 已提交
23 24
static int  vnodeStartCommit(SVnode *pVnode);
static int  vnodeEndCommit(SVnode *pVnode);
H
Hongze Cheng 已提交
25
static int  vnodeCommitImpl(void *arg);
H
Hongze Cheng 已提交
26
static void vnodeWaitCommit(SVnode *pVnode);
H
refact  
Hongze Cheng 已提交
27

H
Hongze Cheng 已提交
28
/*
 * Start a new write round on pVnode.
 *
 * Under pVnode->mutex, waits on poolNotEmpty until the free list pVnode->pPool
 * is non-empty, then detaches its head and installs it as pVnode->inUse with
 * nRef = 1. Afterwards it increments state.commitID and runs the begin step of
 * meta, the main tsdb, the rsma1/rsma2 tsdb instances (only those present, and
 * only when pSma is set) and finally sma.
 *
 * Returns 0 on success, -1 when a meta/tsdb/rsma begin step fails (logged with
 * terrno). On failure the buffer pool taken above stays in pVnode->inUse.
 * The result of smaBegin() is not checked.
 */
int vnodeBegin(SVnode *pVnode) {
  // alloc buffer pool: detach the head of the free list, waiting while it is empty
  taosThreadMutexLock(&pVnode->mutex);
  while (!pVnode->pPool) {
    taosThreadCondWait(&pVnode->poolNotEmpty, &pVnode->mutex);
  }
  pVnode->inUse = pVnode->pPool;
  pVnode->pPool = pVnode->inUse->next;
  pVnode->inUse->next = NULL;
  pVnode->inUse->nRef = 1;
  taosThreadMutexUnlock(&pVnode->mutex);

  pVnode->state.commitID++;

  // begin meta
  if (metaBegin(pVnode->pMeta, 0) < 0) {
    vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // begin tsdb
  if (tsdbBegin(pVnode->pTsdb) < 0) {
    vError("vgId:%d, failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // begin the rsma1/rsma2 tsdb instances; VND_RSMA1/2 are evaluated only when pSma is non-NULL
  if (pVnode->pSma && VND_RSMA1(pVnode) && tsdbBegin(VND_RSMA1(pVnode)) < 0) {
    vError("vgId:%d, failed to begin rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }
  if (pVnode->pSma && VND_RSMA2(pVnode) && tsdbBegin(VND_RSMA2(pVnode)) < 0) {
    vError("vgId:%d, failed to begin rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // begin sma
  smaBegin(pVnode->pSma);  // TODO: refactor to include the rsma1/rsma2 tsdbBegin() after tsdb_refact branch merged

  return 0;
}

C
Cary Xu 已提交
74 75
/*
 * Returns nonzero when the in-use buffer pool's `size` exceeds its
 * `node.size`, and false when no buffer pool is currently in use.
 */
int vnodeShouldCommit(SVnode *pVnode) {
  if (pVnode->inUse == NULL) {
    return false;
  }
  return pVnode->inUse->size > pVnode->inUse->node.size;
}
H
Hongze Cheng 已提交
80

H
Hongze Cheng 已提交
81 82 83
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
  char      fname[TSDB_FILENAME_LEN];
  TdFilePtr pFile;
H
Hongze Cheng 已提交
84
  char     *data;
H
Hongze Cheng 已提交
85 86 87 88 89 90

  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);

  // encode info
  data = NULL;

H
Hongze Cheng 已提交
91
  if (vnodeEncodeInfo(pInfo, &data) < 0) {
92
    vError("failed to encode json info.");
H
Hongze Cheng 已提交
93 94 95 96 97 98
    return -1;
  }

  // save info to a vnode_tmp.json
  pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
S
Shengliang Guan 已提交
99
    vError("failed to open info file:%s for write:%s", fname, terrstr());
H
Hongze Cheng 已提交
100 101 102 103
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

H
Hongze Cheng 已提交
104
  if (taosWriteFile(pFile, data, strlen(data)) < 0) {
105
    vError("failed to write info file:%s error:%s", fname, terrstr());
H
Hongze Cheng 已提交
106 107 108 109 110
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  if (taosFsyncFile(pFile) < 0) {
S
Shengliang Guan 已提交
111
    vError("failed to fsync info file:%s error:%s", fname, terrstr());
H
Hongze Cheng 已提交
112 113 114 115 116 117 118 119 120
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  taosCloseFile(&pFile);

  // free info binary
  taosMemoryFree(data);

S
Shengliang Guan 已提交
121
  vInfo("vgId:%d, vnode info is saved, fname:%s", pInfo->config.vgId, fname);
H
Hongze Cheng 已提交
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142

  return 0;

_err:
  taosCloseFile(&pFile);
  taosMemoryFree(data);
  return -1;
}

int vnodeCommitInfo(const char *dir, const SVnodeInfo *pInfo) {
  char fname[TSDB_FILENAME_LEN];
  char tfname[TSDB_FILENAME_LEN];

  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);
  snprintf(tfname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);

  if (taosRenameFile(tfname, fname) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

S
Shengliang Guan 已提交
143
  vInfo("vgId:%d, vnode info is committed", pInfo->config.vgId);
H
Hongze Cheng 已提交
144 145 146 147

  return 0;
}

H
Hongze Cheng 已提交
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
/*
 * Read "<dir>/vnode.json" and decode it into pInfo.
 *
 * The whole file is read into a NUL-terminated buffer. A short read is treated
 * as an error, so the decoder never sees uninitialized bytes.
 * Returns 0 on success, -1 on failure. Open/stat/alloc/read failures set
 * terrno here; decode failures leave whatever terrno the JSON helpers set.
 */
int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
  char      fname[TSDB_FILENAME_LEN];
  TdFilePtr pFile = NULL;
  char     *pData = NULL;
  int64_t   size = 0;
  int64_t   nRead = 0;
  int       ret = -1;

  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);

  // read info
  pFile = taosOpenFile(fname, TD_FILE_READ);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  if (taosFStatFile(pFile, &size, NULL) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _exit;
  }

  // +1 for the terminating NUL written after the read
  pData = taosMemoryMalloc(size + 1);
  if (pData == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  nRead = taosReadFile(pFile, pData, size);
  if (nRead < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _exit;
  }
  if (nRead != size) {
    // fewer bytes than fstat reported: the tail of pData would be uninitialized
    terrno = TAOS_SYSTEM_ERROR(EIO);
    goto _exit;
  }
  pData[size] = '\0';

  // decode info
  if (vnodeDecodeInfo((uint8_t *)pData, pInfo) < 0) {
    goto _exit;
  }

  ret = 0;

_exit:
  if (pFile != NULL) {
    taosCloseFile(&pFile);
  }
  taosMemoryFree(pData);
  return ret;
}

H
refact  
Hongze Cheng 已提交
199
/*
 * Schedule a background commit of pVnode.
 *
 * First waits on pVnode->canCommit, which vnodeCommitImpl() posts when it
 * finishes, so a new commit is scheduled only after the previous task released
 * it. Then hands vnodeCommitImpl() to the vnode task scheduler.
 * Returns 0 if the task was scheduled, -1 otherwise.
 */
int vnodeAsyncCommit(SVnode *pVnode) {
  vnodeWaitCommit(pVnode);

  // vnodeBufPoolSwitch(pVnode);
  // tsdbPrepareCommit(pVnode->pTsdb);

  if (vnodeScheduleTask(vnodeCommitImpl, pVnode) < 0) {
    // vnodeCommitImpl() will never run, so nobody would post canCommit;
    // give the token back or every later commit blocks forever
    vError("vgId:%d, failed to schedule commit task", TD_VID(pVnode));
    tsem_post(&(pVnode->canCommit));
    return -1;
  }

  return 0;
}
H
refact  
Hongze Cheng 已提交
209

H
Hongze Cheng 已提交
210 211 212
/*
 * Commit pVnode and block until the background commit task has finished.
 * Returns 0 on success, -1 if vnodeAsyncCommit() reports a failure (in that
 * case nothing is waited for).
 */
int vnodeSyncCommit(SVnode *pVnode) {
  if (vnodeAsyncCommit(pVnode) < 0) {
    return -1;
  }

  // wait until the commit task posts canCommit, then post it again so the
  // next commit is not blocked
  vnodeWaitCommit(pVnode);
  tsem_post(&(pVnode->canCommit));
  return 0;
}

H
Hongze Cheng 已提交
217
/*
 * Commit the vnode synchronously, in this order:
 *   1. stage an SVnodeInfo (config, applied version, apply term, commit ID)
 *      into the vnode directory with vnodeSaveInfo();
 *   2. begin a WAL snapshot at the applied version;
 *   3. sma pre-commit, then release the in-use buffer pool;
 *   4. commit meta, then either sma + the rsma0/1/2 tsdb instances (rsma
 *      vnodes) or the main tsdb, then tq;
 *   5. publish the info file with vnodeCommitInfo() and record the committed
 *      version in pVnode->state;
 *   6. sma post-commit, then end the WAL snapshot.
 *
 * Returns 0 on success and -1 as soon as a step fails; later steps are then
 * skipped. WAL snapshot begin/end failures are only logged and do not fail
 * the commit.
 */
int vnodeCommit(SVnode *pVnode) {
  SVnodeInfo info = {0};
  char       dir[TSDB_FILENAME_LEN];

  vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID,
        pVnode->state.applied);

  pVnode->state.commitTerm = pVnode->state.applyTerm;

  // save info
  info.config = pVnode->config;
  info.state.committed = pVnode->state.applied;
  info.state.commitTerm = pVnode->state.applyTerm;
  info.state.commitID = pVnode->state.commitID;
  if (pVnode->pTfs) {
    snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
  } else {
    snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
  }
  if (vnodeSaveInfo(dir, &info) < 0) {
    vError("vgId:%d, failed to save vnode info since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // best effort: a failure is reported but does not abort the commit
  if (walBeginSnapshot(pVnode->pWal, pVnode->state.applied) < 0) {
    vError("vgId:%d, failed to begin wal snapshot since %s", TD_VID(pVnode), tstrerror(terrno));
  }

  // preCommit
  // smaSyncPreCommit(pVnode->pSma);
  if (smaAsyncPreCommit(pVnode->pSma) < 0) {
    vError("vgId:%d, failed to async pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  vnodeBufPoolUnRef(pVnode->inUse);
  pVnode->inUse = NULL;

  // commit each sub-system
  if (metaCommit(pVnode->pMeta) < 0) {
    vError("vgId:%d, failed to commit meta since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  if (VND_IS_RSMA(pVnode)) {
    if (smaAsyncCommit(pVnode->pSma) < 0) {
      vError("vgId:%d, failed to async commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
      return -1;
    }

    if (tsdbCommit(VND_RSMA0(pVnode)) < 0) {
      vError("vgId:%d, failed to commit tsdb rsma0 since %s", TD_VID(pVnode), tstrerror(terrno));
      return -1;
    }
    if (tsdbCommit(VND_RSMA1(pVnode)) < 0) {
      vError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
      return -1;
    }
    if (tsdbCommit(VND_RSMA2(pVnode)) < 0) {
      vError("vgId:%d, failed to commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
      return -1;
    }
  } else {
    if (tsdbCommit(pVnode->pTsdb) < 0) {
      vError("vgId:%d, failed to commit tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
      return -1;
    }
  }

  if (tqCommit(pVnode->pTq) < 0) {
    vError("vgId:%d, failed to commit tq since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }
  // walCommit (TODO)

  // commit info
  if (vnodeCommitInfo(dir, &info) < 0) {
    vError("vgId:%d, failed to commit vnode info since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  pVnode->state.committed = info.state.committed;

  // postCommit
  // smaSyncPostCommit(pVnode->pSma);
  if (smaAsyncPostCommit(pVnode->pSma) < 0) {
    vError("vgId:%d, failed to async post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // apply the commit (TODO)
  // best effort: the data is already committed, so only report a failure here
  if (walEndSnapshot(pVnode->pWal) < 0) {
    vError("vgId:%d, failed to end wal snapshot since %s", TD_VID(pVnode), tstrerror(terrno));
  }

  vInfo("vgId:%d, commit end", TD_VID(pVnode));

  return 0;
}

static int vnodeCommitImpl(void *arg) {
H
Hongze Cheng 已提交
313
  SVnode *pVnode = (SVnode *)arg;
H
refact  
Hongze Cheng 已提交
314

H
Hongze Cheng 已提交
315
  // metaCommit(pVnode->pMeta);
H
more  
Hongze Cheng 已提交
316
  tqCommit(pVnode->pTq);
H
Hongze Cheng 已提交
317
  // tsdbCommit(pVnode->pTsdb, );
H
more  
Hongze Cheng 已提交
318

H
Hongze Cheng 已提交
319
  // vnodeBufPoolRecycle(pVnode);
H
more  
Hongze Cheng 已提交
320
  tsem_post(&(pVnode->canCommit));
H
refact  
Hongze Cheng 已提交
321 322 323 324 325 326 327 328 329 330 331
  return 0;
}

/* Not implemented yet (TODO); always returns 0. Not referenced elsewhere in this file. */
static int vnodeStartCommit(SVnode *pVnode) {
  (void)pVnode;
  return 0;
}

/* Not implemented yet (TODO); always returns 0. Not referenced elsewhere in this file. */
static int vnodeEndCommit(SVnode *pVnode) {
  (void)pVnode;
  return 0;
}

H
Hongze Cheng 已提交
334 335
/* Block until pVnode->canCommit is posted (vnodeCommitImpl() posts it when a commit task ends). */
static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) {
  tsem_wait(&(pVnode->canCommit));
}

H
Hongze Cheng 已提交
336 337 338 339
/*
 * tjsonAddObject() callback: write an SVState's committed version, commit ID
 * and commit term into pJson, in that order.
 * Returns 0 on success, -1 at the first field that cannot be added.
 */
static int vnodeEncodeState(const void *pObj, SJson *pJson) {
  const SVState *pState = pObj;

  if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0 ||
      tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID) < 0 ||
      tjsonAddIntegerToObject(pJson, "commit term", pState->commitTerm) < 0) {
    return -1;
  }

  return 0;
}

static int vnodeDecodeState(const SJson *pJson, void *pObj) {
  SVState *pState = (SVState *)pObj;

349 350
  int32_t code;
  tjsonGetNumberValue(pJson, "commit version", pState->committed, code);
H
Hongze Cheng 已提交
351
  if (code < 0) return -1;
H
Hongze Cheng 已提交
352 353
  tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code);
  if (code < 0) return -1;
H
Hongze Cheng 已提交
354 355
  tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code);
  if (code < 0) return -1;
H
Hongze Cheng 已提交
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386

  return 0;
}

/*
 * Serialize pInfo as a JSON object with "config" and "state" members.
 *
 * On success returns 0 and stores the string from tjsonToString() in *ppData;
 * the caller owns it (vnodeSaveInfo() releases it with taosMemoryFree()).
 * On failure returns -1 and leaves *ppData NULL.
 */
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
  char  *pText = NULL;
  SJson *pRoot = NULL;

  *ppData = NULL;

  pRoot = tjsonCreateObject();
  if (pRoot == NULL) return -1;

  if (tjsonAddObject(pRoot, "config", vnodeEncodeConfig, (void *)&pInfo->config) >= 0 &&
      tjsonAddObject(pRoot, "state", vnodeEncodeState, (void *)&pInfo->state) >= 0) {
    pText = tjsonToString(pRoot);
  }

  tjsonDelete(pRoot);

  if (pText == NULL) return -1;

  *ppData = pText;
  return 0;
}

H
Hongze Cheng 已提交
394
/*
 * Parse the JSON text in pData and fill pInfo's "config" and "state" parts.
 * pData is expected to be NUL-terminated (vnodeLoadInfo() terminates it).
 * Returns 0 on success, -1 on a parse or decode failure.
 */
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
  int    ret = -1;
  SJson *pRoot = tjsonParse((const char *)pData);

  if (pRoot == NULL) return -1;

  if (tjsonToObject(pRoot, "config", vnodeDecodeConfig, (void *)&pInfo->config) >= 0 &&
      tjsonToObject(pRoot, "state", vnodeDecodeState, (void *)&pInfo->state) >= 0) {
    ret = 0;
  }

  tjsonDelete(pRoot);
  return ret;
}