tsdbMerge.c 11.8 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdbMerge.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef struct {
H
Hongze Cheng 已提交
19 20
  STsdb         *tsdb;
  TFileSetArray *fsetArr;
H
Hongze Cheng 已提交
21

H
Hongze Cheng 已提交
22 23 24 25 26 27 28
  int32_t sttTrigger;
  int32_t maxRow;
  int32_t minRow;
  int32_t szPage;
  int8_t  cmprAlg;
  int64_t compactVersion;
  int64_t cid;
H
Hongze Cheng 已提交
29

H
Hongze Cheng 已提交
30
  // context
H
Hongze Cheng 已提交
31
  struct {
H
Hongze Cheng 已提交
32
    bool       opened;
H
Hongze Cheng 已提交
33
    int64_t    now;
H
Hongze Cheng 已提交
34
    STFileSet *fset;
H
Hongze Cheng 已提交
35 36
    bool       toData;
    int32_t    level;
H
Hongze Cheng 已提交
37
    SSttLvl   *lvl;
H
Hongze Cheng 已提交
38
    TABLEID    tbid[1];
H
Hongze Cheng 已提交
39
  } ctx[1];
H
Hongze Cheng 已提交
40

H
Hongze Cheng 已提交
41 42
  TFileOpArray fopArr[1];

H
Hongze Cheng 已提交
43
  // reader
H
Hongze Cheng 已提交
44
  TSttFileReaderArray sttReaderArr[1];
H
Hongze Cheng 已提交
45
  // iter
H
Hongze Cheng 已提交
46 47 48 49
  TTsdbIterArray dataIterArr[1];
  SIterMerger   *dataIterMerger;
  TTsdbIterArray tombIterArr[1];
  SIterMerger   *tombIterMerger;
H
Hongze Cheng 已提交
50
  // writer
H
Hongze Cheng 已提交
51
  SFSetWriter *writer;
H
Hongze Cheng 已提交
52 53
} SMerger;

H
Hongze Cheng 已提交
54
/*
 * Initialise the merger on first use: record the current time, copy the
 * tsdb-related settings out of the vnode config, allocate an id for the
 * files this run will produce, and mark the merger as opened.
 *
 * Always returns 0.
 */
static int32_t tsdbMergerOpen(SMerger *merger) {
  STsdb  *tsdb = merger->tsdb;
  SVnode *pVnode = tsdb->pVnode;

  merger->ctx->now = taosGetTimestampMs();

  // settings taken from the vnode configuration
  merger->maxRow = pVnode->config.tsdbCfg.maxRows;
  merger->minRow = pVnode->config.tsdbCfg.minRows;
  merger->szPage = pVnode->config.tsdbPageSize;
  merger->cmprAlg = pVnode->config.tsdbCfg.compression;

  merger->compactVersion = INT64_MAX;
  merger->cid = tsdbFSAllocEid(tsdb->pFS);

  merger->ctx->opened = true;
  return 0;
}

H
Hongze Cheng 已提交
66
/*
 * Finish a merge run: apply the accumulated file operations to the file
 * system as a TSDB_FEDIT_MERGE edit, then release the merger's arrays.
 *
 * The edit is committed while holding the tsdb write lock. The arrays
 * (including tombIterArr) are released on every path, so a failed edit
 * does not leak their storage.
 *
 * Returns 0 on success, otherwise the error code of the failed edit step.
 */
static int32_t tsdbMergerClose(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  SVnode *pVnode = merger->tsdb->pVnode;

  // edit file system
  code = tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE);
  TSDB_CHECK_CODE(code, lino, _exit);

  // commit under the write lock; the lock is dropped before the result is checked
  taosThreadRwlockWrlock(&merger->tsdb->rwLock);
  code = tsdbFSEditCommit(merger->tsdb->pFS);
  taosThreadRwlockUnlock(&merger->tsdb->rwLock);
  TSDB_CHECK_CODE(code, lino, _exit);

  // every per-file-set resource must already have been closed
  ASSERT(merger->writer == NULL);
  ASSERT(merger->dataIterMerger == NULL);
  ASSERT(merger->tombIterMerger == NULL);
  ASSERT(TARRAY2_SIZE(merger->dataIterArr) == 0);
  ASSERT(TARRAY2_SIZE(merger->tombIterArr) == 0);
  ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0);

_exit:
  // clear the merge (array storage only; elements were closed per file set)
  TARRAY2_DESTROY(merger->dataIterArr, NULL);
  TARRAY2_DESTROY(merger->tombIterArr, NULL);
  TARRAY2_DESTROY(merger->sttReaderArr, NULL);
  TARRAY2_DESTROY(merger->fopArr, NULL);

  if (code) {
    TSDB_ERROR_LOG(TD_VID(pVnode), lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
99

H
Hongze Cheng 已提交
100 101 102
/*
 * Select the stt files of the current file set that take part in the merge
 * and open a reader for each of them.
 *
 * Levels are walked in array order starting at level 0. A level is consumed
 * only if its level number equals the number of levels consumed so far and it
 * holds at least sttTrigger - 1 files; the first level that does not qualify
 * stops the walk and clears ctx->toData. If every level is consumed,
 * ctx->toData stays true. ctx->level ends up as the number of consumed levels.
 *
 * For each selected file a TSDB_FOP_REMOVE operation is queued in fopArr and
 * a reader is appended to sttReaderArr. At most sttTrigger files are taken
 * from any one level.
 *
 * Returns 0 on success, otherwise the first error encountered.
 */
static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  merger->ctx->toData = true;
  merger->ctx->level = 0;

  for (int32_t i = 0;; ++i) {
    if (i >= TARRAY2_SIZE(merger->ctx->fset->lvlArr)) {
      // ran out of levels: everything was consumed
      merger->ctx->lvl = NULL;
      break;
    }

    merger->ctx->lvl = TARRAY2_GET(merger->ctx->fset->lvlArr, i);
    if (merger->ctx->lvl->level != merger->ctx->level ||
        TARRAY2_SIZE(merger->ctx->lvl->fobjArr) + 1 < merger->sttTrigger) {
      // gap in the level sequence, or too few files on this level: stop here
      merger->ctx->toData = false;
      merger->ctx->lvl = NULL;
      break;
    }

    merger->ctx->level++;

    STFileObj *fobj;
    int32_t    numFile = 0;
    TARRAY2_FOREACH(merger->ctx->lvl->fobjArr, fobj) {
      if (numFile == merger->sttTrigger) {
        break;
      }

      // the source file is removed once the merge is committed
      STFileOp op = {
          .optype = TSDB_FOP_REMOVE,
          .fid = merger->ctx->fset->fid,
          .of = fobj->f[0],
      };
      code = TARRAY2_APPEND(merger->fopArr, op);
      TSDB_CHECK_CODE(code, lino, _exit);

      SSttFileReader      *reader;
      SSttFileReaderConfig config = {
          .tsdb = merger->tsdb,
          .szPage = merger->szPage,
          .file[0] = fobj->f[0],
      };

      code = tsdbSttFileReaderOpen(fobj->fname, &config, &reader);
      TSDB_CHECK_CODE(code, lino, _exit);

      // NOTE(review): if this append fails the freshly opened reader is not
      // tracked anywhere and is presumably leaked -- confirm and close it here.
      code = TARRAY2_APPEND(merger->sttReaderArr, reader);
      TSDB_CHECK_CODE(code, lino, _exit);

      // count the file so the sttTrigger cap above actually takes effect
      numFile++;
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * For every opened stt reader create two iterators -- one of type
 * TSDB_ITER_TYPE_STT (collected in dataIterArr) and one of type
 * TSDB_ITER_TYPE_STT_TOMB (collected in tombIterArr) -- then open an
 * iterator merger over each of the two arrays.
 *
 * Returns 0 on success, otherwise the first error encountered.
 */
static int32_t tsdbMergeFileSetBeginOpenIter(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  SSttFileReader *rd;
  TARRAY2_FOREACH(merger->sttReaderArr, rd) {
    STsdbIter      *it;
    STsdbIterConfig cfg = {0};

    // iterator over the reader's data
    cfg.type = TSDB_ITER_TYPE_STT;
    cfg.sttReader = rd;

    code = tsdbIterOpen(&cfg, &it);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(merger->dataIterArr, it);
    TSDB_CHECK_CODE(code, lino, _exit);

    // iterator over the reader's tomb records
    cfg.type = TSDB_ITER_TYPE_STT_TOMB;
    cfg.sttReader = rd;

    code = tsdbIterOpen(&cfg, &it);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(merger->tombIterArr, it);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // one merger per iterator array; the last argument is false for data, true for tomb
  code = tsdbIterMergerOpen(merger->dataIterArr, &merger->dataIterMerger, false);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbIterMergerOpen(merger->tombIterArr, &merger->tombIterMerger, true);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  }
  return code;
}

/*
 * Open the file-set writer that receives the merged output.
 *
 * A disk is allocated for the storage level computed by tsdbFidLevel() for
 * the file set's fid. The writer is configured from the merger's parameters;
 * it is restricted to stt output unless ctx->toData is set, in which case the
 * file set's existing files (one slot per file type) are handed to it as well.
 *
 * Returns 0 on success, TSDB_CODE_FS_NO_VALID_DISK if no disk could be
 * allocated, or the error returned by tsdbFSetWriterOpen().
 */
static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  STFileSet *fset = merger->ctx->fset;

  // pick a disk on the storage level this fid belongs to
  SDiskID did;
  int32_t diskLevel = tsdbFidLevel(fset->fid, &merger->tsdb->keepCfg, merger->ctx->now);
  if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, diskLevel, &did) < 0) {
    code = TSDB_CODE_FS_NO_VALID_DISK;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SFSetWriterConfig config = {
      .tsdb = merger->tsdb,
      .toSttOnly = !merger->ctx->toData,
      .compactVersion = merger->compactVersion,
      .minRow = merger->minRow,
      .maxRow = merger->maxRow,
      .szPage = merger->szPage,
      .cmprAlg = merger->cmprAlg,
      .fid = fset->fid,
      .cid = merger->cid,
      .did = did,
      .level = merger->ctx->level,
  };

  if (merger->ctx->toData) {
    // describe which files of each type already exist in the file set
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      config.files[ftype].exist = (fset->farr[ftype] != NULL);
      if (fset->farr[ftype] != NULL) {
        config.files[ftype].file = fset->farr[ftype]->f[0];
      }
    }
  }

  code = tsdbFSetWriterOpen(&config, &merger->writer);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
251 252

/*
 * Prepare the merger for the file set stored in ctx->fset: reset the current
 * table id, then open the readers, the iterators and the writer, in that
 * order.
 *
 * Returns 0 on success, otherwise the error of the first step that failed.
 */
static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  // nothing may be left over from a previous file set
  ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0);
  ASSERT(TARRAY2_SIZE(merger->dataIterArr) == 0);
  ASSERT(merger->dataIterMerger == NULL);
  ASSERT(merger->writer == NULL);

  TABLEID *tbid = merger->ctx->tbid;
  tbid->suid = 0;
  tbid->uid = 0;

  // step 1: readers
  code = tsdbMergeFileSetBeginOpenReader(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // step 2: iterators
  code = tsdbMergeFileSetBeginOpenIter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // step 3: writer
  code = tsdbMergeFileSetBeginOpenWriter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Close the file-set writer, passing the merger's file-operation array to
 * tsdbFSetWriterClose(). Returns whatever that call returns.
 */
static int32_t tsdbMergeFileSetEndCloseWriter(SMerger *merger) {
  // NOTE(review): the literal 0 is presumably a "do not abort" flag -- confirm against tsdbFSetWriterClose()
  int32_t code = tsdbFSetWriterClose(&merger->writer, 0, merger->fopArr);
  return code;
}
H
Hongze Cheng 已提交
286 287

/*
 * Tear down the iteration state of the current file set: close each iterator
 * merger and then the iterators it was built over, tomb side first.
 *
 * Always returns 0.
 */
static int32_t tsdbMergeFileSetEndCloseIter(SMerger *merger) {
  // tomb records
  tsdbIterMergerClose(&merger->tombIterMerger);
  TARRAY2_CLEAR(merger->tombIterArr, tsdbIterClose);

  // data rows
  tsdbIterMergerClose(&merger->dataIterMerger);
  TARRAY2_CLEAR(merger->dataIterArr, tsdbIterClose);

  return 0;
}

/*
 * Close every stt reader opened for the current file set and empty the
 * reader array. Always returns 0.
 */
static int32_t tsdbMergeFileSetEndCloseReader(SMerger *merger) {
  // tsdbSttFileReaderClose is applied to each element before the array is emptied
  TARRAY2_CLEAR(merger->sttReaderArr, tsdbSttFileReaderClose);

  return 0;
}

/*
 * Finish the current file set by closing, in this order, the writer, the
 * iterators and the readers.
 *
 * Returns 0 on success, otherwise the error of the first step that failed;
 * later steps are not run in that case.
 */
static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  // writer first
  code = tsdbMergeFileSetEndCloseWriter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // then the iterators built on top of the readers
  code = tsdbMergeFileSetEndCloseIter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // finally the readers themselves
  code = tsdbMergeFileSetEndCloseReader(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
319

H
Hongze Cheng 已提交
320 321 322
/*
 * Merge one file set.
 *
 * Opens readers, iterators and a writer for `fset`, then streams first all
 * data rows and then all tomb records from the iterator mergers into the
 * writer. Rows and records whose table uid is not known to the vnode's meta
 * (metaGetInfo() returns non-zero) are skipped table by table. Finally the
 * writer, iterators and readers are closed.
 *
 * Returns 0 on success, otherwise the error code of the step that failed, so
 * that the caller does not go on to commit a partially merged file set.
 */
static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
  int32_t code = 0;
  int32_t lino = 0;

  merger->ctx->fset = fset;
  code = tsdbMergeFileSetBegin(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // data
  SMetaInfo info;
  SRowInfo *row;
  merger->ctx->tbid->suid = 0;
  merger->ctx->tbid->uid = 0;
  while ((row = tsdbIterMergerGetData(merger->dataIterMerger)) != NULL) {
    if (row->uid != merger->ctx->tbid->uid) {
      // first row of a new table: check that the table is still known
      if (metaGetInfo(merger->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) {
        // NOTE(review): the cast assumes SRowInfo starts with the TABLEID fields -- confirm
        code = tsdbIterMergerSkipTableData(merger->dataIterMerger, (TABLEID *)row);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }

      merger->ctx->tbid->uid = row->uid;
      merger->ctx->tbid->suid = row->suid;
    }

    code = tsdbFSetWriteRow(merger->writer, row);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(merger->dataIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // tomb
  STombRecord *record;
  merger->ctx->tbid->suid = 0;
  merger->ctx->tbid->uid = 0;
  while ((record = tsdbIterMergerGetTombRecord(merger->tombIterMerger)) != NULL) {
    if (record->uid != merger->ctx->tbid->uid) {
      merger->ctx->tbid->uid = record->uid;
      merger->ctx->tbid->suid = record->suid;

      if (metaGetInfo(merger->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(merger->tombIterMerger, merger->ctx->tbid);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }
    code = tsdbFSetWriteTombRecord(merger->writer, record);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(merger->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbMergeFileSetEnd(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(merger->tsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(merger->tsdb->pVnode), __func__, fset->fid);
  }
  // propagate the failure instead of reporting success unconditionally
  return code;
}

H
Hongze Cheng 已提交
386
/*
 * Walk the snapshot's file sets and merge every one whose first stt level is
 * level 0 and holds at least sttTrigger files. The merger is opened on the
 * first eligible file set and, if it was opened at all, closed after the
 * walk.
 *
 * Returns 0 on success, otherwise the first error encountered.
 */
static int32_t tsdbDoMerge(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  STFileSet *fset;
  TARRAY2_FOREACH(merger->fsetArr, fset) {
    // file sets without any stt level have nothing to merge
    if (TARRAY2_SIZE(fset->lvlArr) == 0) {
      continue;
    }

    SSttLvl *first = TARRAY2_FIRST(fset->lvlArr);
    bool     eligible = (first->level == 0) && (TARRAY2_SIZE(first->fobjArr) >= merger->sttTrigger);
    if (!eligible) {
      continue;
    }

    // open lazily, only once there is real work to do
    if (!merger->ctx->opened) {
      code = tsdbMergerOpen(merger);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbMergeFileSet(merger, fset);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (merger->ctx->opened) {
    code = tsdbMergerClose(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(merger->tsdb->pVnode), __func__);
  }
  return code;
}

H
Hongze Cheng 已提交
421
int32_t tsdbMerge(void *arg) {
H
Hongze Cheng 已提交
422 423
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
424
  STsdb  *tsdb = (STsdb *)arg;
H
Hongze Cheng 已提交
425 426 427 428 429 430

  SMerger merger[1] = {{
      .tsdb = tsdb,
      .sttTrigger = tsdb->pVnode->config.sttTrigger,
  }};

H
Hongze Cheng 已提交
431 432
  ASSERT(merger->sttTrigger > 1);

H
Hongze Cheng 已提交
433
  code = tsdbFSCreateCopySnapshot(tsdb->pFS, &merger->fsetArr);
H
Hongze Cheng 已提交
434 435 436 437 438
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbDoMerge(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
439
  tsdbFSDestroyCopySnapshot(&merger->fsetArr);
H
Hongze Cheng 已提交
440

H
Hongze Cheng 已提交
441 442
_exit:
  if (code) {
H
Hongze Cheng 已提交
443
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
444
  } else if (merger->ctx->opened) {
H
Hongze Cheng 已提交
445
    tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
H
Hongze Cheng 已提交
446
  }
H
Hongze Cheng 已提交
447
  return code;
H
Hongze Cheng 已提交
448
}