tsdbFS.c 32.3 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// =================================================================================================
H
Hongze Cheng 已提交
19
/*
 * Serialize the file system state pFS into p.
 *
 * Layout: [version:i8 = 0][hasDel:i8][SDelFile if hasDel][nSet:u32v][SDFileSet x nSet]
 *
 * A NULL p is used for a sizing pass: the same sequence of tPut* calls is made with a
 * NULL destination, and the total length is returned so a buffer can be allocated.
 *
 * Returns the number of bytes encoded.
 */
static int32_t tsdbEncodeFS(uint8_t *p, STsdbFS *pFS) {
  int32_t  len = 0;
  int8_t   delFlag = (pFS->pDelFile != NULL) ? 1 : 0;
  uint32_t setCnt = taosArrayGetSize(pFS->aDFileSet);

  // format version (always 0)
  len += tPutI8((p == NULL) ? NULL : p + len, 0);

  // optional del file: presence flag followed by its payload
  len += tPutI8((p == NULL) ? NULL : p + len, delFlag);
  if (delFlag != 0) {
    len += tPutDelFile((p == NULL) ? NULL : p + len, pFS->pDelFile);
  }

  // data file sets: count, then every set in array order
  len += tPutU32v((p == NULL) ? NULL : p + len, setCnt);
  for (uint32_t i = 0; i < setCnt; ++i) {
    SDFileSet *pSet = (SDFileSet *)taosArrayGet(pFS->aDFileSet, i);
    len += tPutDFileSet((p == NULL) ? NULL : p + len, pSet);
  }

  return len;
}

H
Hongze Cheng 已提交
42
/*
 * Write the encoded state of pFS, followed by a checksum, to the file fname.
 *
 * The file is created or truncated, written, and fsync'ed. Both the encode buffer and
 * the file handle are released on every path, including failures.
 *
 * Returns 0 on success; otherwise TSDB_CODE_OUT_OF_MEMORY or a system error code (logged).
 */
static int32_t tsdbGnrtCurrent(STsdb *pTsdb, STsdbFS *pFS, char *fname) {
  int32_t   code = 0;
  int64_t   n;
  int64_t   size;
  uint8_t  *pData = NULL;
  TdFilePtr pFD = NULL;

  // to binary: first pass (NULL buffer) sizes the image, second pass fills it
  size = tsdbEncodeFS(NULL, pFS) + sizeof(TSCKSUM);
  pData = taosMemoryMalloc(size);
  if (pData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
  n = tsdbEncodeFS(pData, pFS);
  ASSERT(n + sizeof(TSCKSUM) == size);
  taosCalcChecksumAppend(0, pData, size);

  // create and write
  pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (pFD == NULL) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  n = taosWriteFile(pFD, pData, size);
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // flush to stable storage before reporting success
  if (taosFsyncFile(pFD) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  taosCloseFile(&pFD);
  taosMemoryFree(pData);
  return code;

_err:
  tsdbError("vgId:%d, tsdb gnrt current failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  // the write/fsync failure paths reach here with the file still open
  if (pFD) taosCloseFile(&pFD);
  if (pData) taosMemoryFree(pData);
  return code;
}

H
Hongze Cheng 已提交
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
// static int32_t tsdbApplyDFileSetChange(STsdbFS *pFS, SDFileSet *pFrom, SDFileSet *pTo) {
//   int32_t code = 0;
//   char    fname[TSDB_FILENAME_LEN];

//   if (pFrom && pTo) {
//     bool isSameDisk = (pFrom->diskId.level == pTo->diskId.level) && (pFrom->diskId.id == pTo->diskId.id);

//     // head
//     if (isSameDisk && pFrom->pHeadF->commitID == pTo->pHeadF->commitID) {
//       ASSERT(pFrom->pHeadF->size == pTo->pHeadF->size);
//       ASSERT(pFrom->pHeadF->offset == pTo->pHeadF->offset);
//     } else {
//       tsdbHeadFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pHeadF, fname);
//       taosRemoveFile(fname);
//     }

//     // data
//     if (isSameDisk && pFrom->pDataF->commitID == pTo->pDataF->commitID) {
//       if (pFrom->pDataF->size > pTo->pDataF->size) {
//         code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_DATA_FILE);
//         if (code) goto _err;
//       }
//     } else {
//       tsdbDataFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pDataF, fname);
//       taosRemoveFile(fname);
//     }

H
Hongze Cheng 已提交
116
//     // stt
H
Hongze Cheng 已提交
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
//     if (isSameDisk && pFrom->pLastF->commitID == pTo->pLastF->commitID) {
//       if (pFrom->pLastF->size > pTo->pLastF->size) {
//         code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_LAST_FILE);
//         if (code) goto _err;
//       }
//     } else {
//       tsdbLastFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pLastF, fname);
//       taosRemoveFile(fname);
//     }

//     // sma
//     if (isSameDisk && pFrom->pSmaF->commitID == pTo->pSmaF->commitID) {
//       if (pFrom->pSmaF->size > pTo->pSmaF->size) {
//         code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_SMA_FILE);
//         if (code) goto _err;
//       }
//     } else {
//       tsdbSmaFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pSmaF, fname);
//       taosRemoveFile(fname);
//     }
//   } else if (pFrom) {
//     // head
//     tsdbHeadFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pHeadF, fname);
//     taosRemoveFile(fname);

//     // data
//     tsdbDataFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pDataF, fname);
//     taosRemoveFile(fname);

H
Hongze Cheng 已提交
146
//     // stt
H
Hongze Cheng 已提交
147 148 149 150 151 152 153 154 155 156 157
//     tsdbLastFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pLastF, fname);
//     taosRemoveFile(fname);

//     // fsm
//     tsdbSmaFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pSmaF, fname);
//     taosRemoveFile(fname);
//   }

//   return code;

// _err:
S
Shengliang Guan 已提交
158
//   tsdbError("vgId:%d, tsdb apply disk file set change failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
//   return code;
// }

// static int32_t tsdbApplyDelFileChange(STsdbFS *pFS, SDelFile *pFrom, SDelFile *pTo) {
//   int32_t code = 0;
//   char    fname[TSDB_FILENAME_LEN];

//   if (pFrom && pTo) {
//     if (!tsdbDelFileIsSame(pFrom, pTo)) {
//       tsdbDelFileName(pFS->pTsdb, pFrom, fname);
//       if (taosRemoveFile(fname) < 0) {
//         code = TAOS_SYSTEM_ERROR(errno);
//         goto _err;
//       }
//     }
//   } else if (pFrom) {
//     tsdbDelFileName(pFS->pTsdb, pFrom, fname);
//     if (taosRemoveFile(fname) < 0) {
//       code = TAOS_SYSTEM_ERROR(errno);
//       goto _err;
//     }
//   } else {
//     // do nothing
//   }

//   return code;

// _err:
S
Shengliang Guan 已提交
187
//   tsdbError("vgId:%d, tsdb apply del file change failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
//   return code;
// }

// static int32_t tsdbFSApplyDiskChange(STsdbFS *pFS, STsdbFSState *pFrom, STsdbFSState *pTo) {
//   int32_t    code = 0;
//   int32_t    iFrom = 0;
//   int32_t    nFrom = taosArrayGetSize(pFrom->aDFileSet);
//   int32_t    iTo = 0;
//   int32_t    nTo = taosArrayGetSize(pTo->aDFileSet);
//   SDFileSet *pDFileSetFrom;
//   SDFileSet *pDFileSetTo;

//   // SDelFile
//   code = tsdbApplyDelFileChange(pFS, pFrom->pDelFile, pTo->pDelFile);
//   if (code) goto _err;

//   // SDFileSet
//   while (iFrom < nFrom && iTo < nTo) {
//     pDFileSetFrom = (SDFileSet *)taosArrayGet(pFrom->aDFileSet, iFrom);
//     pDFileSetTo = (SDFileSet *)taosArrayGet(pTo->aDFileSet, iTo);

//     if (pDFileSetFrom->fid == pDFileSetTo->fid) {
//       code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, pDFileSetTo);
//       if (code) goto _err;

//       iFrom++;
//       iTo++;
//     } else if (pDFileSetFrom->fid < pDFileSetTo->fid) {
//       code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, NULL);
//       if (code) goto _err;

//       iFrom++;
//     } else {
//       iTo++;
//     }
//   }

//   while (iFrom < nFrom) {
//     pDFileSetFrom = (SDFileSet *)taosArrayGet(pFrom->aDFileSet, iFrom);
//     code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, NULL);
//     if (code) goto _err;

//     iFrom++;
//   }

// #if 0
//   // do noting
//   while (iTo < nTo) {
//     pDFileSetTo = (SDFileSet *)taosArrayGetP(pTo->aDFileSet, iTo);
//     code = tsdbApplyDFileSetChange(pFS, NULL, pDFileSetTo);
//     if (code) goto _err;

//     iTo++;
//   }
// #endif

//   return code;

// _err:
S
Shengliang Guan 已提交
247
//   tsdbError("vgId:%d, tsdb fs apply disk change failed sicne %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
248 249 250 251 252
//   return code;
// }

/*
 * Release everything owned by pFS:
 * - the del file;
 * - each data file set's head/data/sma/stt file objects;
 * - the set array itself.
 *
 * The STsdbFS struct itself is not freed. Its pointer fields are left dangling.
 */
void tsdbFSDestroy(STsdbFS *pFS) {
  if (pFS->pDelFile != NULL) {
    taosMemoryFree(pFS->pDelFile);
  }

  int32_t nSet = taosArrayGetSize(pFS->aDFileSet);
  for (int32_t i = 0; i < nSet; i++) {
    SDFileSet *pSet = (SDFileSet *)taosArrayGet(pFS->aDFileSet, i);

    taosMemoryFree(pSet->pHeadF);
    taosMemoryFree(pSet->pDataF);
    taosMemoryFree(pSet->pSmaF);

    // stt files live in a per-set array of nSttF pointers
    for (int32_t j = 0; j < pSet->nSttF; j++) {
      taosMemoryFree(pSet->aSttF[j]);
    }
  }

  taosArrayDestroy(pFS->aDFileSet);
}

/*
 * Check every file referenced by pTsdb->fs against its recorded logical size.
 * The logical size is converted with tsdbLogicToFileSize using the vnode's tsdbPageSize.
 *   - del, head and stt files must match exactly;
 *   - data and sma files must not be shorter; a longer file is handed to
 *     tsdbDFileRollback.
 *
 * Returns 0 on success. Returns TSDB_CODE_FILE_CORRUPTED on a mismatch that cannot be
 * fixed, or a system error code when a file cannot be stat'ed. Failures are logged.
 */
static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
  SVnode *pVnode = pTsdb->pVnode;
  int32_t code = 0;
  int64_t fileSize;
  char    fname[TSDB_FILENAME_LEN];

  // del file: exact size
  SDelFile *pDelFile = pTsdb->fs.pDelFile;
  if (pDelFile != NULL) {
    tsdbDelFileName(pTsdb, pDelFile, fname);
    if (taosStatFile(fname, &fileSize, NULL) != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }
    if (fileSize != tsdbLogicToFileSize(pDelFile->size, pVnode->config.tsdbPageSize)) {
      code = TSDB_CODE_FILE_CORRUPTED;
      goto _err;
    }
  }

  // data file sets
  for (int32_t i = 0; i < taosArrayGetSize(pTsdb->fs.aDFileSet); i++) {
    SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, i);

    // head: exact size
    tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
    if (taosStatFile(fname, &fileSize, NULL) != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }
    if (fileSize != tsdbLogicToFileSize(pSet->pHeadF->size, pVnode->config.tsdbPageSize)) {
      code = TSDB_CODE_FILE_CORRUPTED;
      goto _err;
    }

    // data: must not be shorter than recorded; extra bytes are rolled back
    tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
    if (taosStatFile(fname, &fileSize, NULL) != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }
    if (fileSize < tsdbLogicToFileSize(pSet->pDataF->size, pVnode->config.tsdbPageSize)) {
      code = TSDB_CODE_FILE_CORRUPTED;
      goto _err;
    }
    if (fileSize > tsdbLogicToFileSize(pSet->pDataF->size, pVnode->config.tsdbPageSize)) {
      code = tsdbDFileRollback(pTsdb, pSet, TSDB_DATA_FILE);
      if (code) goto _err;
    }

    // sma: same rule as data
    tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
    if (taosStatFile(fname, &fileSize, NULL) != 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }
    if (fileSize < tsdbLogicToFileSize(pSet->pSmaF->size, pVnode->config.tsdbPageSize)) {
      code = TSDB_CODE_FILE_CORRUPTED;
      goto _err;
    }
    if (fileSize > tsdbLogicToFileSize(pSet->pSmaF->size, pVnode->config.tsdbPageSize)) {
      code = tsdbDFileRollback(pTsdb, pSet, TSDB_SMA_FILE);
      if (code) goto _err;
    }

    // stt: exact size for each one
    for (int32_t j = 0; j < pSet->nSttF; j++) {
      SSttFile *pSttF = pSet->aSttF[j];
      tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSttF, fname);
      if (taosStatFile(fname, &fileSize, NULL) != 0) {
        code = TAOS_SYSTEM_ERROR(errno);
        goto _err;
      }
      if (fileSize != tsdbLogicToFileSize(pSttF->size, pVnode->config.tsdbPageSize)) {
        code = TSDB_CODE_FILE_CORRUPTED;
        goto _err;
      }
    }
  }

  // TODO: remove those invalid files
  return code;

_err:
  tsdbError("vgId:%d, tsdb scan and try fix fs failed since %s", TD_VID(pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
356 357 358 359 360 361 362 363 364
int32_t tDFileSetCmprFn(const void *p1, const void *p2) {
  if (((SDFileSet *)p1)->fid < ((SDFileSet *)p2)->fid) {
    return -1;
  } else if (((SDFileSet *)p1)->fid > ((SDFileSet *)p2)->fid) {
    return 1;
  }

  return 0;
}
H
Hongze Cheng 已提交
365

H
Hongze Cheng 已提交
366 367 368 369
/*
 * Decode a CURRENT image into pTsdb->fs. pData holds nData bytes, including the
 * trailing checksum.
 *
 * Expected layout: [version:i8][hasDel:i8][SDelFile if hasDel][nSet:u32v][SDFileSet x nSet].
 * The version byte is read but not checked. A decoded del file gets nRef = 1.
 * aDFileSet is cleared before the sets are appended.
 *
 * Returns 0 or TSDB_CODE_OUT_OF_MEMORY (logged). On failure, objects already attached to
 * pTsdb->fs remain there for the caller to release. A set that could not be pushed is
 * freed here.
 */
static int32_t tsdbRecoverFS(STsdb *pTsdb, uint8_t *pData, int64_t nData) {
  int32_t  code = 0;
  int8_t   hasDel;
  uint32_t nSet;
  int32_t  n = 0;

  // version
  n += tGetI8(pData + n, NULL);

  // SDelFile
  n += tGetI8(pData + n, &hasDel);
  if (hasDel) {
    pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
    if (pTsdb->fs.pDelFile == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    pTsdb->fs.pDelFile->nRef = 1;
    n += tGetDelFile(pData + n, pTsdb->fs.pDelFile);
  } else {
    pTsdb->fs.pDelFile = NULL;
  }

  // SArray<SDFileSet>
  taosArrayClear(pTsdb->fs.aDFileSet);
  n += tGetU32v(pData + n, &nSet);
  for (uint32_t iSet = 0; iSet < nSet; iSet++) {
    SDFileSet fSet = {0};

    int32_t nt = tGetDFileSet(pData + n, &fSet);
    if (nt < 0) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    n += nt;

    if (taosArrayPush(pTsdb->fs.aDFileSet, &fSet) == NULL) {
      // the array did not take ownership of fSet: release the file objects decoded into it
      taosMemoryFree(fSet.pHeadF);
      taosMemoryFree(fSet.pDataF);
      taosMemoryFree(fSet.pSmaF);
      for (int32_t iStt = 0; iStt < fSet.nSttF; iStt++) {
        taosMemoryFree(fSet.aSttF[iStt]);
      }
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  ASSERT(n + sizeof(TSCKSUM) == nData);
  return code;

_err:
  tsdbError("vgId:%d, tsdb recover fs failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
417 418
// EXPOSED APIS ====================================================================================
/*
 * Open the tsdb file system state into pTsdb->fs.
 *
 * CURRENT is located under the tfs primary path when the vnode has a tfs, and under
 * pTsdb->path otherwise.
 *   - If CURRENT does not exist, the empty state is written to it.
 *   - Otherwise it is read fully, its checksum is verified, and it is decoded.
 * Finally the referenced files are checked with tsdbScanAndTryFixFS.
 *
 * On failure the error is logged and everything loaded into pTsdb->fs is released.
 * pTsdb->fs.pDelFile and pTsdb->fs.aDFileSet are then left NULL, so a later
 * tsdbFSClose is harmless.
 */
int32_t tsdbFSOpen(STsdb *pTsdb) {
  int32_t   code = 0;
  SVnode   *pVnode = pTsdb->pVnode;
  TdFilePtr pFD = NULL;
  uint8_t  *pData = NULL;
  char      fname[TSDB_FILENAME_LEN];

  // open handle
  pTsdb->fs.pDelFile = NULL;
  pTsdb->fs.aDFileSet = taosArrayInit(0, sizeof(SDFileSet));
  if (pTsdb->fs.aDFileSet == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // load fs or keep empty
  if (pVnode->pTfs) {
    snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP,
             pTsdb->path, TD_DIRSEP);
  } else {
    snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP);
  }

  if (!taosCheckExistFile(fname)) {
    // empty one
    code = tsdbGnrtCurrent(pTsdb, &pTsdb->fs, fname);
    if (code) goto _err;
  } else {
    // read
    pFD = taosOpenFile(fname, TD_FILE_READ);
    if (pFD == NULL) {
      code = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }

    int64_t size;
    if (taosFStatFile(pFD, &size, NULL) < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }

    // a valid image is never smaller than its trailing checksum
    if (size < (int64_t)sizeof(TSCKSUM)) {
      code = TSDB_CODE_FILE_CORRUPTED;
      goto _err;
    }

    pData = taosMemoryMalloc(size);
    if (pData == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    int64_t n = taosReadFile(pFD, pData, size);
    if (n < 0) {
      code = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }
    if (n != size) {
      // short read: the buffer would be partly uninitialized
      code = TSDB_CODE_FILE_CORRUPTED;
      goto _err;
    }

    if (!taosCheckChecksumWhole(pData, size)) {
      code = TSDB_CODE_FILE_CORRUPTED;
      goto _err;
    }

    taosCloseFile(&pFD);

    // recover fs
    code = tsdbRecoverFS(pTsdb, pData, size);
    if (code) goto _err;

    taosMemoryFree(pData);
    pData = NULL;
  }

  // scan and fix FS
  code = tsdbScanAndTryFixFS(pTsdb);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d, tsdb fs open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  if (pFD) taosCloseFile(&pFD);
  if (pData) taosMemoryFree(pData);
  // drop whatever was loaded so nothing leaks and no dangling pointers remain in pTsdb->fs
  if (pTsdb->fs.aDFileSet) {
    tsdbFSDestroy(&pTsdb->fs);
  }
  pTsdb->fs.pDelFile = NULL;
  pTsdb->fs.aDFileSet = NULL;
  return code;
}

H
Hongze Cheng 已提交
504 505
/*
 * Free the in-memory file system held in pTsdb->fs.
 * Every file object is asserted to hold exactly one reference when this is called.
 * Always returns 0.
 */
int32_t tsdbFSClose(STsdb *pTsdb) {
  STsdbFS *pFS = &pTsdb->fs;

  if (pFS->pDelFile != NULL) {
    ASSERT(pFS->pDelFile->nRef == 1);
    taosMemoryFree(pFS->pDelFile);
  }

  for (int32_t i = 0; i < taosArrayGetSize(pFS->aDFileSet); i++) {
    SDFileSet *pSet = (SDFileSet *)taosArrayGet(pFS->aDFileSet, i);

    ASSERT(pSet->pHeadF->nRef == 1);
    taosMemoryFree(pSet->pHeadF);

    ASSERT(pSet->pDataF->nRef == 1);
    taosMemoryFree(pSet->pDataF);

    ASSERT(pSet->pSmaF->nRef == 1);
    taosMemoryFree(pSet->pSmaF);

    for (int32_t j = 0; j < pSet->nSttF; j++) {
      SSttFile *pSttF = pSet->aSttF[j];
      ASSERT(pSttF->nRef == 1);
      taosMemoryFree(pSttF);
    }
  }

  taosArrayDestroy(pFS->aDFileSet);
  return 0;
}

/*
 * Deep-copy pTsdb->fs into pFS.
 *
 * The del file and each data file set's head/data/sma/stt file objects are duplicated
 * into fresh allocations. Field values, including nRef, are copied as-is. The previous
 * content of pFS is overwritten without being freed.
 *
 * Returns 0 or TSDB_CODE_OUT_OF_MEMORY. On failure:
 *   - objects already attached to pFS (the array, the del file, fully copied sets) stay
 *     there for the caller to release;
 *   - the copies of a partially built set are freed here.
 */
int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
  int32_t code = 0;

  pFS->pDelFile = NULL;
  pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet));
  if (pFS->aDFileSet == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  if (pTsdb->fs.pDelFile) {
    pFS->pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
    if (pFS->pDelFile == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }

    *pFS->pDelFile = *pTsdb->fs.pDelFile;
  }

  for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
    SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
    // designated init leaves every file pointer NULL, so the cleanup below is always safe
    SDFileSet  fSet = {.diskId = pSet->diskId, .fid = pSet->fid};

    // head
    fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
    if (fSet.pHeadF == NULL) goto _set_err;
    *fSet.pHeadF = *pSet->pHeadF;

    // data
    fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
    if (fSet.pDataF == NULL) goto _set_err;
    *fSet.pDataF = *pSet->pDataF;

    // sma
    fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
    if (fSet.pSmaF == NULL) goto _set_err;
    *fSet.pSmaF = *pSet->pSmaF;

    // stt: fSet.nSttF counts the entries allocated so far, which keeps the cleanup exact
    for (fSet.nSttF = 0; fSet.nSttF < pSet->nSttF; fSet.nSttF++) {
      fSet.aSttF[fSet.nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
      if (fSet.aSttF[fSet.nSttF] == NULL) goto _set_err;
      *fSet.aSttF[fSet.nSttF] = *pSet->aSttF[fSet.nSttF];
    }

    if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) goto _set_err;
    continue;

  _set_err:
    // fSet was not handed to pFS->aDFileSet, so its copies would otherwise leak
    code = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(fSet.pHeadF);
    taosMemoryFree(fSet.pDataF);
    taosMemoryFree(fSet.pSmaF);
    for (int32_t iStt = 0; iStt < fSet.nSttF; iStt++) {
      taosMemoryFree(fSet.aSttF[iStt]);
    }
    goto _exit;
  }

_exit:
  return code;
}

/*
 * Roll back pFS. This is not implemented: every call trips ASSERT(0).
 * It returns 0 if the assertion does not abort.
 */
int32_t tsdbFSRollback(STsdbFS *pFS) {
  ASSERT(0);
  return 0;
}

H
Hongze Cheng 已提交
615 616
/*
 * Make pFS's del file a copy of *pDelFile. The slot is allocated on first use.
 * Returns 0, or TSDB_CODE_OUT_OF_MEMORY, in which case pFS is left unchanged.
 */
int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile) {
  if (pFS->pDelFile == NULL) {
    SDelFile *pSlot = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
    if (pSlot == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pFS->pDelFile = pSlot;
  }

  *pFS->pDelFile = *pDelFile;
  return 0;
}

H
Hongze Cheng 已提交
631 632 633 634 635 636 637 638 639 640 641 642 643
/*
 * Insert or update the data file set with pSet->fid in pFS->aDFileSet, which is ordered
 * by fid through tDFileSetCmprFn.
 *
 * Existing set with the same fid: head/data/sma are overwritten from pSet. Stt files:
 *   - pSet has one more: a copy of pSet's last stt file is appended;
 *   - pSet has fewer (asserted to be exactly 1): the extra stt objects are freed and
 *     slot 0 is overwritten;
 *   - same count: each stt object is overwritten.
 * New set (pSet must carry exactly one stt file): fresh copies of all file objects are
 *   inserted at the search position.
 *
 * Returns 0 or TSDB_CODE_OUT_OF_MEMORY. If building a new set fails, its partial copies
 * are freed and pFS->aDFileSet is not modified.
 */
int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
  int32_t code = 0;
  int32_t idx = taosArraySearchIdx(pFS->aDFileSet, pSet, tDFileSetCmprFn, TD_GE);

  if (idx < 0) {
    // no element compares >= pSet: append at the end
    idx = taosArrayGetSize(pFS->aDFileSet);
  } else {
    SDFileSet *pDFileSet = (SDFileSet *)taosArrayGet(pFS->aDFileSet, idx);
    int32_t    c = tDFileSetCmprFn(pSet, pDFileSet);
    if (c == 0) {
      // same fid: update the existing set in place
      *pDFileSet->pHeadF = *pSet->pHeadF;
      *pDFileSet->pDataF = *pSet->pDataF;
      *pDFileSet->pSmaF = *pSet->pSmaF;

      // stt
      if (pSet->nSttF > pDFileSet->nSttF) {
        ASSERT(pSet->nSttF == pDFileSet->nSttF + 1);

        pDFileSet->aSttF[pDFileSet->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
        if (pDFileSet->aSttF[pDFileSet->nSttF] == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _exit;
        }
        *pDFileSet->aSttF[pDFileSet->nSttF] = *pSet->aSttF[pSet->nSttF - 1];
        pDFileSet->nSttF++;
      } else if (pSet->nSttF < pDFileSet->nSttF) {
        ASSERT(pSet->nSttF == 1);
        for (int32_t iStt = 1; iStt < pDFileSet->nSttF; iStt++) {
          taosMemoryFree(pDFileSet->aSttF[iStt]);
          pDFileSet->aSttF[iStt] = NULL;  // do not leave dangling pointers in unused slots
        }

        *pDFileSet->aSttF[0] = *pSet->aSttF[0];
        pDFileSet->nSttF = 1;
      } else {
        for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
          *pDFileSet->aSttF[iStt] = *pSet->aSttF[iStt];
        }
      }

      goto _exit;
    }
  }

  ASSERT(pSet->nSttF == 1);
  // designated init leaves every file pointer NULL, so the cleanup below is always safe
  SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nSttF = 1};

  // head
  fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
  if (fSet.pHeadF == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _new_err;
  }
  *fSet.pHeadF = *pSet->pHeadF;

  // data
  fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
  if (fSet.pDataF == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _new_err;
  }
  *fSet.pDataF = *pSet->pDataF;

  // sma
  fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
  if (fSet.pSmaF == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _new_err;
  }
  *fSet.pSmaF = *pSet->pSmaF;

  // stt
  fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
  if (fSet.aSttF[0] == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _new_err;
  }
  *fSet.aSttF[0] = *pSet->aSttF[0];

  if (taosArrayInsert(pFS->aDFileSet, idx, &fSet) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _new_err;
  }

  return code;

_new_err:
  // fSet was never inserted, so release the copies made for it
  taosMemoryFree(fSet.pHeadF);
  taosMemoryFree(fSet.pDataF);
  taosMemoryFree(fSet.pSmaF);
  taosMemoryFree(fSet.aSttF[0]);

_exit:
  return code;
}
716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821

/*
 * Commit phase 1: persist pFSNew as the new CURRENT file.
 *
 * The encoded state is first written to CURRENT.t by tsdbGnrtCurrent, which writes and
 * fsyncs it. CURRENT.t is then renamed over CURRENT.
 *
 * Paths are built the same way tsdbFSOpen builds them: under the tfs primary path when
 * the vnode has a tfs, otherwise under pTsdb->path.
 *
 * Returns 0 or an error code (logged).
 */
int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFSNew) {
  int32_t code = 0;
  char    tfname[TSDB_FILENAME_LEN];
  char    fname[TSDB_FILENAME_LEN];

  if (pTsdb->pVnode->pTfs) {
    snprintf(tfname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT.t", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
             pTsdb->path, TD_DIRSEP);
    snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP,
             pTsdb->path, TD_DIRSEP);
  } else {
    // no tfs: tfsGetPrimaryPath must not be called with a NULL handle
    snprintf(tfname, TSDB_FILENAME_LEN - 1, "%s%sCURRENT.t", pTsdb->path, TD_DIRSEP);
    snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP);
  }

  // gnrt CURRENT.t
  code = tsdbGnrtCurrent(pTsdb, pFSNew, tfname);
  if (code) goto _err;

  // rename: a nonzero return only signals failure (e.g. -1). The cause is taken from
  // errno, assuming taosRenameFile follows rename(2) semantics.
  if (taosRenameFile(tfname, fname) != 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d, tsdb fs commit phase 1 failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
  int32_t code = 0;
  int32_t nRef;
  char    fname[TSDB_FILENAME_LEN];

  // del
  if (pFSNew->pDelFile) {
    SDelFile *pDelFile = pTsdb->fs.pDelFile;

    if (pDelFile == NULL || (pDelFile->commitID != pFSNew->pDelFile->commitID)) {
      pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
      if (pTsdb->fs.pDelFile == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }

      *pTsdb->fs.pDelFile = *pFSNew->pDelFile;
      pTsdb->fs.pDelFile->nRef = 1;

      if (pDelFile) {
        nRef = atomic_sub_fetch_32(&pDelFile->nRef, 1);
        if (nRef == 0) {
          tsdbDelFileName(pTsdb, pDelFile, fname);
          taosRemoveFile(fname);
          taosMemoryFree(pDelFile);
        }
      }
    }
  } else {
    ASSERT(pTsdb->fs.pDelFile == NULL);
  }

  // data
  int32_t iOld = 0;
  int32_t iNew = 0;
  while (true) {
    int32_t   nOld = taosArrayGetSize(pTsdb->fs.aDFileSet);
    int32_t   nNew = taosArrayGetSize(pFSNew->aDFileSet);
    SDFileSet fSet;
    int8_t    sameDisk;

    if (iOld >= nOld && iNew >= nNew) break;

    SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pTsdb->fs.aDFileSet, iOld) : NULL;
    SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFSNew->aDFileSet, iNew) : NULL;

    if (pSetOld && pSetNew) {
      if (pSetOld->fid == pSetNew->fid) {
        goto _merge_old_and_new;
      } else if (pSetOld->fid < pSetNew->fid) {
        goto _remove_old;
      } else {
        goto _add_new;
      }
    } else if (pSetOld) {
      goto _remove_old;
    } else {
      goto _add_new;
    }

  _merge_old_and_new:
    sameDisk = ((pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id));

    // head
    fSet.pHeadF = pSetOld->pHeadF;
    if ((!sameDisk) || (pSetOld->pHeadF->commitID != pSetNew->pHeadF->commitID)) {
      pSetOld->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
      if (pSetOld->pHeadF == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
      *pSetOld->pHeadF = *pSetNew->pHeadF;
      pSetOld->pHeadF->nRef = 1;

      nRef = atomic_sub_fetch_32(&fSet.pHeadF->nRef, 1);
      if (nRef == 0) {
        tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pHeadF, fname);
H
Hongze Cheng 已提交
822
        (void)taosRemoveFile(fname);
823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865
        taosMemoryFree(fSet.pHeadF);
      }
    } else {
      ASSERT(fSet.pHeadF->size == pSetNew->pHeadF->size);
      ASSERT(fSet.pHeadF->offset == pSetNew->pHeadF->offset);
    }

    // data
    fSet.pDataF = pSetOld->pDataF;
    if ((!sameDisk) || (pSetOld->pDataF->commitID != pSetNew->pDataF->commitID)) {
      pSetOld->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
      if (pSetOld->pDataF == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
      *pSetOld->pDataF = *pSetNew->pDataF;
      pSetOld->pDataF->nRef = 1;

      nRef = atomic_sub_fetch_32(&fSet.pDataF->nRef, 1);
      if (nRef == 0) {
        tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pDataF, fname);
        taosRemoveFile(fname);
        taosMemoryFree(fSet.pDataF);
      }
    } else {
      ASSERT(pSetOld->pDataF->size <= pSetNew->pDataF->size);
      pSetOld->pDataF->size = pSetNew->pDataF->size;
    }

    // sma
    fSet.pSmaF = pSetOld->pSmaF;
    if ((!sameDisk) || (pSetOld->pSmaF->commitID != pSetNew->pSmaF->commitID)) {
      pSetOld->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
      if (pSetOld->pSmaF == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
      *pSetOld->pSmaF = *pSetNew->pSmaF;
      pSetOld->pSmaF->nRef = 1;

      nRef = atomic_sub_fetch_32(&fSet.pSmaF->nRef, 1);
      if (nRef == 0) {
        tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pSmaF, fname);
H
Hongze Cheng 已提交
866
        (void)taosRemoveFile(fname);
867 868 869 870 871 872 873 874 875 876
        taosMemoryFree(fSet.pSmaF);
      }
    } else {
      ASSERT(pSetOld->pSmaF->size <= pSetNew->pSmaF->size);
      pSetOld->pSmaF->size = pSetNew->pSmaF->size;
    }

    // stt
    if (sameDisk) {
      if (pSetNew->nSttF > pSetOld->nSttF) {
H
Hongze Cheng 已提交
877
        ASSERT(pSetNew->nSttF == pSetOld->nSttF + 1);
878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925
        pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
        if (pSetOld->aSttF[pSetOld->nSttF] == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        *pSetOld->aSttF[pSetOld->nSttF] = *pSetNew->aSttF[pSetOld->nSttF];
        pSetOld->aSttF[pSetOld->nSttF]->nRef = 1;
        pSetOld->nSttF++;
      } else if (pSetNew->nSttF < pSetOld->nSttF) {
        ASSERT(pSetNew->nSttF == 1);
        for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
          SSttFile *pSttFile = pSetOld->aSttF[iStt];
          nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
          if (nRef == 0) {
            tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
            taosRemoveFile(fname);
            taosMemoryFree(pSttFile);
          }
          pSetOld->aSttF[iStt] = NULL;
        }

        pSetOld->nSttF = 1;
        pSetOld->aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
        if (pSetOld->aSttF[0] == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        *pSetOld->aSttF[0] = *pSetNew->aSttF[0];
        pSetOld->aSttF[0]->nRef = 1;
      } else {
        for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
          if (pSetOld->aSttF[iStt]->commitID != pSetNew->aSttF[iStt]->commitID) {
            SSttFile *pSttFile = pSetOld->aSttF[iStt];
            nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
            if (nRef == 0) {
              tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
              taosRemoveFile(fname);
              taosMemoryFree(pSttFile);
            }

            pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
            if (pSetOld->aSttF[iStt] == NULL) {
              code = TSDB_CODE_OUT_OF_MEMORY;
              goto _err;
            }
            *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
            pSetOld->aSttF[iStt]->nRef = 1;
          } else {
926 927
            ASSERT(pSetOld->aSttF[iStt]->size == pSetOld->aSttF[iStt]->size);
            ASSERT(pSetOld->aSttF[iStt]->offset == pSetOld->aSttF[iStt]->offset);
928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963
          }
        }
      }
    } else {
      ASSERT(pSetOld->nSttF == pSetNew->nSttF);
      for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
        SSttFile *pSttFile = pSetOld->aSttF[iStt];
        nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
        if (nRef == 0) {
          tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
          taosRemoveFile(fname);
          taosMemoryFree(pSttFile);
        }

        pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
        if (pSetOld->aSttF[iStt] == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
        pSetOld->aSttF[iStt]->nRef = 1;
      }
    }

    if (!sameDisk) {
      pSetOld->diskId = pSetNew->diskId;
    }

    iOld++;
    iNew++;
    continue;

  _remove_old:
    nRef = atomic_sub_fetch_32(&pSetOld->pHeadF->nRef, 1);
    if (nRef == 0) {
      tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pHeadF, fname);
H
Hongze Cheng 已提交
964
      (void)taosRemoveFile(fname);
965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049
      taosMemoryFree(pSetOld->pHeadF);
    }

    nRef = atomic_sub_fetch_32(&pSetOld->pDataF->nRef, 1);
    if (nRef == 0) {
      tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pDataF, fname);
      taosRemoveFile(fname);
      taosMemoryFree(pSetOld->pDataF);
    }

    nRef = atomic_sub_fetch_32(&pSetOld->pSmaF->nRef, 1);
    if (nRef == 0) {
      tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pSmaF, fname);
      taosRemoveFile(fname);
      taosMemoryFree(pSetOld->pSmaF);
    }

    for (int8_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
      nRef = atomic_sub_fetch_32(&pSetOld->aSttF[iStt]->nRef, 1);
      if (nRef == 0) {
        tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aSttF[iStt], fname);
        taosRemoveFile(fname);
        taosMemoryFree(pSetOld->aSttF[iStt]);
      }
    }

    taosArrayRemove(pTsdb->fs.aDFileSet, iOld);
    continue;

  _add_new:
    fSet = (SDFileSet){.diskId = pSetNew->diskId, .fid = pSetNew->fid, .nSttF = 1};

    // head
    fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
    if (fSet.pHeadF == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    *fSet.pHeadF = *pSetNew->pHeadF;
    fSet.pHeadF->nRef = 1;

    // data
    fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
    if (fSet.pDataF == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    *fSet.pDataF = *pSetNew->pDataF;
    fSet.pDataF->nRef = 1;

    // sma
    fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
    if (fSet.pSmaF == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    *fSet.pSmaF = *pSetNew->pSmaF;
    fSet.pSmaF->nRef = 1;

    // stt
    ASSERT(pSetNew->nSttF == 1);
    fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
    if (fSet.aSttF[0] == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    *fSet.aSttF[0] = *pSetNew->aSttF[0];
    fSet.aSttF[0]->nRef = 1;

    if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    iOld++;
    iNew++;
    continue;
  }

  return code;

_err:
  tsdbError("vgId:%d, tsdb fs commit phase 2 failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

C
Cary Xu 已提交
1050
/*
 * Take a reference-counted snapshot of pTsdb->fs into pFS.
 *
 * Each SDFileSet is copied by value into a freshly allocated pFS->aDFileSet.
 * The file descriptors it points to are shared with pTsdb->fs. nRef is
 * incremented on the del file and on every head/data/sma/stt file. Release
 * the snapshot with tsdbFSUnref().
 *
 * NOTE(review): the caller presumably holds whatever lock protects pTsdb->fs
 * while this runs; confirm at the call sites.
 *
 * @param pTsdb  owner of the live file system being snapshotted
 * @param pFS    output snapshot; its aDFileSet and pDelFile are overwritten
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY on allocation failure. On
 *         failure, any references already taken are released, and
 *         pFS->aDFileSet / pFS->pDelFile are left NULL.
 */
int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) {
  int32_t code = 0;
  int32_t nRef;
  int32_t nSet = (int32_t)taosArrayGetSize(pTsdb->fs.aDFileSet);

  pFS->aDFileSet = taosArrayInit(nSet, sizeof(SDFileSet));
  if (pFS->aDFileSet == NULL) {
    pFS->pDelFile = NULL;
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // del file
  pFS->pDelFile = pTsdb->fs.pDelFile;
  if (pFS->pDelFile) {
    nRef = atomic_fetch_add_32(&pFS->pDelFile->nRef, 1);
    ASSERT(nRef > 0);
  }

  for (int32_t iSet = 0; iSet < nSet; iSet++) {
    SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);

    // Push the shallow copy BEFORE taking its references. Every reference we
    // hold is then reachable through pFS->aDFileSet, so the rollback below
    // releases exactly what was acquired and nothing more.
    if (taosArrayPush(pFS->aDFileSet, pSet) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    // head
    nRef = atomic_fetch_add_32(&pSet->pHeadF->nRef, 1);
    ASSERT(nRef > 0);

    // data
    nRef = atomic_fetch_add_32(&pSet->pDataF->nRef, 1);
    ASSERT(nRef > 0);

    // sma
    nRef = atomic_fetch_add_32(&pSet->pSmaF->nRef, 1);
    ASSERT(nRef > 0);

    // stt
    for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
      nRef = atomic_fetch_add_32(&pSet->aSttF[iStt]->nRef, 1);
      ASSERT(nRef > 0);
    }
  }

_exit:
  return code;

_err:
  // The live pTsdb->fs still owns a reference to every file (asserted via
  // nRef > 0 above). The rollback should therefore only decrement counts
  // and never remove files from disk.
  tsdbFSUnref(pTsdb, pFS);
  pFS->aDFileSet = NULL;
  pFS->pDelFile = NULL;
  return code;
}

H
Hongze Cheng 已提交
1095 1096 1097
/*
 * Release a snapshot previously taken with tsdbFSRef().
 *
 * Decrements nRef on the del file and on every head/data/sma/stt file that
 * the snapshot references. When a count reaches zero, the file is removed
 * from disk and its descriptor is freed. The snapshot's SDFileSet array is
 * then destroyed.
 *
 * After return, pFS->pDelFile and pFS->aDFileSet are NULL, so the snapshot
 * no longer points at released or freed memory.
 *
 * @param pTsdb  owner used to build the on-disk file names
 * @param pFS    snapshot to release
 */
void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) {
  int32_t nRef;
  char    fname[TSDB_FILENAME_LEN];

  // del file
  if (pFS->pDelFile) {
    nRef = atomic_sub_fetch_32(&pFS->pDelFile->nRef, 1);
    ASSERT(nRef >= 0);
    if (nRef == 0) {
      tsdbDelFileName(pTsdb, pFS->pDelFile, fname);
      (void)taosRemoveFile(fname);
      taosMemoryFree(pFS->pDelFile);
    }
    // Our reference is gone either way; do not keep a possibly-freed pointer.
    pFS->pDelFile = NULL;
  }

  for (int32_t iSet = 0; iSet < taosArrayGetSize(pFS->aDFileSet); iSet++) {
    SDFileSet *pSet = (SDFileSet *)taosArrayGet(pFS->aDFileSet, iSet);

    // head
    nRef = atomic_sub_fetch_32(&pSet->pHeadF->nRef, 1);
    ASSERT(nRef >= 0);
    if (nRef == 0) {
      tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
      (void)taosRemoveFile(fname);
      taosMemoryFree(pSet->pHeadF);
    }

    // data
    nRef = atomic_sub_fetch_32(&pSet->pDataF->nRef, 1);
    ASSERT(nRef >= 0);
    if (nRef == 0) {
      tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
      (void)taosRemoveFile(fname);
      taosMemoryFree(pSet->pDataF);
    }

    // sma
    nRef = atomic_sub_fetch_32(&pSet->pSmaF->nRef, 1);
    ASSERT(nRef >= 0);
    if (nRef == 0) {
      tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
      (void)taosRemoveFile(fname);
      taosMemoryFree(pSet->pSmaF);
    }

    // stt
    for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
      nRef = atomic_sub_fetch_32(&pSet->aSttF[iStt]->nRef, 1);
      ASSERT(nRef >= 0);
      if (nRef == 0) {
        tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
        (void)taosRemoveFile(fname);
        taosMemoryFree(pSet->aSttF[iStt]);
      }
    }
  }

  taosArrayDestroy(pFS->aDFileSet);
  // The array storage is freed; clear the field so it cannot dangle.
  pFS->aDFileSet = NULL;
}