tsdbRetention.c 2.9 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

18 19 20 21 22
static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
  for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
    SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
    int32_t    expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
    SDiskID    did;
23

24
    if (expLevel == pSet->diskId.level) continue;
H
Hongze Cheng 已提交
25

26 27 28 29 30 31
    if (expLevel < 0) {
      return true;
    } else {
      if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
        return false;
      }
32

33
      if (did.level == pSet->diskId.level) continue;
H
Hongze Cheng 已提交
34

35
      return true;
H
Hongze Cheng 已提交
36
    }
H
Hongze Cheng 已提交
37 38
  }

39
  return false;
H
Hongze Cheng 已提交
40 41
}

42
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
H
Hongze Cheng 已提交
43 44
  int32_t code = 0;

45 46
  if (!tsdbShouldDoRetention(pTsdb, now)) {
    return code;
H
Hongze Cheng 已提交
47
  }
H
Hongze Cheng 已提交
48

49 50
  // do retention
  STsdbFS fs;
H
Hongze Cheng 已提交
51 52

  code = tsdbFSCopy(pTsdb, &fs);
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
  if (code) goto _err;

  for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) {
    SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
    int32_t    expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
    SDiskID    did;

    if (expLevel < 0) {
      taosMemoryFree(pSet->pHeadF);
      taosMemoryFree(pSet->pDataF);
      taosMemoryFree(pSet->aSttF[0]);
      taosMemoryFree(pSet->pSmaF);
      taosArrayRemove(fs.aDFileSet, iSet);
      iSet--;
    } else {
      if (expLevel == 0) continue;
      if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
        code = terrno;
        goto _exit;
72
      }
H
Hongze Cheng 已提交
73

74
      if (did.level == pSet->diskId.level) continue;
75

76 77 78
      // copy file to new disk (todo)
      SDFileSet fSet = *pSet;
      fSet.diskId = did;
H
Hongze Cheng 已提交
79

80 81
      code = tsdbDFileSetCopy(pTsdb, pSet, &fSet);
      if (code) goto _err;
82

83 84
      code = tsdbFSUpsertFSet(&fs, &fSet);
      if (code) goto _err;
85 86 87
    }
  }

88 89 90
  // do change fs
  code = tsdbFSCommit1(pTsdb, &fs);
  if (code) goto _err;
91

92
  taosThreadRwlockWrlock(&pTsdb->rwLock);
93

94 95
  code = tsdbFSCommit2(pTsdb, &fs);
  if (code) {
96
    taosThreadRwlockUnlock(&pTsdb->rwLock);
97
    goto _err;
98
  }
99

H
Hongze Cheng 已提交
100 101
  taosThreadRwlockUnlock(&pTsdb->rwLock);

102
  tsdbFSDestroy(&fs);
H
Hongze Cheng 已提交
103

H
Hongze Cheng 已提交
104 105
_exit:
  return code;
106

107 108 109 110
_err:
  tsdbError("vgId:%d, tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  ASSERT(0);
  // tsdbFSRollback(pTsdb->pFS);
H
Hongze Cheng 已提交
111 112
  return code;
}