tsdbFS.c 28.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

C
Cary Xu 已提交
18 19
extern const char *TSDB_LEVEL_DNAME[];

H
Hongze Cheng 已提交
20 21
typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T;
static const char *tsdbTxnFname[] = {"current.t", "current"};
H
Hongze Cheng 已提交
22
#define TSDB_MAX_FSETS(keep, days) ((keep) / (days) + 3)
H
Hongze Cheng 已提交
23

H
Hongze Cheng 已提交
24 25
static int  tsdbComparFidFSet(const void *arg1, const void *arg2);
static void tsdbResetFSStatus(SFSStatus *pStatus);
S
Shengliang Guan 已提交
26
static int  tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus);
H
Hongze Cheng 已提交
27
static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo);
S
Shengliang Guan 已提交
28
static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]);
H
Hongze Cheng 已提交
29 30 31 32
static int  tsdbOpenFSFromCurrent(STsdb *pRepo);
static int  tsdbScanAndTryFixFS(STsdb *pRepo);
static int  tsdbScanRootDir(STsdb *pRepo);
static int  tsdbScanDataDir(STsdb *pRepo);
S
Shengliang Guan 已提交
33
static bool tsdbIsTFileInFS(STsdbFS *pfs, const STfsFile *pf);
H
Hongze Cheng 已提交
34
static int  tsdbRestoreCurrent(STsdb *pRepo);
H
Hongze Cheng 已提交
35
static int  tsdbComparTFILE(const void *arg1, const void *arg2);
H
Hongze Cheng 已提交
36
static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired);
H
Hongze Cheng 已提交
37 38 39
// static int  tsdbProcessExpiredFS(STsdb *pRepo);
// static int  tsdbCreateMeta(STsdb *pRepo);

40 41
// Format the repository root directory, relative to a storage mount point,
// into dirName. At most TSDB_FILENAME_LEN bytes (terminator included) are
// written, so dirName must be at least that large.
static void tsdbGetRootDir(int repoid, const char *dir, char dirName[]) {
  (void)snprintf(dirName, TSDB_FILENAME_LEN,
                 "vnode/vnode%d/%s",
                 repoid, dir);
}

44 45
// Format the repository "data" sub-directory, relative to a storage mount
// point, into dirName. At most TSDB_FILENAME_LEN bytes (terminator included)
// are written, so dirName must be at least that large.
static void tsdbGetDataDir(int repoid, const char *dir, char dirName[]) {
  (void)snprintf(dirName, TSDB_FILENAME_LEN,
                 "vnode/vnode%d/%s/data",
                 repoid, dir);
}
H
Hongze Cheng 已提交
47

48
// For backward compatibility
H
Hongze Cheng 已提交
49 50 51 52 53 54 55 56 57 58
// ================== CURRENT file header info
// Serialize the CURRENT file header: version first, then payload length,
// both as fixed-width U32. Returns the sum of the sizes reported by the
// two encoder calls.
static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) {
  int nVersion = taosEncodeFixedU32(buf, pHeader->version);
  int nLen = taosEncodeFixedU32(buf, pHeader->len);

  return nVersion + nLen;
}

H
Hongze Cheng 已提交
59
// Deserialize the CURRENT file header (version, then len) from buf into
// pHeader. Returns the cursor handed back by the last decoder call.
static void *tsdbDecodeFSHeader(void *buf, SFSHeader *pHeader) {
  void *cursor = buf;

  cursor = taosDecodeFixedU32(cursor, &(pHeader->version));
  cursor = taosDecodeFixedU32(cursor, &(pHeader->len));

  return cursor;
}

// ================== STsdbFSMeta
// Serialize the FS meta record in field order: version (U32), totalPoints
// (I64), totalStorage (I64). Returns the sum of the sizes reported by the
// encoder calls.
static int tsdbEncodeFSMeta(void **buf, STsdbFSMeta *pMeta) {
  int nVersion = taosEncodeFixedU32(buf, pMeta->version);
  int nPoints = taosEncodeFixedI64(buf, pMeta->totalPoints);
  int nStorage = taosEncodeFixedI64(buf, pMeta->totalStorage);

  return nVersion + nPoints + nStorage;
}

H
Hongze Cheng 已提交
77
// Deserialize the FS meta record (version, totalPoints, totalStorage) from
// buf into pMeta. Returns the cursor handed back by the last decoder call.
static void *tsdbDecodeFSMeta(void *buf, STsdbFSMeta *pMeta) {
  void *cursor = buf;

  cursor = taosDecodeFixedU32(cursor, &(pMeta->version));
  cursor = taosDecodeFixedI64(cursor, &(pMeta->totalPoints));
  cursor = taosDecodeFixedI64(cursor, &(pMeta->totalStorage));

  return cursor;
}

// ================== SFSStatus
// Serialize an array of SDFileSet: a U64 element count followed by every
// element encoded with tsdbEncodeDFileSet, in array order. Returns the sum
// of the sizes reported by the encoder calls.
static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) {
  const uint64_t nFSet = taosArrayGetSize(pArray);
  int            nBytes = 0;
  size_t         idx = 0;

  nBytes += taosEncodeFixedU64(buf, nFSet);

  while (idx < nFSet) {
    SDFileSet *pFSet = taosArrayGet(pArray, idx);
    nBytes += tsdbEncodeDFileSet(buf, pFSet);
    idx++;
  }

  return nBytes;
}

H
Hongze Cheng 已提交
100 101
// Deserialize an array of SDFileSet from buf into pArray. pArray is cleared
// first; then a U64 count is read and that many file sets are decoded and
// pushed in order. Returns the cursor after the last decoded element.
static void *tsdbDecodeDFileSetArray(STsdb *pRepo, void *buf, SArray *pArray) {
  uint64_t nFSet = 0;
  void    *cursor = buf;

  taosArrayClear(pArray);

  cursor = taosDecodeFixedU64(cursor, &nFSet);

  size_t idx = 0;
  while (idx < nFSet) {
    // Decode into a zeroed temporary; the array receives a copy.
    SDFileSet fset = {0};
    cursor = tsdbDecodeDFileSet(pRepo, cursor, &fset);
    taosArrayPush(pArray, (void *)(&fset));
    idx++;
  }

  return cursor;
}

// Serialize a FS status. Only the data-file-set array is written; the
// meta-file part is disabled:
//   ASSERT(pStatus->pmf);
//   tlen += tsdbEncodeSMFile(buf, pStatus->pmf);
// Returns the size reported by tsdbEncodeDFileSetArray.
static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
  int nBytes = tsdbEncodeDFileSetArray(buf, pStatus->df);
  return nBytes;
}

H
Hongze Cheng 已提交
125
// Deserialize a FS status from buf into pStatus. The status is reset first,
// then the data-file-set array is decoded. The meta-file part is disabled:
//   pStatus->pmf = &(pStatus->mf);
//   buf = tsdbDecodeSMFile(buf, pStatus->pmf);
// Returns the cursor after the decoded array.
static void *tsdbDecodeFSStatus(STsdb *pRepo, void *buf, SFSStatus *pStatus) {
  tsdbResetFSStatus(pStatus);

  void *cursor = tsdbDecodeDFileSetArray(pRepo, buf, pStatus->df);
  return cursor;
}

static SFSStatus *tsdbNewFSStatus(int maxFSet) {
wafwerar's avatar
wafwerar 已提交
137
  SFSStatus *pStatus = (SFSStatus *)taosMemoryCalloc(1, sizeof(*pStatus));
H
Hongze Cheng 已提交
138 139 140 141 142
  if (pStatus == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

H
Hongze Cheng 已提交
143
  // TSDB_FILE_SET_CLOSED(&(pStatus->mf));
H
Hongze Cheng 已提交
144

H
Hongze Cheng 已提交
145
  pStatus->df = taosArrayInit(maxFSet, sizeof(SDFileSet));
H
Hongze Cheng 已提交
146
  if (pStatus->df == NULL) {
H
Hongze Cheng 已提交
147
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
wafwerar's avatar
wafwerar 已提交
148
    taosMemoryFree(pStatus);
H
Hongze Cheng 已提交
149 150 151 152 153 154 155 156 157
    return NULL;
  }

  return pStatus;
}

// Destroy a status object and its file-set array. Accepts NULL.
// Always returns NULL so callers can write `p = tsdbFreeFSStatus(p)`.
static SFSStatus *tsdbFreeFSStatus(SFSStatus *pStatus) {
  if (pStatus == NULL) {
    return NULL;
  }

  pStatus->df = taosArrayDestroy(pStatus->df);
  taosMemoryFree(pStatus);
  return NULL;
}

// Empty the file-set array of a status object; a NULL status is ignored.
// The meta-file reset is disabled:
//   TSDB_FILE_SET_CLOSED(&(pStatus->mf));
//   pStatus->pmf = NULL;
static void tsdbResetFSStatus(SFSStatus *pStatus) {
  if (pStatus != NULL) {
    taosArrayClear(pStatus->df);
  }
}

H
Hongze Cheng 已提交
175 176
// static void tsdbSetStatusMFile(SFSStatus *pStatus, const SMFile *pMFile) {
//   ASSERT(pStatus->pmf == NULL);
H
Hongze Cheng 已提交
177

H
Hongze Cheng 已提交
178 179 180
//   pStatus->pmf = &(pStatus->mf);
//   tsdbInitMFileEx(pStatus->pmf, (SMFile *)pMFile);
// }
H
Hongze Cheng 已提交
181 182

// Append a copy of *pSet to the status' file-set array and mark the stored
// copy as closed via TSDB_FSET_SET_CLOSED.
// Returns 0 on success; -1 with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY when
// the push fails.
static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) {
  void *pPushed = taosArrayPush(pStatus->df, (void *)pSet);
  if (pPushed == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  SDFileSet *pStored = (SDFileSet *)taosArrayGetLast(pStatus->df);
  TSDB_FSET_SET_CLOSED(pStored);

  return 0;
}

H
Hongze Cheng 已提交
193
// ================== STsdbFS
C
Cary Xu 已提交
194
STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg) {
H
Hongze Cheng 已提交
195
  int      keep = pCfg->keep2;
H
refact  
Hongze Cheng 已提交
196
  int      days = pCfg->days;
H
Hongze Cheng 已提交
197 198 199
  int      maxFSet = TSDB_MAX_FSETS(keep, days);
  STsdbFS *pfs;

wafwerar's avatar
wafwerar 已提交
200
  pfs = (STsdbFS *)taosMemoryCalloc(1, sizeof(*pfs));
H
Hongze Cheng 已提交
201
  if (pfs == NULL) {
H
Hongze Cheng 已提交
202 203 204 205
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return NULL;
  }

wafwerar's avatar
wafwerar 已提交
206
  int code = taosThreadRwlockInit(&(pfs->lock), NULL);
H
Hongze Cheng 已提交
207 208
  if (code) {
    terrno = TAOS_SYSTEM_ERROR(code);
wafwerar's avatar
wafwerar 已提交
209
    taosMemoryFree(pfs);
H
Hongze Cheng 已提交
210 211 212
    return NULL;
  }

H
Hongze Cheng 已提交
213 214 215
  pfs->cstatus = tsdbNewFSStatus(maxFSet);
  if (pfs->cstatus == NULL) {
    tsdbFreeFS(pfs);
H
Hongze Cheng 已提交
216 217 218
    return NULL;
  }

H
Hongze Cheng 已提交
219 220
  pfs->metaCache = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (pfs->metaCache == NULL) {
H
Hongze Cheng 已提交
221
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
222
    tsdbFreeFS(pfs);
H
Hongze Cheng 已提交
223 224 225
    return NULL;
  }

226
  pfs->intxn = false;
227
  pfs->metaCacheComp = NULL;
228

H
Hongze Cheng 已提交
229 230 231
  pfs->nstatus = tsdbNewFSStatus(maxFSet);
  if (pfs->nstatus == NULL) {
    tsdbFreeFS(pfs);
H
Hongze Cheng 已提交
232 233 234
    return NULL;
  }

H
Hongze Cheng 已提交
235
  return pfs;
H
Hongze Cheng 已提交
236 237
}

H
Hongze Cheng 已提交
238 239 240 241 242 243
// Release a file-system object: new status, meta cache, current status,
// rwlock, then the object itself. Accepts NULL. Always returns NULL.
void *tsdbFreeFS(STsdbFS *pfs) {
  if (pfs == NULL) {
    return NULL;
  }

  pfs->nstatus = tsdbFreeFSStatus(pfs->nstatus);

  taosHashCleanup(pfs->metaCache);
  pfs->metaCache = NULL;

  pfs->cstatus = tsdbFreeFSStatus(pfs->cstatus);

  taosThreadRwlockDestroy(&(pfs->lock));
  taosMemoryFree(pfs);

  return NULL;
}

H
Hongze Cheng 已提交
251
// Open the file system of a repository.
// If the "current" snapshot file exists the status is loaded from it and
// the data file headers are scanned; otherwise the snapshot is rebuilt via
// tsdbRestoreCurrent. In both cases the FS is then scanned and repaired.
// Returns 0 on success, -1 on failure (the failing step is logged).
int tsdbOpenFS(STsdb *pRepo) {
  STsdbFS *pfs = REPO_FS(pRepo);
  char     currentPath[TSDB_FILENAME_LEN] = "\0";
  int      nExpired = 0;

  ASSERT(pfs != NULL);

  tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, currentPath);

  tsdbGetRtnSnap(pRepo, &pRepo->rtn);

  if (!taosCheckExistFile(currentPath)) {
    // should skip expired fileset inside of the function
    if (tsdbRestoreCurrent(pRepo) < 0) {
      tsdbError("vgId:%d failed to restore current file since %s", REPO_ID(pRepo), tstrerror(terrno));
      return -1;
    }
  } else {
    if (tsdbOpenFSFromCurrent(pRepo) < 0) {
      tsdbError("vgId:%d failed to open FS since %s", REPO_ID(pRepo), tstrerror(terrno));
      return -1;
    }

    tsdbScanAndTryFixDFilesHeader(pRepo, &nExpired);
    // if (nExpired > 0) {
    //   tsdbProcessExpiredFS(pRepo);
    // }
  }

  int scanCode = tsdbScanAndTryFixFS(pRepo);
  if (scanCode < 0) {
    tsdbError("vgId:%d failed to scan and fix FS since %s", REPO_ID(pRepo), tstrerror(terrno));
    return -1;
  }

  // // Load meta cache if has meta file
  // if ((!(pRepo->state & TSDB_STATE_BAD_META)) && tsdbLoadMetaCache(pRepo, true) < 0) {
  //   tsdbError("vgId:%d failed to open FS while loading meta cache since %s", REPO_ID(pRepo), tstrerror(terrno));
  //   return -1;
  // }

  return 0;
}

H
Hongze Cheng 已提交
293
// Close the file system of a repository. Currently a no-op.
void tsdbCloseFS(STsdb *pRepo) {
  (void)pRepo;  // intentionally unused
}

H
Hongze Cheng 已提交
297
// Start a new transaction to modify the file system
H
Hongze Cheng 已提交
298
void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd) {
H
Hongze Cheng 已提交
299
  STsdbFS *pfs = REPO_FS(pRepo);
H
Hongze Cheng 已提交
300
  ASSERT(pfs->intxn == false);
H
Hongze Cheng 已提交
301

H
Hongze Cheng 已提交
302 303
  pfs->intxn = true;
  tsdbResetFSStatus(pfs->nstatus);
H
Hongze Cheng 已提交
304
  pfs->nstatus->meta = pfs->cstatus->meta;
H
Hongze Cheng 已提交
305
  // if (pfs->cstatus->pmf == NULL) {
H
Hongze Cheng 已提交
306
  pfs->nstatus->meta.version += 1;
H
Hongze Cheng 已提交
307 308 309
  // } else {
  //   pfs->nstatus->meta.version = pfs->cstatus->meta.version + 1;
  // }
H
Hongze Cheng 已提交
310
  pfs->nstatus->meta.totalPoints = pfs->cstatus->meta.totalPoints + pointsAdd;
H
Hongze Cheng 已提交
311
  pfs->nstatus->meta.totalStorage = pfs->cstatus->meta.totalStorage += storageAdd;
H
Hongze Cheng 已提交
312
}
H
Hongze Cheng 已提交
313

H
Hongze Cheng 已提交
314 315
// Overwrite the meta of the in-transaction (new) status with *pMeta.
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta) {
  SFSStatus *pNext = pfs->nstatus;
  pNext->meta = *pMeta;
}

H
Hongze Cheng 已提交
316
// Commit the running FS transaction.
// The new status is first persisted with tsdbSaveFSStatus; if that fails
// the transaction is ended through tsdbEndFSTxnWithError and -1 is
// returned. Otherwise the current and new statuses are swapped under the
// FS write lock and the on-disk changes between the old and the new
// current status are applied. Returns 0 on success.
int tsdbEndFSTxn(STsdb *pRepo) {
  STsdbFS *pfs = REPO_FS(pRepo);
  ASSERT(FS_IN_TXN(pfs));

  // Write current file system snapshot
  int saveCode = tsdbSaveFSStatus(pRepo, pfs->nstatus);
  if (saveCode < 0) {
    tsdbEndFSTxnWithError(pfs);
    return -1;
  }

  // Publish the new status: swap cstatus and nstatus under the write lock
  tsdbWLockFS(pfs);
  SFSStatus *pPrev = pfs->cstatus;
  pfs->cstatus = pfs->nstatus;
  pfs->nstatus = pPrev;
  tsdbUnLockFS(pfs);

  // Apply actual change to each file and SDFileSet
  // (after the swap nstatus holds the previous status)
  tsdbApplyFSTxnOnDisk(pfs->nstatus, pfs->cstatus);

  pfs->intxn = false;
  return 0;
}

H
Hongze Cheng 已提交
341
// End the running FS transaction without committing it: the on-disk
// changes are applied going from the new status to the current status,
// then the transaction flag is cleared. Always returns 0.
int tsdbEndFSTxnWithError(STsdbFS *pfs) {
  SFSStatus *pAborted = pfs->nstatus;
  SFSStatus *pKept = pfs->cstatus;

  tsdbApplyFSTxnOnDisk(pAborted, pKept);
  // TODO: if mf change, reload pfs->metaCache

  pfs->intxn = false;
  return 0;
}

H
Hongze Cheng 已提交
348
// void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile) { tsdbSetStatusMFile(pfs->nstatus, pMFile); }
H
Hongze Cheng 已提交
349

H
Hongze Cheng 已提交
350
// Add *pSet to the in-transaction (new) status.
// Returns the result of tsdbAddDFileSetToStatus (0 on success, -1 on
// failure).
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) {
  int code = tsdbAddDFileSetToStatus(pfs->nstatus, pSet);
  return code;
}
H
Hongze Cheng 已提交
351

S
Shengliang Guan 已提交
352
// Persist a FS status as the repository's "current" snapshot file.
//
// The snapshot is written to the temporary file ("current.t"), fsynced,
// closed and then renamed over "current". The file layout is:
//   - a TSDB_FILE_HEAD_SIZE header block holding SFSHeader + STsdbFSMeta,
//     with a checksum appended by taosCalcChecksumAppend;
//   - if the status has at least one file set, fsheader.len bytes holding
//     the encoded status followed by its checksum.
//
// Returns 0 on success. On any failure, including a failed rename, the
// temporary file is removed and -1 is returned; terrno is set here for
// I/O errors (tsdbMakeRoom is expected to set it for its own failure).
static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) {
  SFSHeader fsheader;
  TdFilePtr pFile = NULL;
  void     *pBuf = NULL;
  void     *ptr;
  char      hbuf[TSDB_FILE_HEAD_SIZE] = "\0";
  char      tfname[TSDB_FILENAME_LEN] = "\0";
  char      cfname[TSDB_FILENAME_LEN] = "\0";

  tsdbGetTxnFname(pRepo, TSDB_TXN_TEMP_FILE, tfname);
  tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, cfname);

  pFile = taosOpenFile(tfname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  fsheader.version = TSDB_LATEST_SFS_VER;
  if (taosArrayGetSize(pStatus->df) == 0) {
    fsheader.len = 0;
  } else {
    // Dry-run encode (NULL buffer) to learn the payload size.
    fsheader.len = tsdbEncodeFSStatus(NULL, pStatus) + sizeof(TSCKSUM);
  }

  // Encode header part and write
  ptr = hbuf;
  tsdbEncodeFSHeader(&ptr, &fsheader);
  tsdbEncodeFSMeta(&ptr, &(pStatus->meta));

  taosCalcChecksumAppend(0, (uint8_t *)hbuf, TSDB_FILE_HEAD_SIZE);

  if (taosWriteFile(pFile, hbuf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // Encode file status and write to file
  if (fsheader.len > 0) {
    if (tsdbMakeRoom(&(pBuf), fsheader.len) < 0) {
      goto _err;
    }

    ptr = pBuf;
    tsdbEncodeFSStatus(&ptr, pStatus);
    taosCalcChecksumAppend(0, (uint8_t *)pBuf, fsheader.len);

    if (taosWriteFile(pFile, pBuf, fsheader.len) < fsheader.len) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }
  }

  // fsync, close and rename
  if (taosFsyncFile(pFile) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  (void)taosCloseFile(&pFile);

  // The rename publishes the snapshot; if it fails the commit did not
  // happen and must be reported to the caller. The file is already closed
  // here, so this path does its own cleanup instead of jumping to _err.
  if (taosRenameFile(tfname, cfname) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    (void)taosRemoveFile(tfname);
    taosTZfree(pBuf);
    return -1;
  }

  taosTZfree(pBuf);
  return 0;

_err:
  // terrno has already been set at the failure site.
  taosCloseFile(&pFile);
  (void)taosRemoveFile(tfname);
  taosTZfree(pBuf);
  return -1;
}
H
Hongze Cheng 已提交
426

H
Hongze Cheng 已提交
427
// Apply the on-disk difference between two statuses.
// Both file-set arrays are walked in parallel, merge-style, comparing fids
// (this assumes both arrays are ordered by ascending fid):
//   - a set only in pFrom        -> tsdbApplyDFileSetChange(from, NULL)
//   - a set only in pTo          -> nothing to do
//   - a set with the same fid    -> tsdbApplyDFileSetChange(from, to)
static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo) {
  const size_t nFrom = taosArrayGetSize(pFrom->df);
  const size_t nTo = taosArrayGetSize(pTo->df);
  int          iFrom = 0;
  int          iTo = 0;

  // Apply meta file change
  // (void)tsdbApplyMFileChange(pFrom->pmf, pTo->pmf);

  // Apply SDFileSet change
  SDFileSet *pOld = (iFrom >= nFrom) ? NULL : taosArrayGet(pFrom->df, iFrom);
  SDFileSet *pNew = (iTo >= nTo) ? NULL : taosArrayGet(pTo->df, iTo);

  while (pOld != NULL || pNew != NULL) {
    bool stepFrom = false;
    bool stepTo = false;

    if (pNew == NULL || (pOld && pOld->fid < pNew->fid)) {
      // Present only in the source status
      tsdbApplyDFileSetChange(pOld, NULL);
      stepFrom = true;
    } else if (pOld == NULL || pOld->fid > pNew->fid) {
      // Present only in the target status: do nothing
      stepTo = true;
    } else {
      // Same fid on both sides
      tsdbApplyDFileSetChange(pOld, pNew);
      stepFrom = true;
      stepTo = true;
    }

    if (stepFrom) {
      iFrom++;
      pOld = (iFrom >= nFrom) ? NULL : taosArrayGet(pFrom->df, iFrom);
    }

    if (stepTo) {
      iTo++;
      pNew = (iTo >= nTo) ? NULL : taosArrayGet(pTo->df, iTo);
    }
  }
}

H
Hongze Cheng 已提交
493 494 495 496 497
// ================== SFSIter
// ASSUMPTIONS: the FS Should be read locked when calling these functions
// Position an iterator on the first file set of the current status in the
// given direction (index 0 for TSDB_FS_ITER_FORWARD, last index otherwise)
// and record the status version. With no file sets the iterator is set to
// index -1 / TSDB_IVLD_FID.
void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction) {
  pIter->pfs = pfs;
  pIter->direction = direction;

  size_t nFSet = taosArrayGetSize(pfs->cstatus->df);

  pIter->version = pfs->cstatus->meta.version;

  if (nFSet == 0) {
    pIter->index = -1;
    pIter->fid = TSDB_IVLD_FID;
    return;
  }

  pIter->index = (direction == TSDB_FS_ITER_FORWARD) ? 0 : (int)(nFSet - 1);

  SDFileSet *pStart = (SDFileSet *)taosArrayGet(pfs->cstatus->df, pIter->index);
  pIter->fid = pStart->fid;
}

H
Hongze Cheng 已提交
517 518 519
// Reposition the iterator relative to `fid` using taosbsearch over the
// current status' file sets: TD_GE for a forward iterator, TD_LE otherwise.
// If the search yields nothing the iterator becomes invalid
// (index -1 / TSDB_IVLD_FID).
void tsdbFSIterSeek(SFSIter *pIter, int fid) {
  STsdbFS *pfs = pIter->pfs;
  size_t   nFSet = taosArrayGetSize(pfs->cstatus->df);
  int      mode = (pIter->direction == TSDB_FS_ITER_FORWARD) ? TD_GE : TD_LE;

  void *pHit = taosbsearch(&fid, pfs->cstatus->df->pData, nFSet, sizeof(SDFileSet), tsdbComparFidFSet, mode);

  if (pHit != NULL) {
    pIter->index = (int)(TARRAY_ELEM_IDX(pfs->cstatus->df, pHit));
    pIter->fid = ((SDFileSet *)pHit)->fid;
  } else {
    pIter->index = -1;
    pIter->fid = TSDB_IVLD_FID;
  }
}

H
Hongze Cheng 已提交
538
// Return the file set the iterator points at and advance it one step in
// its direction; returns NULL when the iterator is exhausted.
// If the current status version differs from the one the iterator recorded,
// the iterator first re-seeks to its remembered fid.
SDFileSet *tsdbFSIterNext(SFSIter *pIter) {
  STsdbFS *pfs = pIter->pfs;

  if (pIter->index < 0) {
    ASSERT(pIter->fid == TSDB_IVLD_FID);
    return NULL;
  }

  ASSERT(pIter->fid != TSDB_IVLD_FID);

  // The FS changed since the iterator was positioned: re-locate it by fid.
  if (pIter->version != pfs->cstatus->meta.version) {
    pIter->version = pfs->cstatus->meta.version;
    tsdbFSIterSeek(pIter, pIter->fid);
  }

  if (pIter->index < 0) {
    return NULL;
  }

  SDFileSet *pCur = (SDFileSet *)taosArrayGet(pfs->cstatus->df, pIter->index);
  ASSERT(pCur->fid == pIter->fid);

  // Step the index; running off either end leaves it negative.
  if (pIter->direction != TSDB_FS_ITER_FORWARD) {
    pIter->index--;
  } else {
    pIter->index++;
    if (pIter->index >= taosArrayGetSize(pfs->cstatus->df)) {
      pIter->index = -1;
    }
  }

  // Remember the fid of the next element (or mark the iterator invalid).
  if (pIter->index < 0) {
    pIter->fid = TSDB_IVLD_FID;
  } else {
    pIter->fid = ((SDFileSet *)taosArrayGet(pfs->cstatus->df, pIter->index))->fid;
  }

  return pCur;
}

// Comparator for searching a file-set array by fid.
// arg1 points to an int fid (the key), arg2 to an SDFileSet (the element).
// Returns -1, 0 or 1 when the key is smaller than, equal to or greater than
// the element's fid.
static int tsdbComparFidFSet(const void *arg1, const void *arg2) {
  int        key = *(int *)arg1;
  SDFileSet *pElem = (SDFileSet *)arg2;

  return (key > pElem->fid) - (key < pElem->fid);
}

S
Shengliang Guan 已提交
592
// Build the path of a transaction snapshot file ("current" or "current.t")
// under the primary storage path. At most TSDB_FILENAME_LEN bytes
// (terminator included) are written to fname.
static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]) {
  const char *baseName = tsdbTxnFname[ftype];

  snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/%s/%s",
           tfsGetPrimaryPath(REPO_TFS(pRepo)),
           REPO_ID(pRepo),
           pRepo->dir,
           baseName);
}

H
Hongze Cheng 已提交
597
// Load the current FS status from the repository's "current" file.
//
// The file is read in two steps:
//   1. a TSDB_FILE_HEAD_SIZE header block (checksum verified) from which
//      SFSHeader and the status meta are decoded;
//   2. if fsheader.len > 0, a payload of fsheader.len bytes (checksum
//      verified) from which the file-set array is decoded; otherwise the
//      status is reset to empty.
//
// Returns 0 on success, -1 on failure. terrno is set to a system error for
// open/read failures and to TSDB_CODE_TDB_FILE_CORRUPTED for short reads
// or checksum mismatches (tsdbMakeRoom is expected to set it for its own
// failure).
static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
  STsdbFS   *pfs = REPO_FS(pRepo);
  SFSStatus *pStatus = pfs->cstatus;
  TdFilePtr  pFile = NULL;
  void      *buffer = NULL;
  SFSHeader  fsheader;
  char       current[TSDB_FILENAME_LEN] = "\0";
  void      *ptr;
  int        nread;
  int        err;

  tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, current);

  // current file exists, try to recover
  pFile = taosOpenFile(current, TD_FILE_READ);
  if (pFile == NULL) {
    err = errno;  // capture before logging, which may change errno
    tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), current, strerror(err));
    terrno = TAOS_SYSTEM_ERROR(err);
    goto _err;
  }

  if (tsdbMakeRoom(&buffer, TSDB_FILE_HEAD_SIZE) < 0) {
    goto _err;
  }

  nread = (int)taosReadFile(pFile, buffer, TSDB_FILE_HEAD_SIZE);
  if (nread < 0) {
    err = errno;
    // Report the size actually requested (the header block).
    tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pRepo), TSDB_FILE_HEAD_SIZE, current,
              strerror(err));
    terrno = TAOS_SYSTEM_ERROR(err);
    goto _err;
  }

  if (nread < TSDB_FILE_HEAD_SIZE) {
    tsdbError("vgId:%d failed to read header of file %s, read bytes:%d", REPO_ID(pRepo), current, nread);
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    goto _err;
  }

  if (!taosCheckChecksumWhole((uint8_t *)buffer, TSDB_FILE_HEAD_SIZE)) {
    tsdbError("vgId:%d header of file %s failed checksum check", REPO_ID(pRepo), current);
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    goto _err;
  }

  ptr = buffer;
  ptr = tsdbDecodeFSHeader(ptr, &fsheader);
  ptr = tsdbDecodeFSMeta(ptr, &(pStatus->meta));

  if (fsheader.version != TSDB_LATEST_SFS_VER) {
    // TODO: handle file version change
  }

  if (fsheader.len > 0) {
    if (tsdbMakeRoom(&buffer, fsheader.len) < 0) {
      goto _err;
    }

    nread = (int)taosReadFile(pFile, buffer, fsheader.len);
    if (nread < 0) {
      err = errno;
      tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), current, strerror(err));
      terrno = TAOS_SYSTEM_ERROR(err);
      goto _err;
    }

    // nread is known to be non-negative here; compare in 64-bit signed
    // space so the check does not rely on unsigned conversion.
    if ((int64_t)nread < (int64_t)fsheader.len) {
      tsdbError("vgId:%d failed to read %d bytes from file %s", REPO_ID(pRepo), fsheader.len, current);
      terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
      goto _err;
    }

    if (!taosCheckChecksumWhole((uint8_t *)buffer, fsheader.len)) {
      tsdbError("vgId:%d file %s is corrupted since wrong checksum", REPO_ID(pRepo), current);
      terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
      goto _err;
    }

    ptr = buffer;
    ptr = tsdbDecodeFSStatus(pRepo, ptr, pStatus);
  } else {
    tsdbResetFSStatus(pStatus);
  }

  taosTZfree(buffer);
  taosCloseFile(&pFile);

  return 0;

_err:
  if (pFile != NULL) {
    taosCloseFile(&pFile);
  }
  taosTZfree(buffer);
  return -1;
}

// Scan and try to fix incorrect files
H
Hongze Cheng 已提交
692
static int tsdbScanAndTryFixFS(STsdb *pRepo) {
H
Hongze Cheng 已提交
693
  STsdbFS   *pfs = REPO_FS(pRepo);
H
Hongze Cheng 已提交
694 695
  SFSStatus *pStatus = pfs->cstatus;

H
Hongze Cheng 已提交
696 697 698 699
  // if (tsdbScanAndTryFixMFile(pRepo) < 0) {
  //   tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno));
  //   return -1;
  // }
H
Hongze Cheng 已提交
700 701 702 703 704 705

  size_t size = taosArrayGetSize(pStatus->df);

  for (size_t i = 0; i < size; i++) {
    SDFileSet *pSet = (SDFileSet *)taosArrayGet(pStatus->df, i);

H
Hongze Cheng 已提交
706
    if (tsdbScanAndTryFixDFileSet(pRepo, pSet) < 0) {
H
Hongze Cheng 已提交
707 708 709 710 711
      tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno));
      return -1;
    }
  }

H
Hongze Cheng 已提交
712
  // remove those unused files
713 714
  tsdbScanRootDir(pRepo);
  tsdbScanDataDir(pRepo);
H
Hongze Cheng 已提交
715 716 717
  return 0;
}

H
Hongze Cheng 已提交
718
// Remove stray entries from the repository root directory.
// Every entry returned by tfsReaddir is removed except the "current"
// snapshot file and the "data" directory.
// Returns 0 on success, -1 if the directory cannot be opened.
static int tsdbScanRootDir(STsdb *pRepo) {
  char            rootDir[TSDB_FILENAME_LEN];
  char            bname[TSDB_FILENAME_LEN];
  const STfsFile *pf;

  tsdbGetRootDir(REPO_ID(pRepo), pRepo->dir, rootDir);
  STfsDir *tdir = tfsOpendir(REPO_TFS(pRepo), rootDir);
  if (tdir == NULL) {
    tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno));
    return -1;
  }

  while ((pf = tfsReaddir(tdir))) {
    tfsBasename(pf, bname);

    if (strcmp(bname, tsdbTxnFname[TSDB_TXN_CURR_FILE]) == 0 || strcmp(bname, "data") == 0) {
      // Skip current file and data directory
      continue;
    }

    // if (/*pfs->cstatus->pmf && */ tfsIsSameFile(pf, &(pfs->cstatus->pmf->f))) {
    //   continue;
    // }

    // Best effort: a failed removal is not treated as an error.
    (void)tfsRemoveFile(pf);
    tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), pf->aname);
  }

  tfsClosedir(tdir);

  return 0;
}

H
Hongze Cheng 已提交
752
// Remove stray entries from the repository data directory.
// Every entry returned by tfsReaddir that is not referenced by a file set
// of the current status (see tsdbIsTFileInFS) is removed.
// Returns 0 on success, -1 if the directory cannot be opened.
static int tsdbScanDataDir(STsdb *pRepo) {
  char            dataDir[TSDB_FILENAME_LEN];
  STsdbFS        *pfs = REPO_FS(pRepo);
  const STfsFile *pf;

  tsdbGetDataDir(REPO_ID(pRepo), pRepo->dir, dataDir);
  STfsDir *tdir = tfsOpendir(REPO_TFS(pRepo), dataDir);
  if (tdir == NULL) {
    tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno));
    return -1;
  }

  while ((pf = tfsReaddir(tdir))) {
    if (!tsdbIsTFileInFS(pfs, pf)) {
      // Best effort: a failed removal is not treated as an error.
      (void)tfsRemoveFile(pf);
      tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), pf->aname);
    }
  }

  tfsClosedir(tdir);

  return 0;
}

S
Shengliang Guan 已提交
779
// Tell whether `pf` is one of the files referenced by the current status.
// All file sets are visited with a forward iterator and, within each set,
// every file type in [0, TSDB_FILE_MAX) is compared with tfsIsSameFile.
static bool tsdbIsTFileInFS(STsdbFS *pfs, const STfsFile *pf) {
  SFSIter    iter;
  SDFileSet *pFSet;

  tsdbFSIterInit(&iter, pfs, TSDB_FS_ITER_FORWARD);

  for (pFSet = tsdbFSIterNext(&iter); pFSet != NULL; pFSet = tsdbFSIterNext(&iter)) {
    TSDB_FILE_T kind = 0;
    while (kind < TSDB_FILE_MAX) {
      SDFile *pCandidate = TSDB_DFILE_IN_SET(pFSet, kind);
      if (tfsIsSameFile(pf, TSDB_FILE_F(pCandidate))) {
        return true;
      }
      kind++;
    }
  }

  return false;
}

H
Hongze Cheng 已提交
796
/*
 * Rebuild the data-file sets of the current FS status (pfs->cstatus->df) from
 * the files found in the repo's data directory.
 *
 * Steps:
 *   1. Read the data directory. Files whose base name matches the data-file
 *      pattern are collected; any other file is removed (best-effort).
 *   2. Sort the collected files with tsdbComparTFILE (by fid, then file type).
 *   3. Walk the sorted list and assemble one SDFileSet per fid. A set must
 *      contain exactly one file of every type, in type order; each file is
 *      opened read-only, its header is loaded, and it is closed again. When
 *      tsdbForceKeepFile is set, the size recorded in the header info is
 *      replaced by the real file size if they differ.
 *      Files whose fid is below pRepo->rtn.minFid (expired) are skipped.
 *
 * Returns 0 on success, -1 on failure. All resources acquired here (regex,
 * directory handle, temporary array, opened data file) are released on every
 * path.
 */
static int tsdbRestoreDFileSet(STsdb *pRepo) {
  char            dataDir[TSDB_FILENAME_LEN];
  char            bname[TSDB_FILENAME_LEN];
  char            regErr[128];
  STfsDir        *tdir = NULL;
  const STfsFile *pf = NULL;
  const char     *pattern = "^v[0-9]+f[0-9]+\\.(head|data|last|smad|smal)(-ver[0-9]+)?$";
  SArray         *fArray = NULL;
  regex_t         regex;
  STsdbFS        *pfs = REPO_FS(pRepo);
  int             code = 0;

  tsdbGetDataDir(REPO_ID(pRepo), pRepo->dir, dataDir);

  // Resource allocation and init
  code = regcomp(&regex, pattern, REG_EXTENDED);
  if (code != 0) {
    // regex error codes are not errno values, so regerror() is used for the text.
    // The regex object must not be freed after a failed regcomp().
    regerror(code, &regex, regErr, sizeof(regErr));
    tsdbError("vgId:%d failed to restore DFileSet while compile regex since %s", REPO_ID(pRepo), regErr);
    terrno = TAOS_SYSTEM_ERROR(code);  // same mapping as the regexec() failure path below
    return -1;
  }

  fArray = taosArrayInit(1024, sizeof(STfsFile));
  if (fArray == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    tsdbError("vgId:%d failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
              tstrerror(terrno));
    regfree(&regex);
    return -1;
  }

  tdir = tfsOpendir(REPO_TFS(pRepo), dataDir);
  if (tdir == NULL) {
    tsdbError("vgId:%d failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
              tstrerror(terrno));
    taosArrayDestroy(fArray);
    regfree(&regex);
    return -1;
  }

  // Step 1: collect data files, drop everything else
  while ((pf = tfsReaddir(tdir))) {
    tfsBasename(pf, bname);

    code = regexec(&regex, bname, 0, NULL, 0);
    if (code == 0) {
      // The array stores a copy of the STfsFile
      if (taosArrayPush(fArray, (void *)pf) == NULL) {
        terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
        tfsClosedir(tdir);
        taosArrayDestroy(fArray);
        regfree(&regex);
        return -1;
      }
    } else if (code == REG_NOMATCH) {
      // Not match
      tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), pf->aname);
      (void)tfsRemoveFile(pf);
      continue;
    } else {
      // Has other error
      regerror(code, &regex, regErr, sizeof(regErr));
      tsdbError("vgId:%d failed to restore DFileSet Array while run regexec since %s", REPO_ID(pRepo), regErr);
      terrno = TAOS_SYSTEM_ERROR(code);
      tfsClosedir(tdir);
      taosArrayDestroy(fArray);
      regfree(&regex);
      return -1;
    }
  }

  tfsClosedir(tdir);
  regfree(&regex);

  // Step 2: sort the array according to file name (fid, then file type)
  taosArraySort(fArray, tsdbComparTFILE);

  // Step 3: loop to recover each file set
  size_t index = 0;
  for (;;) {
    int         tvid, tfid;
    TSDB_FILE_T ttype;
    uint32_t    tversion;

    if (index >= taosArrayGetSize(fArray)) {
      break;
    }

    // Expired files are skipped here, before a set is started, so that they
    // never occupy a slot of the set being assembled and no empty set is
    // recorded for them.
    pf = taosArrayGet(fArray, index);
    tfsBasename(pf, bname);
    tsdbParseDFilename(bname, &tvid, &tfid, &ttype, &tversion);

    ASSERT(tvid == REPO_ID(pRepo));

    if (tfid < pRepo->rtn.minFid) {  // skip file expired
      ++index;
      continue;
    }

    SDFileSet fset = {0};

    TSDB_FSET_SET_CLOSED(&fset);

    // Loop to recover ONE fset
    for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
      SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype);

      if (index >= taosArrayGetSize(fArray)) {
        tsdbError("vgId:%d incomplete DFileSet, fid:%d", REPO_ID(pRepo), fset.fid);
        goto _err;
      }

      pf = taosArrayGet(fArray, index);
      tfsBasename(pf, bname);
      tsdbParseDFilename(bname, &tvid, &tfid, &ttype, &tversion);

      ASSERT(tvid == REPO_ID(pRepo));

      if (ftype == 0) {
        // The first file of a set decides the fid of the set
        fset.fid = tfid;
      } else if (tfid != fset.fid) {
        tsdbError("vgId:%d incomplete dFileSet, fid:%d", REPO_ID(pRepo), fset.fid);
        goto _err;
      }

      if (ttype != ftype) {
        tsdbError("vgId:%d incomplete dFileSet, fid:%d", REPO_ID(pRepo), fset.fid);
        goto _err;
      }

      pDFile->f = *pf;

      if (tsdbOpenDFile(pDFile, TD_FILE_READ) < 0) {
        tsdbError("vgId:%d failed to open DFile %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile),
                  tstrerror(terrno));
        goto _err;
      }

      if (tsdbLoadDFileHeader(pDFile, &(pDFile->info)) < 0) {
        tsdbError("vgId:%d failed to load DFile %s header since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile),
                  tstrerror(terrno));
        tsdbCloseDFile(pDFile);  // do not leak the opened file
        goto _err;
      }

      if (tsdbForceKeepFile) {
        int64_t file_size;
        // Get real file size
        if (taosFStatFile(pDFile->pFile, &file_size, NULL) < 0) {
          terrno = TAOS_SYSTEM_ERROR(errno);  // capture errno before closing the file
          tsdbCloseDFile(pDFile);             // do not leak the opened file
          goto _err;
        }

        if (pDFile->info.size != file_size) {
          int64_t tfsize = pDFile->info.size;
          pDFile->info.size = file_size;
          tsdbInfo("vgId:%d file %s header size is changed from %" PRId64 " to %" PRId64, REPO_ID(pRepo),
                   TSDB_FILE_FULL_NAME(pDFile), tfsize, pDFile->info.size);
        }
      }

      tsdbCloseDFile(pDFile);
      index++;
    }

    if (taosArrayPush(pfs->cstatus->df, &fset) == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      goto _err;
    }
    tsdbInfo("vgId:%d FSET %d is restored", REPO_ID(pRepo), fset.fid);
  }

  // Resource release
  taosArrayDestroy(fArray);

  return 0;

_err:
  taosArrayDestroy(fArray);
  return -1;
}

H
Hongze Cheng 已提交
965
/*
 * Rebuild the current FS status from the files on disk and persist it.
 *
 * The data file sets are restored first with tsdbRestoreDFileSet(); the
 * resulting pRepo->fs->cstatus is then written out with tsdbSaveFSStatus().
 * Meta-file restoration (tsdbRestoreMeta) is not performed here.
 *
 * Returns 0 on success, -1 if either step fails.
 */
static int tsdbRestoreCurrent(STsdb *pRepo) {
  // Step 1: recover the dfile sets
  const bool dfsetRestored = (tsdbRestoreDFileSet(pRepo) >= 0);
  if (!dfsetRestored) {
    tsdbError("vgId:%d failed to restore DFileSet since %s", REPO_ID(pRepo), tstrerror(terrno));
    return -1;
  }

  // Step 2: save the recovered status
  const bool statusSaved = (tsdbSaveFSStatus(pRepo, pRepo->fs->cstatus) >= 0);
  if (!statusSaved) {
    tsdbError("vgId:%d failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno));
    return -1;
  }

  return 0;
}

/*
 * Comparator (qsort-style) for two STfsFile entries holding data files.
 *
 * The base name of each file is parsed with tsdbParseDFilename(); files are
 * ordered by fid first and by file type second. The parsed vgroup id and
 * version do not take part in the ordering.
 *
 * Returns -1, 0 or 1.
 */
static int tsdbComparTFILE(const void *arg1, const void *arg2) {
  const STfsFile *lhs = (const STfsFile *)arg1;
  const STfsFile *rhs = (const STfsFile *)arg2;

  char        lname[TSDB_FILENAME_LEN];
  char        rname[TSDB_FILENAME_LEN];
  int         lvid, lfid;
  int         rvid, rfid;
  TSDB_FILE_T ltype, rtype;
  uint32_t    lver, rver;

  tfsBasename(lhs, lname);
  tfsBasename(rhs, rname);
  tsdbParseDFilename(lname, &lvid, &lfid, &ltype, &lver);
  tsdbParseDFilename(rname, &rvid, &rfid, &rtype, &rver);

  // Primary key: file set id
  if (lfid != rfid) {
    return (lfid < rfid) ? -1 : 1;
  }

  // Secondary key: file type within the set
  if (ltype != rtype) {
    return (ltype < rtype) ? -1 : 1;
  }

  return 0;
}

H
Hongze Cheng 已提交
1016
/*
 * Check the on-disk header of every data file in the current FS status and
 * rewrite the headers that disagree with the in-memory file info.
 *
 * For each file set in pfs->cstatus->df:
 *   - *nExpired is incremented when the set's fid is below pRepo->rtn.minFid
 *     (the set is still scanned afterwards);
 *   - the set is opened for reading and writing; a set that cannot be opened
 *     is logged and skipped;
 *   - for each file, the header is loaded from disk; if loading fails, or the
 *     loaded size or magic differs from the in-memory info, the header is
 *     rewritten with tsdbUpdateDFileHeader() and the file is fsync'ed.
 *
 * Failures are logged and do not stop the scan; nothing is returned.
 */
static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired) {
  STsdbFS   *pfs = REPO_FS(pRepo);
  SFSStatus *pCurrent = pfs->cstatus;
  SDFInfo    diskInfo;

  for (size_t iSet = 0; iSet < taosArrayGetSize(pCurrent->df); iSet++) {
    SDFileSet  workSet;
    SDFileSet *pStored = (SDFileSet *)taosArrayGet(pCurrent->df, iSet);

    tsdbInitDFileSetEx(&workSet, pStored);

    if (workSet.fid < pRepo->rtn.minFid) {
      ++*nExpired;
    }
    tsdbDebug("vgId:%d scan DFileSet %d header", REPO_ID(pRepo), workSet.fid);

    if (tsdbOpenDFileSet(&workSet, TD_FILE_WRITE | TD_FILE_READ) < 0) {
      tsdbError("vgId:%d failed to open DFileSet %d since %s, continue", REPO_ID(pRepo), workSet.fid,
                tstrerror(terrno));
      continue;
    }

    for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
      SDFile *pDFile = TSDB_DFILE_IN_SET(&workSet, ftype);

      // diskInfo is only inspected when the header was loaded successfully
      // (short-circuit evaluation).
      bool headerOk = (tsdbLoadDFileHeader(pDFile, &diskInfo) >= 0) && (pDFile->info.size == diskInfo.size) &&
                      (pDFile->info.magic == diskInfo.magic);

      if (headerOk) {
        tsdbDebug("vgId:%d DFile header of %s is correct", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile));
        continue;
      }

      if (tsdbUpdateDFileHeader(pDFile) < 0) {
        tsdbError("vgId:%d failed to update DFile header of %s since %s, continue", REPO_ID(pRepo),
                  TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno));
      } else {
        tsdbInfo("vgId:%d DFile header of %s is updated", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile));
        TSDB_FILE_FSYNC(pDFile);
      }
    }

    tsdbCloseDFileSet(&workSet);
  }
}