/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdbFS2.h"

#include <errno.h>
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// Task-scheduling entry point declared here directly instead of through a header.
// It is not called anywhere in the visible part of this file.
extern int vnodeScheduleTask(int (*execute)(void *), void *arg);
H
Hongze Cheng 已提交
19
// Task-scheduling entry point that additionally takes a tpid; tsdbFSEditCommit() uses it
// (with tpid 1) to queue tsdbMerge.
extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg);
H
Hongze Cheng 已提交
20

H
Hongze Cheng 已提交
21 22
// Bounds over the edit types: MIN is TSDB_FEDIT_COMMIT, MAX is one past TSDB_FEDIT_MERGE.
// Neither macro is referenced in the visible part of this file.
#define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT
#define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1)
H
Hongze Cheng 已提交
23 24 25 26 27 28 29 30

// Values for STFileSystem.state. In the visible part of this file only
// TSDB_FS_STATE_NONE is ever assigned (by create_fs).
enum {
  TSDB_FS_STATE_NONE = 0,
  TSDB_FS_STATE_OPEN,
  TSDB_FS_STATE_EDIT,
  TSDB_FS_STATE_CLOSE,
};

H
Hongze Cheng 已提交
31 32
/* Selects one of the "current" manifest files; the value is used as an index into gCurrentFname. */
typedef enum {
  TSDB_FCURRENT = 1,  // the manifest itself
  TSDB_FCURRENT_C,    // for commit
  TSDB_FCURRENT_M,    // for merge
} EFCurrentT;

/* File name of each manifest, indexed by EFCurrentT (slot 0 is unused and left NULL). */
static const char *gCurrentFname[] = {
    [TSDB_FCURRENT] = "current.json",
    [TSDB_FCURRENT_C] = "current.c.json",
    [TSDB_FCURRENT_M] = "current.m.json",
};

H
Hongze Cheng 已提交
43 44 45 46
/*
 * Allocate a zero-filled STFileSystem bound to pTsdb and hand it back through fs[0].
 *
 * The edit semaphore is initialised with a count of 1, the state starts as
 * TSDB_FS_STATE_NONE, and both file-set arrays are initialised empty.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the allocation fails
 * (fs[0] is NULL in that case).
 */
static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
  STFileSystem *pFS = taosMemoryCalloc(1, sizeof(*pFS));

  fs[0] = pFS;
  if (pFS == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pFS->tsdb = pTsdb;
  tsem_init(&pFS->canEdit, 0, 1);
  pFS->state = TSDB_FS_STATE_NONE;
  pFS->neid = 0;
  pFS->mergeTaskOn = false;
  TARRAY2_INIT(pFS->fSetArr);
  TARRAY2_INIT(pFS->fSetArrTmp);

  return 0;
}

H
Hongze Cheng 已提交
58 59
/*
 * Tear down the object built by create_fs and reset fs[0] to NULL.
 *
 * Both arrays are destroyed without an element callback, so the file sets they
 * hold are not released here. A NULL fs[0] is accepted. Always returns 0.
 */
static int32_t destroy_fs(STFileSystem **fs) {
  STFileSystem *pFS = fs[0];

  if (pFS != NULL) {
    TARRAY2_DESTROY(pFS->fSetArr, NULL);
    TARRAY2_DESTROY(pFS->fSetArrTmp, NULL);
    tsem_destroy(&pFS->canEdit);
    taosMemoryFree(pFS);
    fs[0] = NULL;
  }

  return 0;
}

H
Hongze Cheng 已提交
68
/*
 * Build the path of the manifest file selected by ftype into fname.
 *
 * fname must have room for TSDB_FILENAME_LEN bytes; longer paths are truncated
 * by snprintf. When the vnode has a tiered file system the path is prefixed
 * with its primary path, otherwise it is relative to pTsdb->path only.
 * Always returns 0.
 */
static int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) {
  const char *base = gCurrentFname[ftype];

  if (!pTsdb->pVnode->pTfs) {
    snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", pTsdb->path, TD_DIRSEP, base);
    return 0;
  }

  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, pTsdb->path,
           TD_DIRSEP, base);
  return 0;
}

H
Hongze Cheng 已提交
89
static int32_t save_json(const cJSON *json, const char *fname) {
H
Hongze Cheng 已提交
90
  int32_t code = 0;
H
Hongze Cheng 已提交
91

H
Hongze Cheng 已提交
92
  char *data = cJSON_PrintUnformatted(json);
H
Hongze Cheng 已提交
93 94 95 96 97 98
  if (data == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  TdFilePtr fp = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (fp == NULL) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
H
Hongze Cheng 已提交
99 100
  }

H
Hongze Cheng 已提交
101
  if (taosWriteFile(fp, data, strlen(data)) < 0) {
H
Hongze Cheng 已提交
102 103
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
H
Hongze Cheng 已提交
104 105
  }

H
Hongze Cheng 已提交
106 107 108 109
  if (taosFsyncFile(fp) < 0) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }
H
Hongze Cheng 已提交
110

H
Hongze Cheng 已提交
111
  taosCloseFile(&fp);
H
Hongze Cheng 已提交
112 113

_exit:
H
Hongze Cheng 已提交
114
  taosMemoryFree(data);
H
Hongze Cheng 已提交
115 116 117
  return code;
}

H
Hongze Cheng 已提交
118 119
/*
 * Read the whole of fname and parse it as JSON.
 *
 * On success returns 0 and stores the parsed tree in json[0]; the caller owns
 * it and releases it with cJSON_Delete. On any failure json[0] is set to NULL
 * and the return value is a system error derived from errno (open/stat/read),
 * TSDB_CODE_OUT_OF_MEMORY, or TSDB_CODE_FILE_CORRUPTED when parsing fails.
 */
static int32_t load_json(const char *fname, cJSON **json) {
  int32_t code = 0;
  char   *data = NULL;

  TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ);
  if (fp == NULL) {
    json[0] = NULL;
    return TAOS_SYSTEM_ERROR(errno);
  }

  int64_t size;
  if (taosFStatFile(fp, &size, NULL) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _exit;
  }

  // one extra byte for the terminator cJSON_Parse needs
  data = taosMemoryMalloc(size + 1);
  if (data == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  if (taosReadFile(fp, data, size) < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _exit;
  }
  data[size] = '\0';

  json[0] = cJSON_Parse(data);
  if (json[0] == NULL) {
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _exit;
  }

_exit:
  taosCloseFile(&fp);
  if (data) taosMemoryFree(data);
  if (code) json[0] = NULL;
  return code;
}

H
Hongze Cheng 已提交
156
static int32_t save_fs(const TFileSetArray *arr, const char *fname) {
H
Hongze Cheng 已提交
157
  int32_t code = 0;
H
Hongze Cheng 已提交
158
  int32_t lino = 0;
H
Hongze Cheng 已提交
159

H
Hongze Cheng 已提交
160
  cJSON *json = cJSON_CreateObject();
H
Hongze Cheng 已提交
161
  if (!json) return TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
162

H
Hongze Cheng 已提交
163 164 165
  // fmtv
  if (cJSON_AddNumberToObject(json, "fmtv", 1) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
166
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
167 168
  }

H
Hongze Cheng 已提交
169 170
  // fset
  cJSON *ajson = cJSON_AddArrayToObject(json, "fset");
H
Hongze Cheng 已提交
171
  if (!ajson) TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
H
Hongze Cheng 已提交
172 173 174
  const STFileSet *fset;
  TARRAY2_FOREACH(arr, fset) {
    cJSON *item = cJSON_CreateObject();
H
Hongze Cheng 已提交
175 176
    if (!item) TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
    cJSON_AddItemToArray(ajson, item);
H
Hongze Cheng 已提交
177

H
Hongze Cheng 已提交
178
    code = tsdbTFileSetToJson(fset, item);
H
Hongze Cheng 已提交
179 180 181
    TSDB_CHECK_CODE(code, lino, _exit);
  }

H
Hongze Cheng 已提交
182
  code = save_json(json, fname);
H
Hongze Cheng 已提交
183
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
184 185 186

_exit:
  if (code) {
H
Hongze Cheng 已提交
187
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
188
  }
H
Hongze Cheng 已提交
189
  cJSON_Delete(json);
H
Hongze Cheng 已提交
190
  return code;
H
Hongze Cheng 已提交
191 192
}

H
Hongze Cheng 已提交
193
/*
 * Replace the contents of arr with the file sets described by the JSON
 * manifest fname.
 *
 * arr is cleared first (its elements released with tsdbTFileSetClear). The
 * manifest must contain a numeric "fmtv" and an array "fset"; otherwise
 * TSDB_CODE_FILE_CORRUPTED is returned. Other failures come from load_json,
 * tsdbJsonToTFileSet or TARRAY2_APPEND. On failure arr keeps whatever was
 * appended before the error. Failures are logged with the file name.
 */
static int32_t load_fs(STsdb *pTsdb, const char *fname, TFileSetArray *arr) {
  int32_t code = 0;
  int32_t lino = 0;

  TARRAY2_CLEAR(arr, tsdbTFileSetClear);

  // load json
  cJSON *json = NULL;
  code = load_json(fname, &json);
  TSDB_CHECK_CODE(code, lino, _exit);

  // parse json
  const cJSON *item1;

  /* fmtv */
  item1 = cJSON_GetObjectItem(json, "fmtv");
  if (cJSON_IsNumber(item1)) {
    ASSERT(item1->valuedouble == 1);
  } else {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

  /* fset */
  item1 = cJSON_GetObjectItem(json, "fset");
  if (cJSON_IsArray(item1)) {
    const cJSON *item2;
    cJSON_ArrayForEach(item2, item1) {
      STFileSet *fset = NULL;
      code = tsdbJsonToTFileSet(pTsdb, item2, &fset);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = TARRAY2_APPEND(arr, fset);
      if (code) {
        // the set never made it into arr, so nothing else will release it;
        // use the same destructor arr uses for its elements
        tsdbTFileSetClear(&fset);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  } else {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
  }
  if (json) cJSON_Delete(json);
  return code;
}

H
Hongze Cheng 已提交
239 240 241 242 243 244 245 246
/* True when f1 and f2 agree on type, disk id (level and id) and cid. */
static bool is_same_file(const STFile *f1, const STFile f2) {
  return f1->type == f2.type &&            //
         f1->did.level == f2.did.level &&  //
         f1->did.id == f2.did.id &&        //
         f1->cid == f2.cid;
}

H
Hongze Cheng 已提交
247
/*
 * Bring fs->fSetArr in line with fs->fSetArrTmp by walking both arrays in
 * step, comparing file sets by fid:
 *   - a set present only in fSetArr is removed (tsdbTFileSetRemove);
 *   - a set present only in fSetArrTmp is copied and sort-inserted;
 *   - a set present in both has the edit applied (tsdbTFileSetApplyEdit).
 *
 * Returns 0 on success or the first error from the helpers; on error fSetArr
 * is left partially updated.
 */
static int32_t apply_commit(STFileSystem *fs) {
  int32_t        code = 0;
  TFileSetArray *live = fs->fSetArr;
  TFileSetArray *staged = fs->fSetArrTmp;
  int32_t        iLive = 0;
  int32_t        iStaged = 0;

  for (;;) {
    STFileSet *cur = (iLive < TARRAY2_SIZE(live)) ? TARRAY2_GET(live, iLive) : NULL;
    STFileSet *want = (iStaged < TARRAY2_SIZE(staged)) ? TARRAY2_GET(staged, iStaged) : NULL;

    if (cur == NULL && want == NULL) break;

    if (cur != NULL && (want == NULL || cur->fid < want->fid)) {
      // cur has no staged counterpart: drop it; the next element slides into iLive
      TARRAY2_REMOVE(live, iLive, tsdbTFileSetRemove);
      continue;
    }

    if (cur == NULL || cur->fid > want->fid) {
      // staged set is new: copy it into the live array
      STFileSet *added;
      code = tsdbTFileSetInitEx(fs->tsdb, want, &added);
      if (code) return code;
      code = TARRAY2_SORT_INSERT(live, added, tsdbTFileSetCmprFn);
      if (code) return code;
    } else {
      // same fid on both sides: edit in place
      code = tsdbTFileSetApplyEdit(fs->tsdb, want, cur);
      if (code) return code;
    }

    iLive++;
    iStaged++;
  }

  return 0;
}

/*
 * Make the pending edit permanent: rename the temporary manifest selected by
 * fs->etype (commit or merge) over current.json, then fold fSetArrTmp into
 * fSetArr with apply_commit.
 *
 * Returns 0 on success, otherwise the rename or apply_commit error.
 * The outcome is logged either way.
 */
static int32_t commit_edit(STFileSystem *fs) {
  int32_t code = 0;
  int32_t lino = 0;
  char    current[TSDB_FILENAME_LEN];
  char    current_t[TSDB_FILENAME_LEN];

  current_fname(fs->tsdb, current, TSDB_FCURRENT);

  switch (fs->etype) {
    case TSDB_FEDIT_COMMIT:
      current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
      break;
    case TSDB_FEDIT_MERGE:
      current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
      break;
    default:
      ASSERT(0);
      break;
  }

  code = taosRenameFile(current_t, current);
  if (code) {
    // NOTE(review): this wraps taosRenameFile's return value, not errno -- confirm that is intended
    code = TAOS_SYSTEM_ERROR(code);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = apply_commit(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
  }
  return code;
}

H
Hongze Cheng 已提交
324 325 326 327 328 329 330
/* In-memory half of aborting an edit. Not implemented yet: does nothing and returns 0. */
static int32_t apply_abort(STFileSystem *fs) {
  (void)fs;
  // TODO
  return 0;
}

/*
 * Discard the pending edit: remove the temporary manifest selected by
 * fs->etype (commit or merge), then call apply_abort.
 *
 * Returns 0 on success, otherwise the remove or apply_abort error.
 * The outcome is logged either way.
 */
static int32_t abort_edit(STFileSystem *fs) {
  int32_t code = 0;
  int32_t lino = 0;
  char    fname[TSDB_FILENAME_LEN];

  switch (fs->etype) {
    case TSDB_FEDIT_COMMIT:
      current_fname(fs->tsdb, fname, TSDB_FCURRENT_C);
      break;
    case TSDB_FEDIT_MERGE:
      current_fname(fs->tsdb, fname, TSDB_FCURRENT_M);
      break;
    default:
      ASSERT(0);
      break;
  }

  code = taosRemoveFile(fname);
  if (code) {
    // NOTE(review): this wraps taosRemoveFile's return value, not errno -- confirm that is intended
    code = TAOS_SYSTEM_ERROR(code);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = apply_abort(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed since %s", TD_VID(fs->tsdb->pVnode), __func__, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
  }
  return code;
}

H
Hongze Cheng 已提交
359 360 361 362 363
/*
 * Recompute fs->neid as the largest value tsdbTFileSetMaxCid reports over all
 * file sets in fs->fSetArr (0 when the array is empty).
 * The "fix" part is not implemented yet. Always returns 0.
 */
static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
  const STFileSet *fset;

  // get max commit id
  fs->neid = 0;
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    fs->neid = TMAX(fs->neid, tsdbTFileSetMaxCid(fset));
  }

  // TODO
  return 0;
}

H
Hongze Cheng 已提交
370 371 372 373 374
/* Hook called first by open_fs. Not implemented yet: does nothing and returns 0. */
static int32_t update_fs_if_needed(STFileSystem *pFS) {
  (void)pFS;
  // TODO
  return 0;
}

H
Hongze Cheng 已提交
375 376 377
/*
 * Rebuild fs->fSetArrTmp as a copy of fs->fSetArr: the temporary array is
 * cleared (elements released with tsdbTFileSetClear) and then receives one
 * tsdbTFileSetInitEx copy per file set of fSetArr, in the same order.
 *
 * Returns 0 on success or the first error from tsdbTFileSetInitEx /
 * TARRAY2_APPEND; on error fSetArrTmp holds the copies made so far.
 */
static int32_t tsdbFSDupState(STFileSystem *fs) {
  int32_t code = 0;

  const TFileSetArray *src = fs->fSetArr;
  TFileSetArray       *dst = fs->fSetArrTmp;

  TARRAY2_CLEAR(dst, tsdbTFileSetClear);

  const STFileSet *fset1;
  TARRAY2_FOREACH(src, fset1) {
    STFileSet *fset2 = NULL;
    code = tsdbTFileSetInitEx(fs->tsdb, fset1, &fset2);
    if (code) return code;

    code = TARRAY2_APPEND(dst, fset2);
    if (code) {
      // the copy never reached dst, so release it here with dst's element destructor
      tsdbTFileSetClear(&fset2);
      return code;
    }
  }

  return 0;
}

H
Hongze Cheng 已提交
395
/*
 * Load the on-disk state into fs, resolving any edit left unfinished.
 *
 * If current.json exists it is loaded into fs->fSetArr, and then:
 *   - if current.c.json also exists, the pending commit is aborted when
 *     rollback is non-zero, otherwise it is loaded into fs->fSetArrTmp and
 *     committed;
 *   - else if current.m.json exists, the pending merge is always aborted.
 * Afterwards fSetArrTmp is rebuilt from fSetArr and fs->neid is recomputed.
 * If current.json does not exist, the current contents of fs->fSetArr are
 * saved to it.
 *
 * Returns 0 on success, otherwise the code of the first step that failed
 * (also logged).
 */
static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = fs->tsdb;

  code = update_fs_if_needed(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  char fCurrent[TSDB_FILENAME_LEN];
  char cCurrent[TSDB_FILENAME_LEN];
  char mCurrent[TSDB_FILENAME_LEN];

  current_fname(pTsdb, fCurrent, TSDB_FCURRENT);
  current_fname(pTsdb, cCurrent, TSDB_FCURRENT_C);
  current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M);

  if (taosCheckExistFile(fCurrent)) {  // current.json exists
    code = load_fs(pTsdb, fCurrent, fs->fSetArr);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (taosCheckExistFile(cCurrent)) {
      // current.c.json exists

      fs->etype = TSDB_FEDIT_COMMIT;
      if (rollback) {
        code = abort_edit(fs);
        TSDB_CHECK_CODE(code, lino, _exit);
      } else {
        code = load_fs(pTsdb, cCurrent, fs->fSetArrTmp);
        TSDB_CHECK_CODE(code, lino, _exit);

        code = commit_edit(fs);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    } else if (taosCheckExistFile(mCurrent)) {
      // current.m.json exists
      fs->etype = TSDB_FEDIT_MERGE;
      code = abort_edit(fs);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbFSDupState(fs);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbFSScanAndFix(fs);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    code = save_fs(fs->fSetArr, fCurrent);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
  }
  // propagate the failure; returning 0 here would make the caller treat a half-opened fs as usable
  return code;
}

H
Hongze Cheng 已提交
455
/*
 * Empty both file-set arrays of fs, releasing each element with
 * tsdbTFileSetClear. The arrays themselves are left for destroy_fs.
 * Always returns 0.
 */
static int32_t close_file_system(STFileSystem *fs) {
  TARRAY2_CLEAR(fs->fSetArrTmp, tsdbTFileSetClear);
  TARRAY2_CLEAR(fs->fSetArr, tsdbTFileSetClear);

  // TODO
  return 0;
}
H
Hongze Cheng 已提交
461

H
Hongze Cheng 已提交
462
/* Not implemented yet: trips an assertion and returns 0. */
static int32_t apply_edit(STFileSystem *pFS) {
  (void)pFS;
  ASSERTS(0, "TODO: Not implemented yet");
  return 0;
}

H
Hongze Cheng 已提交
468
/* Three-way comparison of two file sets by fid: -1, 0 or 1. */
static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSet *pSet2) {
  if (pSet1->fid == pSet2->fid) {
    return 0;
  }
  return (pSet1->fid < pSet2->fid) ? -1 : 1;
}

H
Hongze Cheng 已提交
477 478 479
/*
 * Apply every operation in opArray to the temporary state fs->fSetArrTmp.
 *
 * For each operation the file set with the matching fid is looked up (the
 * previous one is reused while consecutive operations share a fid) and
 * created and sort-inserted if missing; the operation is then applied with
 * tsdbTFileSetEdit. Finally, file sets that tsdbTFileSetIsEmpty reports as
 * empty are removed.
 *
 * Returns 0 on success or the first helper error; on error the empty-set
 * sweep is skipped.
 */
static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) {
  int32_t         code = 0;
  int32_t         lino = 0;
  TFileSetArray  *staged = fs->fSetArrTmp;
  STFileSet      *fset = NULL;
  const STFileOp *op;

  TARRAY2_FOREACH_PTR(opArray, op) {
    if (fset == NULL || fset->fid != op->fid) {
      // search by fid using a stack key; the array stores pointers, hence the pointer-to-pointer key
      STFileSet  key = {.fid = op->fid};
      STFileSet *pKey = &key;
      fset = TARRAY2_SEARCH_EX(staged, &pKey, tsdbTFileSetCmprFn, TD_EQ);

      if (fset == NULL) {
        code = tsdbTFileSetInit(op->fid, &fset);
        TSDB_CHECK_CODE(code, lino, _exit);

        code = TARRAY2_SORT_INSERT(staged, fset, tsdbTFileSetCmprFn);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }

    code = tsdbTFileSetEdit(fs->tsdb, fset, op);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // remove empty file set
  for (int32_t i = 0; i < TARRAY2_SIZE(staged);) {
    fset = TARRAY2_GET(staged, i);
    if (!tsdbTFileSetIsEmpty(fset)) {
      i++;
      continue;
    }
    // removal shifts the next element into slot i, so i is not advanced
    TARRAY2_REMOVE(staged, i, tsdbTFileSetClear);
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
518
/*
 * Create the file-system object for pTsdb and load its on-disk state.
 *
 * pTsdb    - owning tsdb instance
 * fs       - out: fs[0] receives the new object on success
 * rollback - passed to open_fs; non-zero aborts a pending commit instead of
 *            completing it
 *
 * Returns 0 on success. On failure the error is logged, the partially built
 * object is destroyed, fs[0] is left NULL and the error code is returned.
 */
int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback) {
  int32_t code = 0;
  int32_t lino = 0;

  code = create_fs(pTsdb, fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = open_fs(fs[0], rollback);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
    destroy_fs(fs);
  } else {
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
  }
  // report the failure: after destroy_fs the handle is NULL, so claiming success would mislead the caller
  return code;
}

H
Hongze Cheng 已提交
538
/*
 * Close and free the file-system object in ppFS[0], leaving ppFS[0] NULL.
 * A NULL ppFS[0] is accepted. Always returns 0.
 */
int32_t tsdbCloseFS(STFileSystem **ppFS) {
  if (ppFS[0] != NULL) {
    close_file_system(ppFS[0]);
    destroy_fs(ppFS);
  }
  return 0;
}
H
Hongze Cheng 已提交
544

H
Hongze Cheng 已提交
545 546 547 548 549
/*
 * Hand out the next edit id: increments fs->neid and returns the new value.
 *
 * NOTE(review): the increment runs while holding the tsdb rwLock in *read*
 * mode, which does not exclude other readers -- confirm that callers are
 * otherwise serialised.
 */
int64_t tsdbFSAllocEid(STFileSystem *fs) {
  int64_t eid;

  taosThreadRwlockRdlock(&fs->tsdb->rwLock);
  eid = ++fs->neid;
  taosThreadRwlockUnlock(&fs->tsdb->rwLock);

  return eid;
}

H
Hongze Cheng 已提交
552
/*
 * Start an edit of type etype (commit or merge).
 *
 * Waits on fs->canEdit, records etype in fs->etype, applies opArray to the
 * temporary state with edit_fs, and writes that state to the temporary
 * manifest for etype (current.c.json or current.m.json).
 *
 * Returns 0 on success or the edit_fs / save_fs error. The semaphore is not
 * posted here on either path; tsdbFSEditCommit and tsdbFSEditAbort post it.
 */
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
  int32_t code = 0;
  int32_t lino = 0;
  char    current_t[TSDB_FILENAME_LEN];

  if (etype == TSDB_FEDIT_COMMIT) {
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
  } else if (etype == TSDB_FEDIT_MERGE) {
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
  } else {
    ASSERT(0);
  }

  tsem_wait(&fs->canEdit);
  fs->etype = etype;

  // edit
  code = edit_fs(fs, opArray);
  TSDB_CHECK_CODE(code, lino, _exit);

  // save fs
  code = save_fs(fs->fSetArrTmp, current_t);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, lino,
              tstrerror(code), etype);
  } else {
    tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, etype);
  }
  return code;
}

H
Hongze Cheng 已提交
590
/*
 * Finish the edit started by tsdbFSEditBegin.
 *
 * Commits the edit with commit_edit; if it was a merge, clears
 * fs->mergeTaskOn. Then, when sttTrigger > 1 and no merge task is pending,
 * scans the file sets from last to first and schedules tsdbMerge for the
 * first one whose level-0 stt file has at least sttTrigger segments.
 *
 * Returns 0 on success and posts fs->canEdit. On failure the error is logged
 * and the semaphore is NOT posted.
 * NOTE(review): confirm callers follow a failure with tsdbFSEditAbort, which
 * does post it.
 */
int32_t tsdbFSEditCommit(STFileSystem *fs) {
  int32_t code = 0;
  int32_t lino = 0;

  // commit
  code = commit_edit(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (fs->etype == TSDB_FEDIT_MERGE) {
    ASSERT(fs->mergeTaskOn);
    fs->mergeTaskOn = false;
  }

  // check if need to merge
  if (fs->tsdb->pVnode->config.sttTrigger > 1 && !fs->mergeTaskOn) {
    STFileSet *fset;
    TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) {
      // skip sets without any stt level
      if (TARRAY2_SIZE(fset->lvlArr) == 0) {
        continue;
      }

      // only a non-empty level 0 at the front qualifies
      SSttLvl *first = TARRAY2_FIRST(fset->lvlArr);
      if (first->level != 0 || TARRAY2_SIZE(first->fobjArr) == 0) {
        continue;
      }

      STFileObj *head = TARRAY2_FIRST(first->fobjArr);
      if (head->f->stt->nseg < fs->tsdb->pVnode->config.sttTrigger) {
        continue;
      }

      code = vnodeScheduleTaskEx(1, tsdbMerge, fs->tsdb);
      TSDB_CHECK_CODE(code, lino, _exit);

      fs->mergeTaskOn = true;
      break;
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  } else {
    tsem_post(&fs->canEdit);
  }
  return code;
}

H
Hongze Cheng 已提交
634 635 636
/*
 * Abandon the edit started by tsdbFSEditBegin: runs abort_edit and then posts
 * fs->canEdit regardless of the outcome. Returns abort_edit's result.
 */
int32_t tsdbFSEditAbort(STFileSystem *fs) {
  int32_t rc = abort_edit(fs);

  tsem_post(&fs->canEdit);
  return rc;
}

H
Hongze Cheng 已提交
640 641 642
/*
 * Look up the file set with the given fid in fs->fSetArr.
 * fset[0] receives the match, or NULL when there is none. Always returns 0.
 */
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset) {
  // the array stores pointers, so the search key is a pointer to a stack-allocated probe
  STFileSet  probe = {.fid = fid};
  STFileSet *pProbe = &probe;

  *fset = TARRAY2_SEARCH_EX(fs->fSetArr, &pProbe, tsdbTFileSetCmprFn, TD_EQ);
  return 0;
}

H
Hongze Cheng 已提交
647
/*
 * Build an independent copy of fs->fSetArr.
 *
 * A new array is allocated and filled, under the tsdb read lock, with one
 * tsdbTFileSetInitEx copy per file set. On success fsetArr[0] is the new
 * array; release it with tsdbFSDestroyCopySnapshot.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the array cannot be
 * allocated, or the first copy/append error. On failure everything built so
 * far is released and fsetArr[0] is NULL.
 */
int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
  int32_t    code = 0;
  STFileSet *fset;
  STFileSet *fset1;

  fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
  // check the allocation result, not the out-parameter itself
  if (fsetArr[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  TARRAY2_INIT(fsetArr[0]);

  taosThreadRwlockRdlock(&fs->tsdb->rwLock);
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    fset1 = NULL;
    code = tsdbTFileSetInitEx(fs->tsdb, fset, &fset1);
    if (code) break;

    code = TARRAY2_APPEND(fsetArr[0], fset1);
    if (code) {
      // the copy is not in the array, so the destroy below would miss it
      tsdbTFileSetClear(&fset1);
      break;
    }
  }
  taosThreadRwlockUnlock(&fs->tsdb->rwLock);

  if (code) {
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
    taosMemoryFree(fsetArr[0]);
    fsetArr[0] = NULL;
  }
  return code;
}

H
Hongze Cheng 已提交
675 676
/*
 * Release a snapshot produced by tsdbFSCreateCopySnapshot: every file set is
 * released with tsdbTFileSetClear, the array storage is destroyed, and the
 * array header (which the create function allocated with taosMemoryMalloc)
 * is freed. fsetArr[0] is left NULL. A NULL fsetArr[0] is accepted.
 * Always returns 0.
 */
int32_t tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr) {
  if (fsetArr[0]) {
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
    // mirror the create function's own error path, which frees the header after destroying it
    taosMemoryFree(fsetArr[0]);
    fsetArr[0] = NULL;
  }
  return 0;
}

/*
 * Build a snapshot array with one slot per file set of fs->fSetArr, under the
 * tsdb read lock.
 *
 * Creating the actual reference to each file set is still TODO, so every slot
 * currently holds NULL. On success fsetArr[0] is the new array; release it
 * with tsdbFSDestroyRefSnapshot.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the array cannot be
 * allocated, or the TARRAY2_APPEND error. On failure the array is released
 * and fsetArr[0] is NULL.
 */
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
  int32_t    code = 0;
  STFileSet *fset, *fset1;

  fsetArr[0] = taosMemoryMalloc(sizeof(*fsetArr[0]));
  if (fsetArr[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  // malloc'd memory is uninitialised; the array must be set up before appending
  TARRAY2_INIT(fsetArr[0]);

  taosThreadRwlockRdlock(&fs->tsdb->rwLock);
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    // TODO: create ref fset of fset1
    fset1 = NULL;  // placeholder until the TODO is done, instead of appending an indeterminate pointer

    code = TARRAY2_APPEND(fsetArr[0], fset1);
    if (code) break;
  }
  taosThreadRwlockUnlock(&fs->tsdb->rwLock);

  if (code) {
    TARRAY2_DESTROY(fsetArr[0], NULL /* TODO */);
    taosMemoryFree(fsetArr[0]);
    fsetArr[0] = NULL;
  }
  return code;
}

/*
 * Release a snapshot produced by tsdbFSCreateRefSnapshot: the array storage
 * is destroyed (no per-element callback yet, see TODO) and the array header,
 * which the create function allocated with taosMemoryMalloc, is freed.
 * fsetArr[0] is left NULL. A NULL fsetArr[0] is accepted. Always returns 0.
 */
int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) {
  if (fsetArr[0]) {
    TARRAY2_DESTROY(fsetArr[0], NULL /* TODO */);
    // the header itself was heap-allocated by the create function
    taosMemoryFree(fsetArr[0]);
    fsetArr[0] = NULL;
  }
  return 0;
}