tsdbMerge.c 17.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "inc/tsdbMerge.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef struct {
H
Hongze Cheng 已提交
19 20
  STsdb         *tsdb;
  TFileSetArray *fsetArr;
H
Hongze Cheng 已提交
21 22 23 24 25 26 27 28 29 30

  int32_t  sttTrigger;
  int32_t  maxRow;
  int32_t  minRow;
  int32_t  szPage;
  int8_t   cmprAlg;
  int64_t  compactVersion;
  int64_t  cid;
  SSkmInfo skmTb[1];
  SSkmInfo skmRow[1];
H
Hongze Cheng 已提交
31

H
Hongze Cheng 已提交
32
  // context
H
Hongze Cheng 已提交
33
  struct {
H
Hongze Cheng 已提交
34
    bool       opened;
H
Hongze Cheng 已提交
35
    int64_t    now;
H
Hongze Cheng 已提交
36
    STFileSet *fset;
H
Hongze Cheng 已提交
37 38
    bool       toData;
    int32_t    level;
H
Hongze Cheng 已提交
39 40
    SSttLvl   *lvl;
    STFileObj *fobj;
H
Hongze Cheng 已提交
41 42 43
    TABLEID    tbid[1];
    int32_t    bDataIdx;
    SBlockData bData[2];
H
Hongze Cheng 已提交
44
  } ctx[1];
H
Hongze Cheng 已提交
45

H
Hongze Cheng 已提交
46 47
  TFileOpArray fopArr[1];

H
Hongze Cheng 已提交
48
  // reader
H
Hongze Cheng 已提交
49
  TSttFileReaderArray sttReaderArr[1];
H
Hongze Cheng 已提交
50 51 52
  // iter
  TTsdbIterArray iterArr[1];
  SIterMerger   *iterMerger;
H
Hongze Cheng 已提交
53
  // writer
H
Hongze Cheng 已提交
54 55
  SSttFileWriter  *sttWriter;
  SDataFileWriter *dataWriter;
H
Hongze Cheng 已提交
56 57
} SMerger;

H
Hongze Cheng 已提交
58
/*
 * Lazily prepare the merger: record the current time, copy the relevant
 * vnode settings, allocate an id from the file system and mark the merger as
 * opened.
 *
 * Always returns 0.
 */
static int32_t tsdbMergerOpen(SMerger *merger) {
  STsdb  *tsdb = merger->tsdb;
  SVnode *vnode = tsdb->pVnode;

  merger->ctx->now = taosGetTimestampMs();

  // snapshot of the vnode configuration used throughout the merge
  merger->maxRow = vnode->config.tsdbCfg.maxRows;
  merger->minRow = vnode->config.tsdbCfg.minRows;
  merger->szPage = vnode->config.tsdbPageSize;
  merger->cmprAlg = vnode->config.tsdbCfg.compression;
  merger->compactVersion = INT64_MAX;

  merger->cid = tsdbFSAllocEid(tsdb->pFS);

  merger->ctx->opened = true;
  return 0;
}

H
Hongze Cheng 已提交
70
/*
 * Finish a merge run: submit the collected file operations to the file
 * system (begin + commit) and release everything the merger owns.
 *
 * The owned resources (arrays, block buffers, schemas) are released on every
 * path, including a failed file-system edit, so an error no longer leaks them.
 *
 * Returns 0 on success or the error code of the failed file-system call.
 */
static int32_t tsdbMergerClose(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  SVnode *pVnode = merger->tsdb->pVnode;

  // edit file system
  code = tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbFSEditCommit(merger->tsdb->pFS);
  TSDB_CHECK_CODE(code, lino, _exit);

  // every per-file-set resource must already have been closed
  ASSERT(merger->dataWriter == NULL);
  ASSERT(merger->sttWriter == NULL);
  ASSERT(merger->iterMerger == NULL);
  ASSERT(TARRAY2_SIZE(merger->iterArr) == 0);
  ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pVnode), lino, code);
  }

  // clear the merger, regardless of whether the edit succeeded
  TARRAY2_FREE(merger->iterArr);
  TARRAY2_FREE(merger->sttReaderArr);
  TARRAY2_FREE(merger->fopArr);
  for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); i++) {
    tBlockDataDestroy(merger->ctx->bData + i);
  }
  tDestroyTSchema(merger->skmTb->pTSchema);
  tDestroyTSchema(merger->skmRow->pTSchema);

  return code;
}
H
Hongze Cheng 已提交
104

H
Hongze Cheng 已提交
105
/*
 * Flush the rows still buffered for the current table.
 *
 * bData[cidx] is the active buffer, bData[pidx] the other one. When the other
 * buffer holds rows and the average of both buffers reaches minRow, the rows
 * are redistributed: the first half is written and flushed as one block and
 * the rest is handed row by row to the data writer. Otherwise the other
 * buffer (if non-empty) is written to the data file as it is, and the active
 * buffer goes to the STT writer when it has fewer than minRow rows, or to the
 * data file otherwise.
 *
 * Both buffers are reset on success. Returns 0 on success (also when nothing
 * is buffered) or the error code of the failed write.
 */
static int32_t tsdbMergeToDataTableEnd(SMerger *merger) {
  if (merger->ctx->bData[0].nRow + merger->ctx->bData[1].nRow == 0) return 0;

  int32_t code = 0;
  int32_t lino = 0;
  int32_t cidx = merger->ctx->bDataIdx;
  int32_t pidx = (cidx + 1) % 2;
  int32_t numRow = (merger->ctx->bData[pidx].nRow + merger->ctx->bData[cidx].nRow) / 2;

  if (merger->ctx->bData[pidx].nRow > 0 && numRow >= merger->minRow) {
    ASSERT(merger->ctx->bData[pidx].nRow == merger->maxRow);

    SRowInfo row[1] = {{
        .suid = merger->ctx->tbid->suid,
        .uid = merger->ctx->tbid->uid,
        .row = tsdbRowFromBlockData(merger->ctx->bData + pidx, 0),
    }};

    // first half: rows [0, numRow) of the other buffer, flushed as one block
    for (int32_t i = 0; i < numRow; i++) {
      row->row.iRow = i;

      code = tsdbDataFileWriteTSData(merger->dataWriter, row);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbDataFileFlushTSDataBlock(merger->dataWriter);
    TSDB_CHECK_CODE(code, lino, _exit);

    // second half: the remaining rows of the other buffer ...
    for (int32_t i = numRow; i < merger->ctx->bData[pidx].nRow; i++) {
      row->row.iRow = i;
      code = tsdbDataFileWriteTSData(merger->dataWriter, row);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // ... followed by every row of the active buffer
    row->row = tsdbRowFromBlockData(merger->ctx->bData + cidx, 0);
    for (int32_t i = 0; i < merger->ctx->bData[cidx].nRow; i++) {
      row->row.iRow = i;
      code = tsdbDataFileWriteTSData(merger->dataWriter, row);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  } else {
    // The other buffer must be written itself; writing the active buffer here
    // would drop the other buffer's rows and emit the active rows twice.
    if (merger->ctx->bData[pidx].nRow > 0) {
      code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData + pidx);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    if (merger->ctx->bData[cidx].nRow < merger->minRow) {
      code = tsdbSttFileWriteTSDataBlock(merger->sttWriter, merger->ctx->bData + cidx);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData + cidx);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); i++) {
    tBlockDataReset(merger->ctx->bData + i);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
169 170

/*
 * Prepare the merger for the table identified by ctx->tbid: refresh the table
 * schema, make bData[0] the active buffer and initialise every buffer in
 * ctx->bData for that table.
 *
 * Returns 0 on success or the error code of the failed call.
 */
static int32_t tsdbMergeToDataTableBegin(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbUpdateSkmTb(merger->tsdb, merger->ctx->tbid, merger->skmTb);
  TSDB_CHECK_CODE(code, lino, _exit);

  merger->ctx->bDataIdx = 0;
  for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); i++) {
    // index with i so that each buffer is initialised, not bData[0] repeatedly
    code = tBlockDataInit(merger->ctx->bData + i, merger->ctx->tbid, merger->skmTb->pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
190
/*
 * Drain the iterator merger into the data files.
 *
 * Rows are grouped per table (a change of uid ends the previous table and
 * begins the next one). Within a table a row either overwrites the last
 * buffered row (same timestamp and version <= compactVersion) or is appended;
 * when the active buffer already holds maxRow rows, the other buffer is
 * written out, cleared and becomes the active one before appending.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbMergeToDataLevel(SMerger *merger) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SRowInfo *row;

  while ((row = tsdbIterMergerGet(merger->iterMerger)) != NULL) {
    // table boundary: flush the previous table, then set up the new one
    if (row->uid != merger->ctx->tbid->uid) {
      code = tsdbMergeToDataTableEnd(merger);
      TSDB_CHECK_CODE(code, lino, _exit);

      merger->ctx->tbid->suid = row->suid;
      merger->ctx->tbid->uid = row->uid;

      code = tsdbMergeToDataTableBegin(merger);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    TSDBKEY key[1] = {TSDBROW_KEY(&row->row)};

    // looked up only now: the table switch above may have changed bDataIdx
    SBlockData *active = merger->ctx->bData + merger->ctx->bDataIdx;

    bool sameTsAsLast = key->version <= merger->compactVersion  //
                        && active->nRow > 0                     //
                        && active->aTSKEY[active->nRow - 1] == key->ts;

    if (sameTsAsLast) {
      // update
      code = tBlockDataUpdateRow(active, &row->row, NULL);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      if (active->nRow >= merger->maxRow) {
        int32_t other = (merger->ctx->bDataIdx + 1) % 2;

        code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData + other);
        TSDB_CHECK_CODE(code, lino, _exit);

        tBlockDataClear(merger->ctx->bData + other);

        // switch to next bData
        merger->ctx->bDataIdx = other;
        active = merger->ctx->bData + other;
      }

      code = tBlockDataAppendRow(active, &row->row, NULL, row->uid);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbIterMergerNext(merger->iterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // flush whatever is left of the last table
  code = tsdbMergeToDataTableEnd(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Drain the iterator merger into the STT writer, one row at a time.
 *
 * Returns 0 on success or the error code of the failed write / advance.
 */
static int32_t tsdbMergeToUpperLevel(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  for (SRowInfo *cur = tsdbIterMergerGet(merger->iterMerger); cur; cur = tsdbIterMergerGet(merger->iterMerger)) {
    code = tsdbSttFileWriteTSData(merger->sttWriter, cur);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(merger->iterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vid:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
267 268 269
/*
 * Decide which STT levels of ctx->fset take part in the merge and open a
 * reader for each of them.
 *
 * Levels are visited in array order. A level is consumed when its number
 * equals the count of levels consumed so far (ctx->level) and its first file
 * has at least sttTrigger segments; the file is then queued for removal and a
 * reader is opened on it. The walk stops at the first level that does not
 * qualify:
 *   - a gap in the level numbers or a level without files: the merge output
 *     goes to a new STT file (ctx->lvl == NULL, ctx->toData == false);
 *   - a level whose first file has fewer than sttTrigger segments: the merge
 *     output goes to that level (ctx->lvl / ctx->fobj stay set,
 *     ctx->toData == false).
 * When every level is consumed, ctx->toData stays true and ctx->lvl is set to
 * NULL so that the writer does not target a file that was just queued for
 * removal.
 *
 * Returns 0 on success or the error code of the failed call.
 */
static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  bool    stopped = false;  // true when the walk ended at a non-qualifying level

  merger->ctx->toData = true;
  merger->ctx->level = 0;
  TARRAY2_FOREACH(merger->ctx->fset->lvlArr, merger->ctx->lvl) {
    if (merger->ctx->lvl->level != merger->ctx->level || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 0) {
      merger->ctx->toData = false;
      merger->ctx->lvl = NULL;
      stopped = true;
      break;
    }

    ASSERT(merger->ctx->lvl->level == 0 || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 1);

    merger->ctx->fobj = TARRAY2_FIRST(merger->ctx->lvl->fobjArr);
    if (merger->ctx->fobj->f->stt->nseg < merger->sttTrigger) {
      merger->ctx->toData = false;
      stopped = true;
      break;
    }

    merger->ctx->level++;

    // add remove operation
    STFileOp op = {
        .optype = TSDB_FOP_REMOVE,
        .fid = merger->ctx->fset->fid,
        .of = merger->ctx->fobj->f[0],
    };
    code = TARRAY2_APPEND(merger->fopArr, op);
    TSDB_CHECK_CODE(code, lino, _exit);

    // open the reader
    SSttFileReader      *reader;
    SSttFileReaderConfig config[1] = {{
        .tsdb = merger->tsdb,
        .szPage = merger->szPage,
        .file[0] = merger->ctx->fobj->f[0],
    }};
    code = tsdbSttFileReaderOpen(merger->ctx->fobj->fname, config, &reader);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(merger->sttReaderArr, reader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (!stopped) {
    // All levels were consumed (or there were none): no existing level is left
    // to write into, so do not leave a stale level pointer behind.
    merger->ctx->lvl = NULL;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Open one STT iterator per segment reader of every opened STT file reader,
 * collect them in merger->iterArr and open the iterator merger on top of them.
 *
 * Returns 0 on success or the error code of the failed call.
 */
static int32_t tsdbMergeFileSetBeginOpenIter(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  SSttFileReader *reader;
  TARRAY2_FOREACH(merger->sttReaderArr, reader) {
    const TSttSegReaderArray *segReaders;
    code = tsdbSttFileReaderGetSegReader(reader, &segReaders);
    TSDB_CHECK_CODE(code, lino, _exit);

    SSttSegReader *seg;
    TARRAY2_FOREACH(segReaders, seg) {
      STsdbIter      *newIter;
      STsdbIterConfig iterCfg[1] = {{
          .type = TSDB_ITER_TYPE_STT,
          .sttReader = seg,
      }};

      code = tsdbIterOpen(iterCfg, &newIter);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = TARRAY2_APPEND(merger->iterArr, newIter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // merge all segment iterators into a single stream
  code = tsdbIterMergerOpen(merger->iterArr, &merger->iterMerger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  }
  return code;
}

/*
 * Open the writers used for the current file set.
 *
 * An STT writer is always opened: on the file of the existing level when
 * ctx->lvl is set, otherwise on a brand-new STT file placed on a freshly
 * allocated disk. A data-file writer is opened in addition when ctx->toData
 * is set; it is told which data files of the file set already exist.
 *
 * Returns 0 on success, TSDB_CODE_FS_NO_VALID_DISK when no disk could be
 * allocated, or the error code of the failed open call.
 */
static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  if (merger->ctx->lvl == NULL) {
    // to new level: pick a disk for the new STT file first
    SDiskID newDid[1];
    int32_t diskLevel = tsdbFidLevel(merger->ctx->fset->fid, &merger->tsdb->keepCfg, merger->ctx->now);
    if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, diskLevel, newDid) < 0) {
      code = TSDB_CODE_FS_NO_VALID_DISK;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    SSttFileWriterConfig sttCfg[1] = {{
        .tsdb = merger->tsdb,
        .maxRow = merger->maxRow,
        .szPage = merger->szPage,
        .cmprAlg = merger->cmprAlg,
        .compactVersion = merger->compactVersion,
        .file =
            {
                .type = TSDB_FTYPE_STT,
                .did = newDid[0],
                .fid = merger->ctx->fset->fid,
                .cid = merger->cid,
                .size = 0,
                .stt = {{
                    .level = merger->ctx->level,
                    .nseg = 0,
                }},
            },
    }};
    code = tsdbSttFileWriterOpen(sttCfg, &merger->sttWriter);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    // to existing level: continue with the file selected by the reader step
    SSttFileWriterConfig sttCfg[1] = {{
        .tsdb = merger->tsdb,
        .maxRow = merger->maxRow,
        .szPage = merger->szPage,
        .cmprAlg = merger->cmprAlg,
        .compactVersion = merger->compactVersion,
        .file = merger->ctx->fobj->f[0],
    }};
    code = tsdbSttFileWriterOpen(sttCfg, &merger->sttWriter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (merger->ctx->toData) {
    SDiskID dataDid;
    int32_t diskLevel = tsdbFidLevel(merger->ctx->fset->fid, &merger->tsdb->keepCfg, merger->ctx->now);

    if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, diskLevel, &dataDid) < 0) {
      code = TSDB_CODE_FS_NO_VALID_DISK;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    SDataFileWriterConfig dataCfg[1] = {{
        .tsdb = merger->tsdb,
        .cmprAlg = merger->cmprAlg,
        .maxRow = merger->maxRow,
        .szPage = merger->szPage,
        .fid = merger->ctx->fset->fid,
        .cid = merger->cid,
        .did = dataDid,
        .compactVersion = merger->compactVersion,
    }};

    // describe the data files the file set already has
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
      if (!merger->ctx->fset->farr[ftype]) {
        dataCfg->files[ftype].exist = false;
        continue;
      }
      dataCfg->files[ftype].exist = true;
      dataCfg->files[ftype].file = merger->ctx->fset->farr[ftype]->f[0];
    }

    code = tsdbDataFileWriterOpen(dataCfg, &merger->dataWriter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
447 448

/*
 * Set up everything needed to merge ctx->fset: clear the per-table state,
 * then open the readers, the iterators and the writers, in that order.
 *
 * Expects all per-file-set resources of a previous file set to be closed.
 * Returns 0 on success or the error code of the failed step.
 */
static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  // nothing may be left over from the previous file set
  ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0);
  ASSERT(TARRAY2_SIZE(merger->iterArr) == 0);
  ASSERT(merger->iterMerger == NULL);
  ASSERT(merger->sttWriter == NULL);
  ASSERT(merger->dataWriter == NULL);

  // start with no current table and empty buffers
  merger->ctx->tbid->suid = 0;
  merger->ctx->tbid->uid = 0;
  merger->ctx->bDataIdx = 0;
  for (int32_t iBuf = 0; iBuf < ARRAY_SIZE(merger->ctx->bData); iBuf++) {
    tBlockDataReset(&merger->ctx->bData[iBuf]);
  }

  // step 1: readers
  code = tsdbMergeFileSetBeginOpenReader(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // step 2: iterators
  code = tsdbMergeFileSetBeginOpenIter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // step 3: writers
  code = tsdbMergeFileSetBeginOpenWriter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Close the STT writer and, when merging to data files, the data-file writer.
 * A file operation reported by a close call is appended to merger->fopArr
 * unless its type is TSDB_FOP_NONE.
 *
 * Returns 0 on success or the error code of the failed call.
 */
static int32_t tsdbMergeFileSetEndCloseWriter(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  // NOTE(review): the array has TSDB_FTYPE_MAX slots but only slot 0 is ever
  // inspected and appended below - confirm that the data-file writer reports
  // at most one operation.
  STFileOp fop[TSDB_FTYPE_MAX];

  code = tsdbSttFileWriterClose(&merger->sttWriter, 0, fop);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (fop->optype != TSDB_FOP_NONE) {
    code = TARRAY2_APPEND_PTR(merger->fopArr, fop);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (merger->ctx->toData) {
    // TODO
    code = tsdbDataFileWriterClose(&merger->dataWriter, 0, fop);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (fop->optype != TSDB_FOP_NONE) {
      code = TARRAY2_APPEND_PTR(merger->fopArr, fop);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
516 517

/*
 * Close the iterator merger, then close and remove every iterator held in
 * merger->iterArr. Always returns 0.
 */
static int32_t tsdbMergeFileSetEndCloseIter(SMerger *merger) {
  // the merger sits on top of the iterators, so it is closed first
  tsdbIterMergerClose(&merger->iterMerger);

  TARRAY2_CLEAR(merger->iterArr, tsdbIterClose);

  return 0;
}

/*
 * Close and remove every STT file reader held in merger->sttReaderArr.
 * Always returns 0.
 */
static int32_t tsdbMergeFileSetEndCloseReader(SMerger *merger) {
  TARRAY2_CLEAR(merger->sttReaderArr, tsdbSttFileReaderClose);

  return 0;
}

/*
 * Tear down the per-file-set resources in reverse order of their creation:
 * writers first, then iterators, then readers.
 *
 * Returns 0 on success or the error code of the failed step.
 */
static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  // writers
  code = tsdbMergeFileSetEndCloseWriter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // iterators
  code = tsdbMergeFileSetEndCloseIter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // readers
  code = tsdbMergeFileSetEndCloseReader(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
548

H
Hongze Cheng 已提交
549 550 551
/*
 * Merge one file set: open readers/iterators/writers, move the rows either
 * into the data files (ctx->toData) or into an STT level, then close
 * everything again.
 *
 * Returns 0 on success or the error code of the failed step, so that the
 * caller can stop instead of carrying on after a failure.
 */
static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
  int32_t code = 0;
  int32_t lino = 0;

  merger->ctx->fset = fset;
  code = tsdbMergeFileSetBegin(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // do merge
  if (merger->ctx->toData) {
    code = tsdbMergeToDataLevel(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    code = tsdbMergeToUpperLevel(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbMergeFileSetEnd(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(merger->tsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(merger->tsdb->pVnode), __func__, fset->fid);
  }
  // propagate the failure; returning a constant 0 here would hide it
  return code;
}

H
Hongze Cheng 已提交
578
/*
 * Walk the file-set snapshot and merge every file set whose first STT level
 * is level 0, has at least one file, and whose first file has reached
 * sttTrigger segments. The merger is opened lazily before the first such file
 * set and closed once after the walk if it was opened.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbDoMerge(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  STFileSet *fset;
  TARRAY2_FOREACH(merger->fsetArr, fset) {
    // skip file sets that do not need merging
    if (TARRAY2_SIZE(fset->lvlArr) <= 0) continue;

    SSttLvl *firstLvl = TARRAY2_FIRST(fset->lvlArr);
    if (firstLvl == NULL) continue;
    if (firstLvl->level != 0) continue;
    if (TARRAY2_SIZE(firstLvl->fobjArr) == 0) continue;

    STFileObj *firstFile = TARRAY2_FIRST(firstLvl->fobjArr);
    if (firstFile->f->stt->nseg < merger->sttTrigger) continue;

    // first file set that qualifies: prepare the merger
    if (!merger->ctx->opened) {
      code = tsdbMergerOpen(merger);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbMergeFileSet(merger, fset);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (merger->ctx->opened) {
    code = tsdbMergerClose(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(merger->tsdb->pVnode), __func__);
  }
  return code;
}

/*
 * Entry point of the merge: take a copy snapshot of the file system, merge
 * the file sets that need it, and destroy the snapshot again.
 *
 * The snapshot is destroyed on every path once it has been created, so a
 * failing merge no longer leaks it.
 *
 * Returns 0 on success or the error code of the failed step.
 */
int32_t tsdbMerge(STsdb *tsdb) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(tsdb->pVnode);

  // every member not listed here starts out zeroed (fsetArr == NULL,
  // ctx->opened == false, ...)
  SMerger merger[1] = {{
      .tsdb = tsdb,
      .sttTrigger = tsdb->pVnode->config.sttTrigger,
  }};

  code = tsdbFSCreateCopySnapshot(tsdb->pFS, &merger->fsetArr);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbDoMerge(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  // release the snapshot whether or not the merge succeeded
  if (merger->fsetArr != NULL) {
    tsdbFSDestroyCopySnapshot(&merger->fsetArr);
  }

  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  } else if (merger->ctx->opened) {
    tsdbDebug("vgId:%d %s done", vid, __func__);
  }
  return code;
}