/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

/*
 * Walk the live file-set array (pTsdb->fs.aDFileSet) and decide whether a
 * retention pass has anything to do at time `now`.
 *
 * For each file set the level computed by tsdbFidLevel() is compared with the
 * level the set is currently stored on:
 *   - same level               -> nothing to do for this set
 *   - negative computed level  -> report true (tsdbDoRetention removes such sets)
 *   - otherwise a disk is requested on the computed level; report true only
 *     if the disk handed back sits on a different level than the set's own.
 *
 * Returns false if no set needs work, and also as soon as tfsAllocDisk()
 * fails for any set.
 *
 * The function takes no lock itself; tsdbShouldDoRetention() calls it with
 * pTsdb->rwLock read-locked.
 */
static bool tsdbShouldDoRetentionImpl(STsdb *pTsdb, int64_t now) {
  for (int32_t idx = 0; idx < taosArrayGetSize(pTsdb->fs.aDFileSet); idx++) {
    SDFileSet *pFileSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, idx);
    int32_t    wantLevel = tsdbFidLevel(pFileSet->fid, &pTsdb->keepCfg, now);

    // Already stored on the level it is expected to be on.
    if (wantLevel == pFileSet->diskId.level) {
      continue;
    }

    // A negative level always means there is work to do.
    if (wantLevel < 0) {
      return true;
    }

    // No disk could be allocated on the wanted level: give up on the whole scan.
    SDiskID target;
    if (tfsAllocDisk(pTsdb->pVnode->pTfs, wantLevel, &target) < 0) {
      return false;
    }

    // The allocated disk may sit on a different level than the one requested;
    // only a real level change counts as work.
    if (target.level != pFileSet->diskId.level) {
      return true;
    }
  }

  return false;
}
/*
 * Locked entry point for the retention check.
 *
 * Holds pTsdb->rwLock in read mode while tsdbShouldDoRetentionImpl() inspects
 * the file sets, and returns that function's verdict unchanged.
 */
bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
  taosThreadRwlockRdlock(&pTsdb->rwLock);
  bool needed = tsdbShouldDoRetentionImpl(pTsdb, now);
  taosThreadRwlockUnlock(&pTsdb->rwLock);

  return needed;
}
/*
 * Build and stage the file-system changes required by retention at time `now`.
 *
 * Works on a private copy of the file-system state obtained via tsdbFSCopy():
 *   - a file set whose tsdbFidLevel() result is negative has its head/data/sma
 *     and stt file records freed and is removed from the copy;
 *   - a file set whose level is 0 is left untouched;
 *   - for any other level a disk is requested with tfsAllocDisk(); if the
 *     returned disk is on a different level than the set's current one, the
 *     set is copied with tsdbDFileSetCopy() and the copy is upserted into the
 *     working state.
 * The resulting state is then handed to tsdbFSPrepareCommit().
 *
 * Returns 0 on success, otherwise the first error code encountered. The
 * working copy is destroyed on every path. The failing source line is
 * recorded in `lino` and included in the error log.
 */
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdbFS fs = {0};

  code = tsdbFSCopy(pTsdb, &fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) {
    SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
    int32_t    expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
    SDiskID    did;

    if (expLevel < 0) {
      // Drop the whole file set from the working copy.
      taosMemoryFree(pSet->pHeadF);
      taosMemoryFree(pSet->pDataF);
      taosMemoryFree(pSet->pSmaF);
      for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
        taosMemoryFree(pSet->aSttF[iStt]);
      }
      taosArrayRemove(fs.aDFileSet, iSet);
      // Stay on the same index for the next iteration (presumably the
      // following elements shift down after the removal).
      iSet--;
    } else {
      if (expLevel == 0) continue;
      if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
        code = terrno;
        // Record where we failed; without this the log below would always
        // report "line 0" for an allocation failure.
        lino = __LINE__;
        goto _exit;
      }

      // The allocated disk is on the level the set already lives on.
      if (did.level == pSet->diskId.level) continue;

      // copy file to new disk (todo)
      SDFileSet fSet = *pSet;
      fSet.diskId = did;

      code = tsdbDFileSetCopy(pTsdb, pSet, &fSet);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tsdbFSUpsertFSet(&fs, &fSet);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // do change fs
  code = tsdbFSPrepareCommit(pTsdb, &fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
  tsdbFSDestroy(&fs);
  return code;
}
/*
 * Unlocked commit step of retention: forwards to tsdbFSCommit() and returns
 * its result. tsdbCommitRetention() calls this with pTsdb->rwLock held in
 * write mode.
 */
static int32_t tsdbCommitRetentionImpl(STsdb *pTsdb) {
  return tsdbFSCommit(pTsdb);
}

/*
 * Commit the file-system changes staged by retention.
 *
 * Runs tsdbCommitRetentionImpl() with pTsdb->rwLock held in write mode.
 *
 * Returns the code produced by the commit: 0 on success, non-zero on failure.
 * A failure is logged as an error instead of being reported as "done".
 */
int32_t tsdbCommitRetention(STsdb *pTsdb) {
  int32_t code = 0;

  taosThreadRwlockWrlock(&pTsdb->rwLock);
  code = tsdbCommitRetentionImpl(pTsdb);
  taosThreadRwlockUnlock(&pTsdb->rwLock);

  if (code) {
    tsdbError("vgId:%d %s failed since %s", TD_VID(pTsdb->pVnode), __func__, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}