tsdbRetention.c 7.3 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"
H
Hongze Cheng 已提交
17
#include "tsdbFS2.h"
H
Hongze Cheng 已提交
18

H
Hongze Cheng 已提交
19 20 21 22 23 24 25
typedef struct {
  STsdb  *tsdb;
  int32_t szPage;
  int64_t now;
  int64_t cid;

  TFileSetArray *fsetArr;
H
Hongze Cheng 已提交
26
  TFileOpArray   fopArr[1];
H
Hongze Cheng 已提交
27 28 29 30 31

  struct {
    int32_t    fsetArrIdx;
    STFileSet *fset;
  } ctx[1];
H
Hongze Cheng 已提交
32
} SRTNer;
H
Hongze Cheng 已提交
33

H
Hongze Cheng 已提交
34
static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
H
Hongze Cheng 已提交
35 36 37 38 39 40
  STFileOp op = {
      .optype = TSDB_FOP_REMOVE,
      .fid = fobj->f->fid,
      .of = fobj->f[0],
  };

H
Hongze Cheng 已提交
41
  return TARRAY2_APPEND(rtner->fopArr, op);
H
Hongze Cheng 已提交
42 43
}

H
Hongze Cheng 已提交
44
static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile *to) {
H
Hongze Cheng 已提交
45 46 47 48 49 50 51
  int32_t code = 0;
  int32_t lino = 0;

  char      fname[TSDB_FILENAME_LEN];
  TdFilePtr fdFrom = NULL;
  TdFilePtr fdTo = NULL;

H
Hongze Cheng 已提交
52
  tsdbTFileName(rtner->tsdb, to, fname);
H
Hongze Cheng 已提交
53 54 55 56 57 58 59 60 61

  fdFrom = taosOpenFile(from->fname, TD_FILE_READ);
  if (fdFrom == NULL) code = terrno;
  TSDB_CHECK_CODE(code, lino, _exit);

  fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (fdTo == NULL) code = terrno;
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
62
  int64_t n = taosFSendFile(fdTo, fdFrom, 0, tsdbLogicToFileSize(from->f->size, rtner->szPage));
H
Hongze Cheng 已提交
63 64 65 66 67 68 69 70 71
  if (n < 0) {
    code = TAOS_SYSTEM_ERROR(errno);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  taosCloseFile(&fdFrom);
  taosCloseFile(&fdTo);

_exit:
  if (code) {
H
Hongze Cheng 已提交
72
    TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
73 74 75 76 77 78
    taosCloseFile(&fdFrom);
    taosCloseFile(&fdTo);
  }
  return code;
}

H
Hongze Cheng 已提交
79
static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const SDiskID *did) {
H
Hongze Cheng 已提交
80 81 82 83 84 85 86 87 88 89 90
  int32_t  code = 0;
  int32_t  lino = 0;
  STFileOp op = {0};

  // remove old
  op = (STFileOp){
      .optype = TSDB_FOP_REMOVE,
      .fid = fobj->f->fid,
      .of = fobj->f[0],
  };

H
Hongze Cheng 已提交
91
  code = TARRAY2_APPEND(rtner->fopArr, op);
H
Hongze Cheng 已提交
92 93 94 95 96 97 98 99 100 101 102
  TSDB_CHECK_CODE(code, lino, _exit);

  // create new
  op = (STFileOp){
      .optype = TSDB_FOP_CREATE,
      .fid = fobj->f->fid,
      .nf =
          {
              .type = fobj->f->type,
              .did = did[0],
              .fid = fobj->f->fid,
H
Hongze Cheng 已提交
103
              .cid = rtner->cid,
H
Hongze Cheng 已提交
104 105 106 107 108 109 110 111
              .size = fobj->f->size,
              .stt[0] =
                  {
                      .level = fobj->f->stt[0].level,
                  },
          },
  };

H
Hongze Cheng 已提交
112
  code = TARRAY2_APPEND(rtner->fopArr, op);
H
Hongze Cheng 已提交
113 114 115
  TSDB_CHECK_CODE(code, lino, _exit);

  // do copy the file
H
Hongze Cheng 已提交
116
  code = tsdbDoCopyFile(rtner, fobj, &op.nf);
H
Hongze Cheng 已提交
117 118 119 120
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
121
    TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
122 123 124 125
  }
  return code;
}

H
Hongze Cheng 已提交
126 127
typedef struct {
  STsdb  *tsdb;
H
Hongze Cheng 已提交
128
  int32_t sync;
H
Hongze Cheng 已提交
129 130 131 132
  int64_t now;
} SRtnArg;

static int32_t tsdbDoRetentionBegin(SRtnArg *arg, SRTNer *rtner) {
H
Hongze Cheng 已提交
133 134 135
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
136
  STsdb *tsdb = arg->tsdb;
H
Hongze Cheng 已提交
137

H
Hongze Cheng 已提交
138 139 140 141
  rtner->tsdb = tsdb;
  rtner->szPage = tsdb->pVnode->config.tsdbPageSize;
  rtner->now = arg->now;
  rtner->cid = tsdbFSAllocEid(tsdb->pFS);
H
Hongze Cheng 已提交
142

H
Hongze Cheng 已提交
143
  code = tsdbFSCreateCopySnapshot(tsdb->pFS, &rtner->fsetArr);
H
Hongze Cheng 已提交
144 145 146 147
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
148
    TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
149
  } else {
H
Hongze Cheng 已提交
150
    tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
H
Hongze Cheng 已提交
151 152 153 154
  }
  return code;
}

H
Hongze Cheng 已提交
155
static int32_t tsdbDoRetentionEnd(SRTNer *rtner) {
H
Hongze Cheng 已提交
156 157 158
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
159
  if (TARRAY2_SIZE(rtner->fopArr) == 0) goto _exit;
H
Hongze Cheng 已提交
160

H
Hongze Cheng 已提交
161
  code = tsdbFSEditBegin(rtner->tsdb->pFS, rtner->fopArr, TSDB_FEDIT_MERGE);
H
Hongze Cheng 已提交
162 163
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
164
  taosThreadRwlockWrlock(&rtner->tsdb->rwLock);
H
Hongze Cheng 已提交
165

H
Hongze Cheng 已提交
166
  code = tsdbFSEditCommit(rtner->tsdb->pFS);
H
Hongze Cheng 已提交
167
  if (code) {
H
Hongze Cheng 已提交
168
    taosThreadRwlockUnlock(&rtner->tsdb->rwLock);
H
Hongze Cheng 已提交
169 170 171
    TSDB_CHECK_CODE(code, lino, _exit);
  }

H
Hongze Cheng 已提交
172
  taosThreadRwlockUnlock(&rtner->tsdb->rwLock);
H
Hongze Cheng 已提交
173

H
Hongze Cheng 已提交
174
  TARRAY2_DESTROY(rtner->fopArr, NULL);
H
Hongze Cheng 已提交
175 176 177

_exit:
  if (code) {
H
Hongze Cheng 已提交
178
    TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
179
  } else {
H
Hongze Cheng 已提交
180
    tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
H
Hongze Cheng 已提交
181
  }
H
Hongze Cheng 已提交
182
  tsdbFSDestroyCopySnapshot(&rtner->fsetArr);
H
Hongze Cheng 已提交
183 184 185
  return code;
}

H
Hongze Cheng 已提交
186
static int32_t tsdbDoRetention2(void *arg) {
H
Hongze Cheng 已提交
187 188
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
189
  SRTNer  rtner[1] = {0};
H
Hongze Cheng 已提交
190

H
Hongze Cheng 已提交
191
  code = tsdbDoRetentionBegin(arg, rtner);
H
Hongze Cheng 已提交
192 193
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
194
  for (rtner->ctx->fsetArrIdx = 0; rtner->ctx->fsetArrIdx < TARRAY2_SIZE(rtner->fsetArr); rtner->ctx->fsetArrIdx++) {
H
Hongze Cheng 已提交
195
    rtner->ctx->fset = TARRAY2_GET(rtner->fsetArr, rtner->ctx->fsetArrIdx);
H
Hongze Cheng 已提交
196 197

    STFileObj *fobj;
H
Hongze Cheng 已提交
198
    int32_t    expLevel = tsdbFidLevel(rtner->ctx->fset->fid, &rtner->tsdb->keepCfg, rtner->now);
H
Hongze Cheng 已提交
199 200

    if (expLevel < 0) {  // remove the file set
H
Hongze Cheng 已提交
201
      for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) {
H
Hongze Cheng 已提交
202 203
        if (fobj == NULL) continue;

H
Hongze Cheng 已提交
204
        code = tsdbDoRemoveFileObject(rtner, fobj);
H
Hongze Cheng 已提交
205 206 207 208
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      SSttLvl *lvl;
H
Hongze Cheng 已提交
209
      TARRAY2_FOREACH(rtner->ctx->fset->lvlArr, lvl) {
H
Hongze Cheng 已提交
210
        TARRAY2_FOREACH(lvl->fobjArr, fobj) {
H
Hongze Cheng 已提交
211
          code = tsdbDoRemoveFileObject(rtner, fobj);
H
Hongze Cheng 已提交
212 213 214 215 216 217 218 219
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      }
    } else if (expLevel == 0) {
      continue;
    } else {
      SDiskID did;

H
Hongze Cheng 已提交
220
      if (tfsAllocDisk(rtner->tsdb->pVnode->pTfs, expLevel, &did) < 0) {
H
Hongze Cheng 已提交
221 222 223
        code = terrno;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
H
Hongze Cheng 已提交
224
      tfsMkdirRecurAt(rtner->tsdb->pVnode->pTfs, rtner->tsdb->path, did);
H
Hongze Cheng 已提交
225 226

      // data
H
Hongze Cheng 已提交
227
      for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) {
H
Hongze Cheng 已提交
228 229 230
        if (fobj == NULL) continue;

        if (fobj->f->did.level == did.level) continue;
H
Hongze Cheng 已提交
231
        code = tsdbDoMigrateFileObj(rtner, fobj, &did);
H
Hongze Cheng 已提交
232 233 234 235 236
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      // stt
      SSttLvl *lvl;
H
Hongze Cheng 已提交
237
      TARRAY2_FOREACH(rtner->ctx->fset->lvlArr, lvl) {
H
Hongze Cheng 已提交
238 239 240
        TARRAY2_FOREACH(lvl->fobjArr, fobj) {
          if (fobj->f->did.level == did.level) continue;

H
Hongze Cheng 已提交
241
          code = tsdbDoMigrateFileObj(rtner, fobj, &did);
H
Hongze Cheng 已提交
242 243 244 245 246 247
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      }
    }
  }

H
Hongze Cheng 已提交
248
  code = tsdbDoRetentionEnd(rtner);
H
Hongze Cheng 已提交
249 250 251 252
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
253
    TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
254 255 256
  }
  return code;
}
H
Hongze Cheng 已提交
257

H
Hongze Cheng 已提交
258 259 260 261 262 263 264 265 266
static void tsdbFreeRtnArg(void *arg) {
  SRtnArg *rArg = (SRtnArg *)arg;
  if (rArg->sync) {
    tsem_post(&rArg->tsdb->pVnode->canCommit);
  }
  taosMemoryFree(arg);
}

int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) {
H
Hongze Cheng 已提交
267 268 269
  SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
  if (arg == NULL) return TSDB_CODE_OUT_OF_MEMORY;
  arg->tsdb = tsdb;
H
Hongze Cheng 已提交
270
  arg->sync = sync;
H
Hongze Cheng 已提交
271 272
  arg->now = now;

H
Hongze Cheng 已提交
273 274 275
  if (sync) {
    tsem_wait(&tsdb->pVnode->canCommit);
  }
H
Hongze Cheng 已提交
276 277

  int64_t taskid;
H
Hongze Cheng 已提交
278 279 280 281 282 283
  int32_t code =
      tsdbFSScheduleBgTask(tsdb->pFS, TSDB_BG_TASK_RETENTION, tsdbDoRetention2, tsdbFreeRtnArg, arg, &taskid);
  if (code) {
    tsdbFreeRtnArg(arg);
  }
  return code;
H
Hongze Cheng 已提交
284
}