tsdbFS2.c 23.0 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
Hongze Cheng 已提交
14 15
 */

H
Hongze Cheng 已提交
16
#include "tsdbFS2.h"
H
Hongze Cheng 已提交
17
#include "tsdbUpgrade.h"
18
#include "vnd.h"
H
Hongze Cheng 已提交
19

H
Hongze Cheng 已提交
20
extern int vnodeScheduleTask(int (*execute)(void *), void *arg);
H
Hongze Cheng 已提交
21
extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg);
H
Hongze Cheng 已提交
22

H
Hongze Cheng 已提交
23 24
#define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT
#define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1)
H
Hongze Cheng 已提交
25 26 27 28 29 30 31 32

enum {
  TSDB_FS_STATE_NONE = 0,
  TSDB_FS_STATE_OPEN,
  TSDB_FS_STATE_EDIT,
  TSDB_FS_STATE_CLOSE,
};

H
Hongze Cheng 已提交
33 34
static const char *gCurrentFname[] = {
    [TSDB_FCURRENT] = "current.json",
H
Hongze Cheng 已提交
35 36
    [TSDB_FCURRENT_C] = "current.c.json",
    [TSDB_FCURRENT_M] = "current.m.json",
H
Hongze Cheng 已提交
37 38
};

H
Hongze Cheng 已提交
39 40 41 42
static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
  fs[0] = taosMemoryCalloc(1, sizeof(*fs[0]));
  if (fs[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;

H
Hongze Cheng 已提交
43
  fs[0]->tsdb = pTsdb;
H
Hongze Cheng 已提交
44
  tsem_init(&fs[0]->canEdit, 0, 1);
H
Hongze Cheng 已提交
45
  fs[0]->state = TSDB_FS_STATE_NONE;
H
Hongze Cheng 已提交
46
  fs[0]->neid = 0;
H
Hongze Cheng 已提交
47 48
  TARRAY2_INIT(fs[0]->fSetArr);
  TARRAY2_INIT(fs[0]->fSetArrTmp);
H
Hongze Cheng 已提交
49

H
Hongze Cheng 已提交
50 51 52 53 54
  // background task queue
  taosThreadMutexInit(fs[0]->mutex, NULL);
  fs[0]->bgTaskQueue->next = fs[0]->bgTaskQueue;
  fs[0]->bgTaskQueue->prev = fs[0]->bgTaskQueue;

H
Hongze Cheng 已提交
55 56 57
  return 0;
}

H
Hongze Cheng 已提交
58 59
static int32_t destroy_fs(STFileSystem **fs) {
  if (fs[0] == NULL) return 0;
H
Hongze Cheng 已提交
60 61 62 63
  taosThreadMutexDestroy(fs[0]->mutex);

  ASSERT(fs[0]->bgTaskNum == 0);

H
Hongze Cheng 已提交
64 65
  TARRAY2_DESTROY(fs[0]->fSetArr, NULL);
  TARRAY2_DESTROY(fs[0]->fSetArrTmp, NULL);
H
Hongze Cheng 已提交
66 67 68
  tsem_destroy(&fs[0]->canEdit);
  taosMemoryFree(fs[0]);
  fs[0] = NULL;
H
Hongze Cheng 已提交
69 70 71
  return 0;
}

H
Hongze Cheng 已提交
72
int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) {
73 74
  int32_t offset = 0;

75
  vnodeGetPrimaryDir(pTsdb->path, pTsdb->pVnode->diskPrimary, pTsdb->pVnode->pTfs, fname, TSDB_FILENAME_LEN);
76 77 78
  offset = strlen(fname);
  snprintf(fname + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, gCurrentFname[ftype]);

H
Hongze Cheng 已提交
79 80 81
  return 0;
}

H
Hongze Cheng 已提交
82
static int32_t save_json(const cJSON *json, const char *fname) {
H
Hongze Cheng 已提交
83
  int32_t code = 0;
H
Hongze Cheng 已提交
84

H
Hongze Cheng 已提交
85
  char *data = cJSON_PrintUnformatted(json);
H
Hongze Cheng 已提交
86 87 88 89 90 91
  if (data == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  TdFilePtr fp = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (fp == NULL) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
H
Hongze Cheng 已提交
92 93
  }

H
Hongze Cheng 已提交
94
  if (taosWriteFile(fp, data, strlen(data)) < 0) {
H
Hongze Cheng 已提交
95 96
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
H
Hongze Cheng 已提交
97 98
  }

H
Hongze Cheng 已提交
99 100 101 102
  if (taosFsyncFile(fp) < 0) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }
H
Hongze Cheng 已提交
103

H
Hongze Cheng 已提交
104
  taosCloseFile(&fp);
H
Hongze Cheng 已提交
105 106

_exit:
H
Hongze Cheng 已提交
107
  taosMemoryFree(data);
H
Hongze Cheng 已提交
108 109 110
  return code;
}

H
Hongze Cheng 已提交
111 112
static int32_t load_json(const char *fname, cJSON **json) {
  int32_t code = 0;
H
Hongze Cheng 已提交
113
  char   *data = NULL;
H
Hongze Cheng 已提交
114 115 116 117 118 119 120 121 122 123

  TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ);
  if (fp == NULL) return TAOS_SYSTEM_ERROR(code);

  int64_t size;
  if (taosFStatFile(fp, &size, NULL) < 0) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }

H
Hongze Cheng 已提交
124
  data = taosMemoryMalloc(size + 1);
H
Hongze Cheng 已提交
125 126 127 128 129 130 131 132 133
  if (data == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  if (taosReadFile(fp, data, size) < 0) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _exit;
  }
H
Hongze Cheng 已提交
134
  data[size] = '\0';
H
Hongze Cheng 已提交
135 136 137 138 139 140 141 142 143 144 145 146 147 148

  json[0] = cJSON_Parse(data);
  if (json[0] == NULL) {
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _exit;
  }

_exit:
  taosCloseFile(&fp);
  if (data) taosMemoryFree(data);
  if (code) json[0] = NULL;
  return code;
}

H
Hongze Cheng 已提交
149
int32_t save_fs(const TFileSetArray *arr, const char *fname) {
H
Hongze Cheng 已提交
150
  int32_t code = 0;
H
Hongze Cheng 已提交
151
  int32_t lino = 0;
H
Hongze Cheng 已提交
152

H
Hongze Cheng 已提交
153
  cJSON *json = cJSON_CreateObject();
H
Hongze Cheng 已提交
154
  if (!json) return TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
155

H
Hongze Cheng 已提交
156 157 158
  // fmtv
  if (cJSON_AddNumberToObject(json, "fmtv", 1) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
159
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
160 161
  }

H
Hongze Cheng 已提交
162 163
  // fset
  cJSON *ajson = cJSON_AddArrayToObject(json, "fset");
H
Hongze Cheng 已提交
164
  if (!ajson) TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
H
Hongze Cheng 已提交
165 166 167
  const STFileSet *fset;
  TARRAY2_FOREACH(arr, fset) {
    cJSON *item = cJSON_CreateObject();
H
Hongze Cheng 已提交
168 169
    if (!item) TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
    cJSON_AddItemToArray(ajson, item);
H
Hongze Cheng 已提交
170

H
Hongze Cheng 已提交
171
    code = tsdbTFileSetToJson(fset, item);
H
Hongze Cheng 已提交
172 173 174
    TSDB_CHECK_CODE(code, lino, _exit);
  }

H
Hongze Cheng 已提交
175
  code = save_json(json, fname);
H
Hongze Cheng 已提交
176
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
177 178 179

_exit:
  if (code) {
H
Hongze Cheng 已提交
180
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
181
  }
H
Hongze Cheng 已提交
182
  cJSON_Delete(json);
H
Hongze Cheng 已提交
183
  return code;
H
Hongze Cheng 已提交
184 185
}

H
Hongze Cheng 已提交
186
static int32_t load_fs(STsdb *pTsdb, const char *fname, TFileSetArray *arr) {
H
Hongze Cheng 已提交
187 188 189
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
190
  TARRAY2_CLEAR(arr, tsdbTFileSetClear);
H
Hongze Cheng 已提交
191 192 193 194

  // load json
  cJSON *json = NULL;
  code = load_json(fname, &json);
H
Hongze Cheng 已提交
195
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
196 197

  // parse json
H
Hongze Cheng 已提交
198
  const cJSON *item1;
H
Hongze Cheng 已提交
199 200

  /* fmtv */
H
Hongze Cheng 已提交
201 202 203
  item1 = cJSON_GetObjectItem(json, "fmtv");
  if (cJSON_IsNumber(item1)) {
    ASSERT(item1->valuedouble == 1);
H
Hongze Cheng 已提交
204
  } else {
H
Hongze Cheng 已提交
205
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
H
Hongze Cheng 已提交
206 207 208
  }

  /* fset */
H
Hongze Cheng 已提交
209 210 211 212
  item1 = cJSON_GetObjectItem(json, "fset");
  if (cJSON_IsArray(item1)) {
    const cJSON *item2;
    cJSON_ArrayForEach(item2, item1) {
H
Hongze Cheng 已提交
213
      STFileSet *fset;
H
Hongze Cheng 已提交
214
      code = tsdbJsonToTFileSet(pTsdb, item2, &fset);
H
Hongze Cheng 已提交
215
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
216

H
Hongze Cheng 已提交
217
      code = TARRAY2_APPEND(arr, fset);
H
Hongze Cheng 已提交
218
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
219 220
    }
  } else {
H
Hongze Cheng 已提交
221
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
H
Hongze Cheng 已提交
222 223 224 225 226 227 228 229
  }

_exit:
  if (code) {
    tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
  }
  if (json) cJSON_Delete(json);
  return code;
H
Hongze Cheng 已提交
230 231
}

H
Hongze Cheng 已提交
232 233 234 235 236 237 238 239
static bool is_same_file(const STFile *f1, const STFile f2) {
  if (f1->type != f2.type) return false;
  if (f1->did.level != f2.did.level) return false;
  if (f1->did.id != f2.did.id) return false;
  if (f1->cid != f2.cid) return false;
  return true;
}

H
Hongze Cheng 已提交
240
static int32_t apply_commit(STFileSystem *fs) {
H
Hongze Cheng 已提交
241
  int32_t        code = 0;
H
Hongze Cheng 已提交
242 243
  TFileSetArray *fsetArray1 = fs->fSetArr;
  TFileSetArray *fsetArray2 = fs->fSetArrTmp;
H
Hongze Cheng 已提交
244
  int32_t        i1 = 0, i2 = 0;
H
Hongze Cheng 已提交
245

H
Hongze Cheng 已提交
246
  while (i1 < TARRAY2_SIZE(fsetArray1) || i2 < TARRAY2_SIZE(fsetArray2)) {
H
Hongze Cheng 已提交
247 248
    STFileSet *fset1 = i1 < TARRAY2_SIZE(fsetArray1) ? TARRAY2_GET(fsetArray1, i1) : NULL;
    STFileSet *fset2 = i2 < TARRAY2_SIZE(fsetArray2) ? TARRAY2_GET(fsetArray2, i2) : NULL;
H
Hongze Cheng 已提交
249 250 251

    if (fset1 && fset2) {
      if (fset1->fid < fset2->fid) {
H
Hongze Cheng 已提交
252 253
        // delete fset1
        TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetRemove);
H
Hongze Cheng 已提交
254 255
      } else if (fset1->fid > fset2->fid) {
        // create new file set with fid of fset2->fid
H
Hongze Cheng 已提交
256
        code = tsdbTFileSetInitDup(fs->tsdb, fset2, &fset1);
H
Hongze Cheng 已提交
257
        if (code) return code;
H
Hongze Cheng 已提交
258
        code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
H
Hongze Cheng 已提交
259 260
        if (code) return code;
        i1++;
H
Hongze Cheng 已提交
261
        i2++;
H
Hongze Cheng 已提交
262 263
      } else {
        // edit
H
Hongze Cheng 已提交
264
        code = tsdbTFileSetApplyEdit(fs->tsdb, fset2, fset1);
H
Hongze Cheng 已提交
265 266 267 268 269
        if (code) return code;
        i1++;
        i2++;
      }
    } else if (fset1) {
H
Hongze Cheng 已提交
270 271
      // delete fset1
      TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetRemove);
H
Hongze Cheng 已提交
272 273
    } else {
      // create new file set with fid of fset2->fid
H
Hongze Cheng 已提交
274
      code = tsdbTFileSetInitDup(fs->tsdb, fset2, &fset1);
H
Hongze Cheng 已提交
275
      if (code) return code;
H
Hongze Cheng 已提交
276
      code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
H
Hongze Cheng 已提交
277 278
      if (code) return code;
      i1++;
H
Hongze Cheng 已提交
279
      i2++;
H
Hongze Cheng 已提交
280 281
    }
  }
H
Hongze Cheng 已提交
282

H
Hongze Cheng 已提交
283 284 285 286
  return 0;
}

static int32_t commit_edit(STFileSystem *fs) {
H
Hongze Cheng 已提交
287 288
  char current[TSDB_FILENAME_LEN];
  char current_t[TSDB_FILENAME_LEN];
H
Hongze Cheng 已提交
289

H
Hongze Cheng 已提交
290
  current_fname(fs->tsdb, current, TSDB_FCURRENT);
H
Hongze Cheng 已提交
291
  if (fs->etype == TSDB_FEDIT_COMMIT) {
H
Hongze Cheng 已提交
292
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
H
Hongze Cheng 已提交
293
  } else if (fs->etype == TSDB_FEDIT_MERGE) {
H
Hongze Cheng 已提交
294
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
H
Hongze Cheng 已提交
295 296
  } else {
    ASSERT(0);
H
Hongze Cheng 已提交
297 298
  }

H
Hongze Cheng 已提交
299 300 301
  int32_t code;
  int32_t lino;
  if ((code = taosRenameFile(current_t, current))) {
H
Hongze Cheng 已提交
302
    TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(code), lino, _exit);
H
Hongze Cheng 已提交
303
  }
H
Hongze Cheng 已提交
304

H
Hongze Cheng 已提交
305
  code = apply_commit(fs);
H
Hongze Cheng 已提交
306
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
307

H
Hongze Cheng 已提交
308 309
_exit:
  if (code) {
H
Hongze Cheng 已提交
310
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
311
  } else {
H
Hongze Cheng 已提交
312
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
H
Hongze Cheng 已提交
313 314
  }
  return code;
H
Hongze Cheng 已提交
315 316
}

H
Hongze Cheng 已提交
317 318 319 320 321 322 323
// static int32_t
static int32_t apply_abort(STFileSystem *fs) {
  // TODO
  return 0;
}

static int32_t abort_edit(STFileSystem *fs) {
H
Hongze Cheng 已提交
324
  char fname[TSDB_FILENAME_LEN];
H
Hongze Cheng 已提交
325

H
Hongze Cheng 已提交
326
  if (fs->etype == TSDB_FEDIT_COMMIT) {
H
Hongze Cheng 已提交
327
    current_fname(fs->tsdb, fname, TSDB_FCURRENT_C);
H
Hongze Cheng 已提交
328
  } else if (fs->etype == TSDB_FEDIT_MERGE) {
H
Hongze Cheng 已提交
329
    current_fname(fs->tsdb, fname, TSDB_FCURRENT_M);
H
Hongze Cheng 已提交
330 331 332
  } else {
    ASSERT(0);
  }
H
Hongze Cheng 已提交
333

H
Hongze Cheng 已提交
334 335 336
  int32_t code;
  int32_t lino;
  if ((code = taosRemoveFile(fname))) {
H
Hongze Cheng 已提交
337
    TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(code), lino, _exit);
H
Hongze Cheng 已提交
338
  }
H
Hongze Cheng 已提交
339

H
Hongze Cheng 已提交
340
  code = apply_abort(fs);
H
Hongze Cheng 已提交
341
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
342 343 344

_exit:
  if (code) {
H
Hongze Cheng 已提交
345
    tsdbError("vgId:%d %s failed since %s", TD_VID(fs->tsdb->pVnode), __func__, tstrerror(code));
H
Hongze Cheng 已提交
346
  } else {
H
Hongze Cheng 已提交
347
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
H
Hongze Cheng 已提交
348 349
  }
  return code;
H
Hongze Cheng 已提交
350 351
}

H
Hongze Cheng 已提交
352 353 354 355 356
static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
  fs->neid = 0;

  // get max commit id
  const STFileSet *fset;
H
Hongze Cheng 已提交
357
  TARRAY2_FOREACH(fs->fSetArr, fset) { fs->neid = TMAX(fs->neid, tsdbTFileSetMaxCid(fset)); }
H
Hongze Cheng 已提交
358

H
Hongze Cheng 已提交
359
  // TODO
H
Hongze Cheng 已提交
360 361 362
  return 0;
}

H
Hongze Cheng 已提交
363 364 365
static int32_t tsdbFSDupState(STFileSystem *fs) {
  int32_t code;

H
Hongze Cheng 已提交
366 367
  const TFileSetArray *src = fs->fSetArr;
  TFileSetArray       *dst = fs->fSetArrTmp;
H
Hongze Cheng 已提交
368

H
Hongze Cheng 已提交
369 370 371 372
  TARRAY2_CLEAR(dst, tsdbTFileSetClear);

  const STFileSet *fset1;
  TARRAY2_FOREACH(src, fset1) {
H
Hongze Cheng 已提交
373
    STFileSet *fset2;
H
Hongze Cheng 已提交
374
    code = tsdbTFileSetInitDup(fs->tsdb, fset1, &fset2);
H
Hongze Cheng 已提交
375
    if (code) return code;
H
Hongze Cheng 已提交
376
    code = TARRAY2_APPEND(dst, fset2);
H
Hongze Cheng 已提交
377 378 379 380 381 382
    if (code) return code;
  }

  return 0;
}

H
Hongze Cheng 已提交
383
static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
H
Hongze Cheng 已提交
384
  int32_t code = 0;
H
Hongze Cheng 已提交
385
  int32_t lino = 0;
H
Hongze Cheng 已提交
386
  STsdb  *pTsdb = fs->tsdb;
H
Hongze Cheng 已提交
387

H
Hongze Cheng 已提交
388 389 390
  char fCurrent[TSDB_FILENAME_LEN];
  char cCurrent[TSDB_FILENAME_LEN];
  char mCurrent[TSDB_FILENAME_LEN];
H
Hongze Cheng 已提交
391

H
Hongze Cheng 已提交
392 393
  current_fname(pTsdb, fCurrent, TSDB_FCURRENT);
  current_fname(pTsdb, cCurrent, TSDB_FCURRENT_C);
H
Hongze Cheng 已提交
394
  current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M);
H
Hongze Cheng 已提交
395

H
Hongze Cheng 已提交
396
  if (taosCheckExistFile(fCurrent)) {  // current.json exists
H
Hongze Cheng 已提交
397
    code = load_fs(pTsdb, fCurrent, fs->fSetArr);
H
Hongze Cheng 已提交
398
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
399

H
Hongze Cheng 已提交
400
    if (taosCheckExistFile(cCurrent)) {
H
Hongze Cheng 已提交
401 402 403
      // current.c.json exists

      fs->etype = TSDB_FEDIT_COMMIT;
H
Hongze Cheng 已提交
404
      if (rollback) {
H
Hongze Cheng 已提交
405
        code = abort_edit(fs);
H
Hongze Cheng 已提交
406 407
        TSDB_CHECK_CODE(code, lino, _exit);
      } else {
H
Hongze Cheng 已提交
408
        code = load_fs(pTsdb, cCurrent, fs->fSetArrTmp);
H
Hongze Cheng 已提交
409
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
410 411

        code = commit_edit(fs);
H
Hongze Cheng 已提交
412 413
        TSDB_CHECK_CODE(code, lino, _exit);
      }
H
Hongze Cheng 已提交
414 415 416 417
    } else if (taosCheckExistFile(mCurrent)) {
      // current.m.json exists
      fs->etype = TSDB_FEDIT_MERGE;
      code = abort_edit(fs);
H
Hongze Cheng 已提交
418 419
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
420

H
Hongze Cheng 已提交
421
    code = tsdbFSDupState(fs);
H
Hongze Cheng 已提交
422 423
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
424
    code = tsdbFSScanAndFix(fs);
H
Hongze Cheng 已提交
425
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
426
  } else {
H
Hongze Cheng 已提交
427
    code = save_fs(fs->fSetArr, fCurrent);
H
Hongze Cheng 已提交
428
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
429 430 431 432
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
433
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
434
  } else {
H
Hongze Cheng 已提交
435
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
H
Hongze Cheng 已提交
436
  }
H
Hongze Cheng 已提交
437 438 439
  return 0;
}

H
Hongze Cheng 已提交
440
static int32_t close_file_system(STFileSystem *fs) {
H
Hongze Cheng 已提交
441 442
  TARRAY2_CLEAR(fs->fSetArr, tsdbTFileSetClear);
  TARRAY2_CLEAR(fs->fSetArrTmp, tsdbTFileSetClear);
H
Hongze Cheng 已提交
443
  // TODO
H
Hongze Cheng 已提交
444 445
  return 0;
}
H
Hongze Cheng 已提交
446

H
Hongze Cheng 已提交
447
static int32_t apply_edit(STFileSystem *pFS) {
H
Hongze Cheng 已提交
448
  int32_t code = 0;
H
Hongze Cheng 已提交
449
  ASSERTS(0, "TODO: Not implemented yet");
H
Hongze Cheng 已提交
450
  return code;
H
Hongze Cheng 已提交
451 452
}

H
Hongze Cheng 已提交
453
static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSet *pSet2) {
H
Hongze Cheng 已提交
454 455 456 457 458 459 460 461
  if (pSet1->fid < pSet2->fid) {
    return -1;
  } else if (pSet1->fid > pSet2->fid) {
    return 1;
  }
  return 0;
}

H
Hongze Cheng 已提交
462
static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) {
H
more  
Hongze Cheng 已提交
463 464 465 466 467
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbFSDupState(fs);
  if (code) return code;
H
Hongze Cheng 已提交
468

H
more  
Hongze Cheng 已提交
469
  TFileSetArray  *fsetArray = fs->fSetArrTmp;
H
Hongze Cheng 已提交
470 471
  STFileSet      *fset = NULL;
  const STFileOp *op;
H
Hongze Cheng 已提交
472
  TARRAY2_FOREACH_PTR(opArray, op) {
H
Hongze Cheng 已提交
473 474 475
    if (!fset || fset->fid != op->fid) {
      STFileSet tfset = {.fid = op->fid};
      fset = &tfset;
H
Hongze Cheng 已提交
476 477
      STFileSet **fsetPtr = TARRAY2_SEARCH(fsetArray, &fset, tsdbTFileSetCmprFn, TD_EQ);
      fset = (fsetPtr == NULL) ? NULL : *fsetPtr;
H
Hongze Cheng 已提交
478 479 480 481 482

      if (!fset) {
        code = tsdbTFileSetInit(op->fid, &fset);
        TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
483
        code = TARRAY2_SORT_INSERT(fsetArray, fset, tsdbTFileSetCmprFn);
H
Hongze Cheng 已提交
484 485
        TSDB_CHECK_CODE(code, lino, _exit);
      }
H
Hongze Cheng 已提交
486 487
    }

H
Hongze Cheng 已提交
488
    code = tsdbTFileSetEdit(fs->tsdb, fset, op);
H
Hongze Cheng 已提交
489
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
490
  }
H
Hongze Cheng 已提交
491

H
Hongze Cheng 已提交
492 493 494 495 496 497 498 499 500
  // remove empty file set
  int32_t i = 0;
  while (i < TARRAY2_SIZE(fsetArray)) {
    fset = TARRAY2_GET(fsetArray, i);
    if (tsdbTFileSetIsEmpty(fset)) {
      TARRAY2_REMOVE(fsetArray, i, tsdbTFileSetClear);
    } else {
      i++;
    }
H
Hongze Cheng 已提交
501 502
  }

H
Hongze Cheng 已提交
503
_exit:
H
Hongze Cheng 已提交
504 505 506
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  }
H
Hongze Cheng 已提交
507
  return code;
H
Hongze Cheng 已提交
508 509
}

H
Hongze Cheng 已提交
510
int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback) {
H
Hongze Cheng 已提交
511 512 513
  int32_t code;
  int32_t lino;

H
Hongze Cheng 已提交
514 515 516
  code = tsdbCheckAndUpgradeFileSystem(pTsdb, rollback);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
517
  code = create_fs(pTsdb, fs);
H
Hongze Cheng 已提交
518 519
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
520
  code = open_fs(fs[0], rollback);
H
Hongze Cheng 已提交
521
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
522 523 524

_exit:
  if (code) {
H
Hongze Cheng 已提交
525
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
526
    destroy_fs(fs);
H
Hongze Cheng 已提交
527
  } else {
H
Hongze Cheng 已提交
528
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
H
Hongze Cheng 已提交
529 530 531 532
  }
  return 0;
}

H
Hongze Cheng 已提交
533 534 535 536 537 538 539
static void tsdbDoWaitBgTask(STFileSystem *fs, STFSBgTask *task) {
  task->numWait++;
  taosThreadCondWait(task->done, fs->mutex);
  task->numWait--;

  if (task->numWait == 0) {
    taosThreadCondDestroy(task->done);
H
Hongze Cheng 已提交
540 541 542
    if (task->free) {
      task->free(task->arg);
    }
H
Hongze Cheng 已提交
543 544 545 546 547 548 549 550 551
    taosMemoryFree(task);
  }
}

static void tsdbDoDoneBgTask(STFileSystem *fs, STFSBgTask *task) {
  if (task->numWait > 0) {
    taosThreadCondBroadcast(task->done);
  } else {
    taosThreadCondDestroy(task->done);
H
Hongze Cheng 已提交
552 553 554
    if (task->free) {
      task->free(task->arg);
    }
H
Hongze Cheng 已提交
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571
    taosMemoryFree(task);
  }
}

int32_t tsdbCloseFS(STFileSystem **fs) {
  if (fs[0] == NULL) return 0;

  taosThreadMutexLock(fs[0]->mutex);
  fs[0]->stop = true;

  if (fs[0]->bgTaskRunning) {
    tsdbDoWaitBgTask(fs[0], fs[0]->bgTaskRunning);
  }
  taosThreadMutexUnlock(fs[0]->mutex);

  close_file_system(fs[0]);
  destroy_fs(fs);
H
Hongze Cheng 已提交
572 573
  return 0;
}
H
Hongze Cheng 已提交
574

H
Hongze Cheng 已提交
575 576 577 578 579
int64_t tsdbFSAllocEid(STFileSystem *fs) {
  taosThreadRwlockRdlock(&fs->tsdb->rwLock);
  int64_t cid = ++fs->neid;
  taosThreadRwlockUnlock(&fs->tsdb->rwLock);
  return cid;
H
Hongze Cheng 已提交
580 581
}

H
Hongze Cheng 已提交
582
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
H
Hongze Cheng 已提交
583
  int32_t code = 0;
H
Hongze Cheng 已提交
584
  int32_t lino;
H
Hongze Cheng 已提交
585
  char    current_t[TSDB_FILENAME_LEN];
H
Hongze Cheng 已提交
586

H
Hongze Cheng 已提交
587 588
  switch (etype) {
    case TSDB_FEDIT_COMMIT:
H
Hongze Cheng 已提交
589
      current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
H
Hongze Cheng 已提交
590 591
      break;
    case TSDB_FEDIT_MERGE:
H
Hongze Cheng 已提交
592
      current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
H
Hongze Cheng 已提交
593 594 595
      break;
    default:
      ASSERT(0);
H
Hongze Cheng 已提交
596 597 598 599
  }

  tsem_wait(&fs->canEdit);
  fs->etype = etype;
H
Hongze Cheng 已提交
600

H
Hongze Cheng 已提交
601
  // edit
H
Hongze Cheng 已提交
602
  code = edit_fs(fs, opArray);
H
Hongze Cheng 已提交
603
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
604

H
Hongze Cheng 已提交
605
  // save fs
H
Hongze Cheng 已提交
606
  code = save_fs(fs->fSetArrTmp, current_t);
H
Hongze Cheng 已提交
607
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
608 609 610

_exit:
  if (code) {
H
Hongze Cheng 已提交
611
    tsdbError("vgId:%d %s failed at line %d since %s, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
612
              tstrerror(code), etype);
H
Hongze Cheng 已提交
613
  } else {
H
Hongze Cheng 已提交
614
    tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, etype);
H
Hongze Cheng 已提交
615 616 617 618
  }
  return code;
}

H
Hongze Cheng 已提交
619
int32_t tsdbFSEditCommit(STFileSystem *fs) {
H
Hongze Cheng 已提交
620 621 622 623 624 625 626
  int32_t code = 0;
  int32_t lino = 0;

  // commit
  code = commit_edit(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
627 628
  // schedule merge
  if (fs->tsdb->pVnode->config.sttTrigger != 1) {
H
Hongze Cheng 已提交
629 630 631 632
    STFileSet *fset;
    TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) {
      if (TARRAY2_SIZE(fset->lvlArr) == 0) continue;

H
Hongze Cheng 已提交
633 634
      SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
      if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < fs->tsdb->pVnode->config.sttTrigger) continue;
H
Hongze Cheng 已提交
635

H
Hongze Cheng 已提交
636
      code = tsdbFSScheduleBgTask(fs, TSDB_BG_TASK_MERGER, tsdbMerge, NULL, fs->tsdb, NULL);
H
Hongze Cheng 已提交
637 638 639 640 641 642 643 644 645 646
      TSDB_CHECK_CODE(code, lino, _exit);

      break;
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  } else {
H
Hongze Cheng 已提交
647
    tsdbDebug("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
H
Hongze Cheng 已提交
648 649
    tsem_post(&fs->canEdit);
  }
H
Hongze Cheng 已提交
650 651 652
  return code;
}

H
Hongze Cheng 已提交
653 654 655
int32_t tsdbFSEditAbort(STFileSystem *fs) {
  int32_t code = abort_edit(fs);
  tsem_post(&fs->canEdit);
H
Hongze Cheng 已提交
656
  return code;
H
Hongze Cheng 已提交
657 658
}

H
Hongze Cheng 已提交
659
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset) {
H
Hongze Cheng 已提交
660 661 662 663
  STFileSet   tfset = {.fid = fid};
  STFileSet  *pset = &tfset;
  STFileSet **fsetPtr = TARRAY2_SEARCH(fs->fSetArr, &pset, tsdbTFileSetCmprFn, TD_EQ);
  fset[0] = (fsetPtr == NULL) ? NULL : fsetPtr[0];
H
Hongze Cheng 已提交
664
  return 0;
H
Hongze Cheng 已提交
665 666
}

H
Hongze Cheng 已提交
667
int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
H
Hongze Cheng 已提交
668 669 670 671
  int32_t    code = 0;
  STFileSet *fset;
  STFileSet *fset1;

H
Hongze Cheng 已提交
672 673 674 675
  fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
  if (fsetArr == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  TARRAY2_INIT(fsetArr[0]);
H
Hongze Cheng 已提交
676 677

  taosThreadRwlockRdlock(&fs->tsdb->rwLock);
H
Hongze Cheng 已提交
678
  TARRAY2_FOREACH(fs->fSetArr, fset) {
H
Hongze Cheng 已提交
679
    code = tsdbTFileSetInitDup(fs->tsdb, fset, &fset1);
H
Hongze Cheng 已提交
680 681
    if (code) break;

H
Hongze Cheng 已提交
682
    code = TARRAY2_APPEND(fsetArr[0], fset1);
H
Hongze Cheng 已提交
683 684 685 686 687
    if (code) break;
  }
  taosThreadRwlockUnlock(&fs->tsdb->rwLock);

  if (code) {
H
Hongze Cheng 已提交
688
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
H
Hongze Cheng 已提交
689 690
    taosMemoryFree(fsetArr[0]);
    fsetArr[0] = NULL;
H
Hongze Cheng 已提交
691 692 693 694
  }
  return code;
}

H
Hongze Cheng 已提交
695 696
int32_t tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr) {
  if (fsetArr[0]) {
H
Hongze Cheng 已提交
697
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
H
Hongze Cheng 已提交
698
    taosMemoryFree(fsetArr[0]);
H
Hongze Cheng 已提交
699 700
    fsetArr[0] = NULL;
  }
H
Hongze Cheng 已提交
701
  return 0;
H
Hongze Cheng 已提交
702 703 704 705 706 707
}

int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
  int32_t    code = 0;
  STFileSet *fset, *fset1;

708
  fsetArr[0] = taosMemoryCalloc(1, sizeof(*fsetArr[0]));
H
Hongze Cheng 已提交
709 710 711 712
  if (fsetArr[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  taosThreadRwlockRdlock(&fs->tsdb->rwLock);
  TARRAY2_FOREACH(fs->fSetArr, fset) {
H
Hongze Cheng 已提交
713 714
    code = tsdbTFileSetInitRef(fs->tsdb, fset, &fset1);
    if (code) break;
H
Hongze Cheng 已提交
715 716 717 718 719 720 721

    code = TARRAY2_APPEND(fsetArr[0], fset1);
    if (code) break;
  }
  taosThreadRwlockUnlock(&fs->tsdb->rwLock);

  if (code) {
H
Hongze Cheng 已提交
722
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
H
Hongze Cheng 已提交
723 724 725 726 727 728 729
    fsetArr[0] = NULL;
  }
  return code;
}

int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) {
  if (fsetArr[0]) {
H
Hongze Cheng 已提交
730
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
H
Haojun Liao 已提交
731
    taosMemoryFreeClear(fsetArr[0]);
H
Hongze Cheng 已提交
732 733 734
    fsetArr[0] = NULL;
  }
  return 0;
H
Hongze Cheng 已提交
735 736 737 738 739 740 741 742 743 744 745 746 747
}

const char *gFSBgTaskName[] = {NULL, "MERGE", "RETENTION", "COMPACT"};

static int32_t tsdbFSRunBgTask(void *arg) {
  STFileSystem *fs = (STFileSystem *)arg;

  ASSERT(fs->bgTaskRunning != NULL);

  fs->bgTaskRunning->launchTime = taosGetTimestampMs();
  fs->bgTaskRunning->run(fs->bgTaskRunning->arg);
  fs->bgTaskRunning->finishTime = taosGetTimestampMs();

H
Hongze Cheng 已提交
748 749 750 751
  tsdbDebug("vgId:%d bg task:%s task id:%" PRId64 " finished, schedule time:%" PRId64 " launch time:%" PRId64
            " finish time:%" PRId64,
            TD_VID(fs->tsdb->pVnode), gFSBgTaskName[fs->bgTaskRunning->type], fs->bgTaskRunning->taskid,
            fs->bgTaskRunning->scheduleTime, fs->bgTaskRunning->launchTime, fs->bgTaskRunning->finishTime);
H
Hongze Cheng 已提交
752 753 754 755

  taosThreadMutexLock(fs->mutex);

  // free last
H
Hongze Cheng 已提交
756
  tsdbDoDoneBgTask(fs, fs->bgTaskRunning);
H
Hongze Cheng 已提交
757 758 759 760
  fs->bgTaskRunning = NULL;

  // schedule next
  if (fs->bgTaskNum > 0) {
H
Hongze Cheng 已提交
761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776
    if (fs->stop) {
      while (fs->bgTaskNum > 0) {
        STFSBgTask *task = fs->bgTaskQueue->next;
        task->prev->next = task->next;
        task->next->prev = task->prev;
        fs->bgTaskNum--;
        tsdbDoDoneBgTask(fs, task);
      }
    } else {
      // pop task from head
      fs->bgTaskRunning = fs->bgTaskQueue->next;
      fs->bgTaskRunning->prev->next = fs->bgTaskRunning->next;
      fs->bgTaskRunning->next->prev = fs->bgTaskRunning->prev;
      fs->bgTaskNum--;
      vnodeScheduleTaskEx(1, tsdbFSRunBgTask, arg);
    }
H
Hongze Cheng 已提交
777 778 779 780 781 782
  }

  taosThreadMutexUnlock(fs->mutex);
  return 0;
}

H
Hongze Cheng 已提交
783 784
static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void (*free)(void *),
                                        void *arg, int64_t *taskid) {
H
Hongze Cheng 已提交
785 786 787 788
  if (fs->stop) {
    return 0;  // TODO: use a better error code
  }

H
Hongze Cheng 已提交
789
  // check if same task is on
H
Hongze Cheng 已提交
790 791 792
  // if (fs->bgTaskRunning && fs->bgTaskRunning->type == type) {
  //   return 0;
  // }
H
Hongze Cheng 已提交
793 794 795 796 797 798 799 800 801 802 803 804 805 806

  for (STFSBgTask *task = fs->bgTaskQueue->next; task != fs->bgTaskQueue; task = task->next) {
    if (task->type == type) {
      return 0;
    }
  }

  // do schedule task
  STFSBgTask *task = taosMemoryCalloc(1, sizeof(STFSBgTask));
  if (task == NULL) return TSDB_CODE_OUT_OF_MEMORY;
  taosThreadCondInit(task->done, NULL);

  task->type = type;
  task->run = run;
H
Hongze Cheng 已提交
807
  task->free = free;
H
Hongze Cheng 已提交
808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828
  task->arg = arg;
  task->scheduleTime = taosGetTimestampMs();
  task->taskid = ++fs->taskid;

  if (fs->bgTaskRunning == NULL && fs->bgTaskNum == 0) {
    // launch task directly
    fs->bgTaskRunning = task;
    vnodeScheduleTaskEx(1, tsdbFSRunBgTask, fs);
  } else {
    // add to the queue tail
    fs->bgTaskNum++;
    task->next = fs->bgTaskQueue;
    task->prev = fs->bgTaskQueue->prev;
    task->prev->next = task;
    task->next->prev = task;
  }

  if (taskid) *taskid = task->taskid;
  return 0;
}

H
Hongze Cheng 已提交
829 830
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void (*free)(void *), void *arg,
                             int64_t *taskid) {
H
Hongze Cheng 已提交
831
  taosThreadMutexLock(fs->mutex);
H
Hongze Cheng 已提交
832
  int32_t code = tsdbFSScheduleBgTaskImpl(fs, type, run, free, arg, taskid);
H
Hongze Cheng 已提交
833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853
  taosThreadMutexUnlock(fs->mutex);
  return code;
}

int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid) {
  STFSBgTask *task = NULL;

  taosThreadMutexLock(fs->mutex);

  if (fs->bgTaskRunning && fs->bgTaskRunning->taskid == taskid) {
    task = fs->bgTaskRunning;
  } else {
    for (STFSBgTask *taskt = fs->bgTaskQueue->next; taskt != fs->bgTaskQueue; taskt = taskt->next) {
      if (taskt->taskid == taskid) {
        task = taskt;
        break;
      }
    }
  }

  if (task) {
H
Hongze Cheng 已提交
854
    tsdbDoWaitBgTask(fs, task);
H
Hongze Cheng 已提交
855 856 857 858 859 860 861 862 863 864 865 866 867 868 869
  }

  taosThreadMutexUnlock(fs->mutex);
  return 0;
}

int32_t tsdbFSWaitAllBgTask(STFileSystem *fs) {
  taosThreadMutexLock(fs->mutex);

  while (fs->bgTaskRunning) {
    taosThreadCondWait(fs->bgTaskRunning->done, fs->mutex);
  }

  taosThreadMutexUnlock(fs->mutex);
  return 0;
H
Hongze Cheng 已提交
870 871 872
}

static int32_t tsdbFSDoDisableBgTask(STFileSystem *fs) {
H
Hongze Cheng 已提交
873 874 875 876 877
  fs->stop = true;

  if (fs->bgTaskRunning) {
    tsdbDoWaitBgTask(fs, fs->bgTaskRunning);
  }
H
Hongze Cheng 已提交
878 879 880 881 882 883 884 885 886 887 888
  return 0;
}

int32_t tsdbFSDisableBgTask(STFileSystem *fs) {
  taosThreadMutexLock(fs->mutex);
  int32_t code = tsdbFSDoDisableBgTask(fs);
  taosThreadMutexUnlock(fs->mutex);
  return code;
}

int32_t tsdbFSEnableBgTask(STFileSystem *fs) {
H
Hongze Cheng 已提交
889 890 891
  taosThreadMutexLock(fs->mutex);
  fs->stop = false;
  taosThreadMutexUnlock(fs->mutex);
H
Hongze Cheng 已提交
892
  return 0;
893
}