tsdbRetention.c 2.9 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

H
Hongze Cheng 已提交
18 19 20
/*
 * Report whether a retention pass over the current file sets has work to do.
 *
 * Walks pTsdb->fs.aDFileSet in order and answers on the first file set whose
 * level computed by tsdbFidLevel() differs from the level it is stored on:
 *   - a negative computed level answers true;
 *   - otherwise a disk is requested at the computed level; if that request
 *     fails the answer is false, and if the disk handed back is on a different
 *     level than the file set's current one the answer is true.
 * File sets that need no change are skipped. Returns false when the scan ends
 * without finding any work.
 */
static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
  for (int32_t idx = 0; idx < taosArrayGetSize(pTsdb->fs.aDFileSet); ++idx) {
    SDFileSet *pFileSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, idx);
    int32_t    wantLevel = tsdbFidLevel(pFileSet->fid, &pTsdb->keepCfg, now);

    // Already stored on the computed level: nothing to do for this file set.
    if (wantLevel == pFileSet->diskId.level) {
      continue;
    }

    // Negative level (presumably "outside the keep window" -- confirm against
    // tsdbFidLevel): the retention pass has to handle this file set.
    if (wantLevel < 0) {
      return true;
    }

    SDiskID target;
    if (tfsAllocDisk(pTsdb->pVnode->pTfs, wantLevel, &target) < 0) {
      // No disk could be allocated at the wanted level; report "no work".
      return false;
    }

    // The allocator may hand back a disk on the level the file set already
    // occupies; only an actual level change counts as work.
    if (target.level != pFileSet->diskId.level) {
      return true;
    }
  }

  return false;
}

/*
 * Run one retention pass.
 *
 * If tsdbShouldDoRetention() reports no work, returns 0 immediately.
 * Otherwise the function operates on a private copy of the file-system state
 * obtained from tsdbFSCopy():
 *   - file sets whose level from tsdbFidLevel() is negative are removed from
 *     the copy;
 *   - file sets with level 0 are left alone;
 *   - for the rest a disk is allocated at the computed level and, when that
 *     disk is on a different level than the file set's current one, the file
 *     set is copied there and upserted into the copy.
 * The modified copy is then applied with tsdbFSCommit1(), followed by
 * tsdbFSCommit2() under pTsdb->rwLock held for writing.
 *
 * Returns 0 on success, terrno when disk allocation fails, or the error code
 * of the failing tsdb call.
 */
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
  int32_t code = 0;

  if (!tsdbShouldDoRetention(pTsdb, now)) {
    return code;
  }

  // do retention
  STsdbFS fs;

  code = tsdbFSCopy(pTsdb, &fs);
  if (code) goto _err;

  for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) {
    SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
    int32_t    expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
    SDiskID    did;

    if (expLevel < 0) {
      // Drop this file set from the working copy and release its file records.
      // NOTE(review): only aSttF[0] is released here; if a file set can carry
      // more than one entry in aSttF the remaining ones are leaked -- confirm
      // against the SDFileSet definition.
      taosMemoryFree(pSet->pHeadF);
      taosMemoryFree(pSet->pDataF);
      taosMemoryFree(pSet->aSttF[0]);
      taosMemoryFree(pSet->pSmaF);
      taosArrayRemove(fs.aDFileSet, iSet);
      // Step back so the loop increment lands on the same index again, which
      // now holds the element that followed the removed one.
      iSet--;
    } else {
      if (expLevel == 0) continue;
      if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
        code = terrno;
        // Nothing has been committed yet; _exit releases the working copy.
        goto _exit;
      }

      if (did.level == pSet->diskId.level) continue;

      // copy file to new disk (todo)
      SDFileSet fSet = *pSet;
      fSet.diskId = did;

      code = tsdbDFileSetCopy(pTsdb, pSet, &fSet);
      if (code) goto _err;

      code = tsdbFSUpsertFSet(&fs, &fSet);
      if (code) goto _err;
    }
  }

  // do change fs
  code = tsdbFSCommit1(pTsdb, &fs);
  if (code) goto _err;

  taosThreadRwlockWrlock(&pTsdb->rwLock);
  code = tsdbFSCommit2(pTsdb, &fs);
  taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) goto _err;

_exit:
  // Reached on success and on disk-allocation failure. In both cases fs was
  // successfully copied above and is owned by this function, so release it.
  tsdbFSDestroy(&fs);
  return code;

_err:
  // NOTE(review): fs is not released on this path. It may be uninitialised
  // when tsdbFSCopy itself failed, and ASSERT(0) is expected to stop execution
  // here -- revisit once rollback is implemented.
  tsdbError("vgId:%d, tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  ASSERT(0);
  // tsdbFSRollback(pTsdb->pFS);
  return code;
}