tsdbRetention.c 7.1 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"
H
Hongze Cheng 已提交
17
#include "tsdbFS2.h"
H
Hongze Cheng 已提交
18

H
Hongze Cheng 已提交
19 20 21 22 23 24 25 26 27 28 29 30 31
typedef struct {
  STsdb  *tsdb;
  int32_t szPage;
  int64_t now;
  int64_t cid;

  TFileSetArray *fsetArr;
  TFileOpArray  *fopArr;

  struct {
    int32_t    fsetArrIdx;
    STFileSet *fset;
  } ctx[1];
H
Hongze Cheng 已提交
32
} SRTNer;
H
Hongze Cheng 已提交
33

H
Hongze Cheng 已提交
34
static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
H
Hongze Cheng 已提交
35 36 37 38 39 40
  STFileOp op = {
      .optype = TSDB_FOP_REMOVE,
      .fid = fobj->f->fid,
      .of = fobj->f[0],
  };

H
Hongze Cheng 已提交
41
  return TARRAY2_APPEND(rtner->fopArr, op);
H
Hongze Cheng 已提交
42 43
}

H
Hongze Cheng 已提交
44
static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile *to) {
H
Hongze Cheng 已提交
45 46 47 48 49 50 51
  int32_t code = 0;
  int32_t lino = 0;

  char      fname[TSDB_FILENAME_LEN];
  TdFilePtr fdFrom = NULL;
  TdFilePtr fdTo = NULL;

H
Hongze Cheng 已提交
52
  tsdbTFileName(rtner->tsdb, to, fname);
H
Hongze Cheng 已提交
53 54 55 56 57 58 59 60 61

  fdFrom = taosOpenFile(from->fname, TD_FILE_READ);
  if (fdFrom == NULL) code = terrno;
  TSDB_CHECK_CODE(code, lino, _exit);

  fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (fdTo == NULL) code = terrno;
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
62
  int64_t n = taosFSendFile(fdTo, fdFrom, 0, tsdbLogicToFileSize(from->f->size, rtner->szPage));
H
Hongze Cheng 已提交
63 64 65 66 67 68 69 70 71
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  taosCloseFile(&fdFrom);
  taosCloseFile(&fdTo);

_exit:
  if (code) {
H
Hongze Cheng 已提交
72
    TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
73 74 75 76 77 78
    taosCloseFile(&fdFrom);
    taosCloseFile(&fdTo);
  }
  return code;
}

H
Hongze Cheng 已提交
79
static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const SDiskID *did) {
H
Hongze Cheng 已提交
80 81 82 83 84 85 86 87 88 89 90
  int32_t  code = 0;
  int32_t  lino = 0;
  STFileOp op = {0};

  // remove old
  op = (STFileOp){
      .optype = TSDB_FOP_REMOVE,
      .fid = fobj->f->fid,
      .of = fobj->f[0],
  };

H
Hongze Cheng 已提交
91
  code = TARRAY2_APPEND(rtner->fopArr, op);
H
Hongze Cheng 已提交
92 93 94 95 96 97 98 99 100 101 102
  TSDB_CHECK_CODE(code, lino, _exit);

  // create new
  op = (STFileOp){
      .optype = TSDB_FOP_CREATE,
      .fid = fobj->f->fid,
      .nf =
          {
              .type = fobj->f->type,
              .did = did[0],
              .fid = fobj->f->fid,
H
Hongze Cheng 已提交
103
              .cid = rtner->cid,
H
Hongze Cheng 已提交
104 105 106 107 108 109 110 111
              .size = fobj->f->size,
              .stt[0] =
                  {
                      .level = fobj->f->stt[0].level,
                  },
          },
  };

H
Hongze Cheng 已提交
112
  code = TARRAY2_APPEND(rtner->fopArr, op);
H
Hongze Cheng 已提交
113 114 115
  TSDB_CHECK_CODE(code, lino, _exit);

  // do copy the file
H
Hongze Cheng 已提交
116
  code = tsdbDoCopyFile(rtner, fobj, &op.nf);
H
Hongze Cheng 已提交
117 118 119 120
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
121
    TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
122 123 124 125
  }
  return code;
}

H
Hongze Cheng 已提交
126 127 128 129 130 131
typedef struct {
  STsdb  *tsdb;
  int64_t now;
} SRtnArg;

static int32_t tsdbDoRetentionBegin(SRtnArg *arg, SRTNer *rtner) {
H
Hongze Cheng 已提交
132 133 134
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
135
  STsdb *tsdb = arg->tsdb;
H
Hongze Cheng 已提交
136

H
Hongze Cheng 已提交
137 138 139 140
  rtner->tsdb = tsdb;
  rtner->szPage = tsdb->pVnode->config.tsdbPageSize;
  rtner->now = arg->now;
  rtner->cid = tsdbFSAllocEid(tsdb->pFS);
H
Hongze Cheng 已提交
141

H
Hongze Cheng 已提交
142
  code = tsdbFSCreateCopySnapshot(tsdb->pFS, &rtner->fsetArr);
H
Hongze Cheng 已提交
143 144 145 146
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
147
    TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
148
  } else {
H
Hongze Cheng 已提交
149
    tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
H
Hongze Cheng 已提交
150 151 152 153
  }
  return code;
}

H
Hongze Cheng 已提交
154
static int32_t tsdbDoRetentionEnd(SRTNer *rtner) {
H
Hongze Cheng 已提交
155 156 157
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
158
  if (TARRAY2_SIZE(rtner->fopArr) == 0) goto _exit;
H
Hongze Cheng 已提交
159

H
Hongze Cheng 已提交
160
  code = tsdbFSEditBegin(rtner->tsdb->pFS, rtner->fopArr, TSDB_FEDIT_MERGE);
H
Hongze Cheng 已提交
161 162
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
163
  taosThreadRwlockWrlock(&rtner->tsdb->rwLock);
H
Hongze Cheng 已提交
164

H
Hongze Cheng 已提交
165
  code = tsdbFSEditCommit(rtner->tsdb->pFS);
H
Hongze Cheng 已提交
166
  if (code) {
H
Hongze Cheng 已提交
167
    taosThreadRwlockUnlock(&rtner->tsdb->rwLock);
H
Hongze Cheng 已提交
168 169 170
    TSDB_CHECK_CODE(code, lino, _exit);
  }

H
Hongze Cheng 已提交
171
  taosThreadRwlockUnlock(&rtner->tsdb->rwLock);
H
Hongze Cheng 已提交
172

H
Hongze Cheng 已提交
173 174
  TARRAY2_DESTROY(rtner->fopArr, NULL);
  tsdbFSDestroyCopySnapshot(&rtner->fsetArr);
H
Hongze Cheng 已提交
175 176 177

_exit:
  if (code) {
H
Hongze Cheng 已提交
178
    TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
179
  } else {
H
Hongze Cheng 已提交
180
    tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
H
Hongze Cheng 已提交
181 182 183 184
  }
  return code;
}

H
Hongze Cheng 已提交
185
static int32_t tsdbDoRetention2(void *arg) {
H
Hongze Cheng 已提交
186 187
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
188
  SRTNer  rtner[1] = {0};
H
Hongze Cheng 已提交
189

H
Hongze Cheng 已提交
190
  code = tsdbDoRetentionBegin(arg, rtner);
H
Hongze Cheng 已提交
191 192
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
193 194
  while (rtner->ctx->fsetArrIdx < TARRAY2_SIZE(rtner->fsetArr)) {
    rtner->ctx->fset = TARRAY2_GET(rtner->fsetArr, rtner->ctx->fsetArrIdx);
H
Hongze Cheng 已提交
195 196

    STFileObj *fobj;
H
Hongze Cheng 已提交
197
    int32_t    expLevel = tsdbFidLevel(rtner->ctx->fset->fid, &rtner->tsdb->keepCfg, rtner->now);
H
Hongze Cheng 已提交
198 199

    if (expLevel < 0) {  // remove the file set
H
Hongze Cheng 已提交
200
      for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) {
H
Hongze Cheng 已提交
201 202
        if (fobj == NULL) continue;

H
Hongze Cheng 已提交
203
        code = tsdbDoRemoveFileObject(rtner, fobj);
H
Hongze Cheng 已提交
204 205 206 207
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      SSttLvl *lvl;
H
Hongze Cheng 已提交
208
      TARRAY2_FOREACH(rtner->ctx->fset->lvlArr, lvl) {
H
Hongze Cheng 已提交
209
        TARRAY2_FOREACH(lvl->fobjArr, fobj) {
H
Hongze Cheng 已提交
210
          code = tsdbDoRemoveFileObject(rtner, fobj);
H
Hongze Cheng 已提交
211 212 213 214 215 216 217 218
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      }
    } else if (expLevel == 0) {
      continue;
    } else {
      SDiskID did;

H
Hongze Cheng 已提交
219
      if (tfsAllocDisk(rtner->tsdb->pVnode->pTfs, expLevel, &did) < 0) {
H
Hongze Cheng 已提交
220 221 222 223 224
        code = terrno;
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      // data
H
Hongze Cheng 已提交
225
      for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) {
H
Hongze Cheng 已提交
226 227 228
        if (fobj == NULL) continue;

        if (fobj->f->did.level == did.level) continue;
H
Hongze Cheng 已提交
229
        code = tsdbDoMigrateFileObj(rtner, fobj, &did);
H
Hongze Cheng 已提交
230 231 232 233 234
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      // stt
      SSttLvl *lvl;
H
Hongze Cheng 已提交
235
      TARRAY2_FOREACH(rtner->ctx->fset->lvlArr, lvl) {
H
Hongze Cheng 已提交
236 237 238
        TARRAY2_FOREACH(lvl->fobjArr, fobj) {
          if (fobj->f->did.level == did.level) continue;

H
Hongze Cheng 已提交
239
          code = tsdbDoMigrateFileObj(rtner, fobj, &did);
H
Hongze Cheng 已提交
240 241 242 243 244 245
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      }
    }
  }

H
Hongze Cheng 已提交
246
  code = tsdbDoRetentionEnd(rtner);
H
Hongze Cheng 已提交
247 248 249 250
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
251
    TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
252
  }
H
Hongze Cheng 已提交
253
  taosMemoryFree(arg);
H
Hongze Cheng 已提交
254 255
  return code;
}
H
Hongze Cheng 已提交
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277

int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now, int64_t *taskid) {
  SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
  if (arg == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  arg->tsdb = tsdb;
  arg->now = now;

  int32_t code = tsdbFSScheduleBgTask(tsdb->pFS, TSDB_BG_TASK_RETENTION, tsdbDoRetention2, arg, taskid);
  if (code) taosMemoryFree(arg);

  return code;
}

int32_t tsdbSyncRetention(STsdb *tsdb, int64_t now) {
  int64_t taskid;

  int32_t code = tsdbAsyncRetention(tsdb, now, &taskid);
  if (code) return code;

  return tsdbFSWaitBgTask(tsdb->pFS, taskid);
}