tsdbRetention.c 2.8 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

H
Hongze Cheng 已提交
18 19 20 21
static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
  for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
    SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
    int32_t    expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
H
Hongze Cheng 已提交
22
    SDiskID    did;
H
Hongze Cheng 已提交
23

H
Hongze Cheng 已提交
24
    if (expLevel == pSet->diskId.level) continue;
H
Hongze Cheng 已提交
25 26

    if (expLevel < 0) {
H
Hongze Cheng 已提交
27
      return true;
H
Hongze Cheng 已提交
28 29
    } else {
      if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
H
Hongze Cheng 已提交
30
        return false;
H
Hongze Cheng 已提交
31 32
      }

H
Hongze Cheng 已提交
33
      if (did.level == pSet->diskId.level) continue;
H
Hongze Cheng 已提交
34

H
Hongze Cheng 已提交
35
      return true;
H
Hongze Cheng 已提交
36
    }
H
Hongze Cheng 已提交
37 38
  }

H
Hongze Cheng 已提交
39
  return false;
H
Hongze Cheng 已提交
40 41 42 43 44
}

int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
  int32_t code = 0;

H
Hongze Cheng 已提交
45 46 47
  if (!tsdbShouldDoRetention(pTsdb, now)) {
    return code;
  }
H
Hongze Cheng 已提交
48 49

  // do retention
H
Hongze Cheng 已提交
50 51 52
  STsdbFS fs;

  code = tsdbFSCopy(pTsdb, &fs);
H
Hongze Cheng 已提交
53 54
  if (code) goto _err;

H
Hongze Cheng 已提交
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
  for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) {
    SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
    int32_t    expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
    SDiskID    did;

    if (expLevel < 0) {
      taosMemoryFree(pSet->pHeadF);
      taosMemoryFree(pSet->pDataF);
      taosMemoryFree(pSet->pLastF);
      taosMemoryFree(pSet->pSmaF);
      taosArrayRemove(fs.aDFileSet, iSet);
      iSet--;
    } else {
      if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
        code = terrno;
        goto _exit;
      }

      if (did.level == pSet->diskId.level) continue;

      // copy file to new disk (todo)
      SDFileSet fSet = *pSet;
      fSet.diskId = did;

      code = tsdbDFileSetCopy(pTsdb, pSet, &fSet);
      if (code) goto _err;

      code = tsdbFSUpsertFSet(&fs, &fSet);
      if (code) goto _err;
    }

    /* code */
  }

  // do change fs
  code = tsdbFSCommit1(pTsdb, &fs);
H
Hongze Cheng 已提交
91 92
  if (code) goto _err;

H
Hongze Cheng 已提交
93 94 95 96 97 98 99 100 101 102 103 104
  taosThreadRwlockWrlock(&pTsdb->rwLock);

  code = tsdbFSCommit2(pTsdb, &fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _err;
  }

  taosThreadRwlockUnlock(&pTsdb->rwLock);

  tsdbFSDestroy(&fs);

H
Hongze Cheng 已提交
105 106 107 108
_exit:
  return code;

_err:
S
Shengliang Guan 已提交
109
  tsdbError("vgId:%d, tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
110 111
  ASSERT(0);
  // tsdbFSRollback(pTsdb->pFS);
H
Hongze Cheng 已提交
112 113
  return code;
}