tsdbMerge.c 17.5 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "inc/tsdbMerge.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef struct {
H
Hongze Cheng 已提交
19 20
  STsdb         *tsdb;
  TFileSetArray *fsetArr;
H
Hongze Cheng 已提交
21 22 23 24 25 26 27 28 29 30

  int32_t  sttTrigger;
  int32_t  maxRow;
  int32_t  minRow;
  int32_t  szPage;
  int8_t   cmprAlg;
  int64_t  compactVersion;
  int64_t  cid;
  SSkmInfo skmTb[1];
  SSkmInfo skmRow[1];
H
Hongze Cheng 已提交
31

H
Hongze Cheng 已提交
32
  // context
H
Hongze Cheng 已提交
33
  struct {
H
Hongze Cheng 已提交
34
    bool       opened;
H
Hongze Cheng 已提交
35
    int64_t    now;
H
Hongze Cheng 已提交
36
    STFileSet *fset;
H
Hongze Cheng 已提交
37 38
    bool       toData;
    int32_t    level;
H
Hongze Cheng 已提交
39 40
    SSttLvl   *lvl;
    STFileObj *fobj;
H
Hongze Cheng 已提交
41 42 43
    TABLEID    tbid[1];
    int32_t    bDataIdx;
    SBlockData bData[2];
H
Hongze Cheng 已提交
44
  } ctx[1];
H
Hongze Cheng 已提交
45

H
Hongze Cheng 已提交
46 47
  TFileOpArray fopArr[1];

H
Hongze Cheng 已提交
48
  // reader
H
Hongze Cheng 已提交
49
  TSttFileReaderArray sttReaderArr[1];
H
Hongze Cheng 已提交
50 51 52
  // iter
  TTsdbIterArray iterArr[1];
  SIterMerger   *iterMerger;
H
Hongze Cheng 已提交
53
  // writer
H
Hongze Cheng 已提交
54 55
  SSttFileWriter  *sttWriter;
  SDataFileWriter *dataWriter;
H
Hongze Cheng 已提交
56 57
} SMerger;

H
Hongze Cheng 已提交
58
/*
 * Prepare the merger for its first file set: sample the current time, copy
 * the row/page/compression settings from the vnode configuration, allocate
 * the id used for new files and mark the merger as opened.
 *
 * Always returns 0.
 */
static int32_t tsdbMergerOpen(SMerger *merger) {
  // timestamp of this merge run
  merger->ctx->now = taosGetTimestampMs();

  // settings copied from the vnode configuration
  merger->cmprAlg = merger->tsdb->pVnode->config.tsdbCfg.compression;
  merger->szPage = merger->tsdb->pVnode->config.tsdbPageSize;
  merger->minRow = merger->tsdb->pVnode->config.tsdbCfg.minRows;
  merger->maxRow = merger->tsdb->pVnode->config.tsdbCfg.maxRows;
  merger->compactVersion = INT64_MAX;

  // id shared by every file created during this run
  merger->cid = tsdbFSAllocEid(merger->tsdb->pFS);

  merger->ctx->opened = true;
  return 0;
}

H
Hongze Cheng 已提交
70
/*
 * Finish a merge run: apply the collected file operations to the file system
 * and release everything the merger owns.
 *
 * The merger's arrays, block buffers and schemas are released on every path,
 * including when the file system edit fails, so that a failed edit does not
 * leak them.
 *
 * Returns 0 on success, otherwise the error code of the failing
 * tsdbFSEditBegin()/tsdbFSEditCommit() call.
 */
static int32_t tsdbMergerClose(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  SVnode *pVnode = merger->tsdb->pVnode;

  // edit file system
  code = tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbFSEditCommit(merger->tsdb->pFS);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pVnode), lino, code);
  }

  // every per-file-set resource must already have been closed
  ASSERT(merger->dataWriter == NULL);
  ASSERT(merger->sttWriter == NULL);
  ASSERT(merger->iterMerger == NULL);
  ASSERT(TARRAY2_SIZE(merger->iterArr) == 0);
  ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0);

  // clear the merge (also on failure, otherwise these allocations leak)
  TARRAY2_FREE(merger->iterArr);
  TARRAY2_FREE(merger->sttReaderArr);
  TARRAY2_FREE(merger->fopArr);
  for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); i++) {
    tBlockDataDestroy(merger->ctx->bData + i);
  }
  tDestroyTSchema(merger->skmTb->pTSchema);
  tDestroyTSchema(merger->skmRow->pTSchema);

  return code;
}
H
Hongze Cheng 已提交
104

H
Hongze Cheng 已提交
105
/*
 * Flush the rows still buffered for the current table.
 *
 * ctx->bData[cidx] is the block rows are currently appended to and
 * ctx->bData[pidx] is the other block. When the other block holds rows and
 * half of the combined row count reaches minRow, the rows of both blocks are
 * written row by row with a flush after the first half, producing two blocks
 * of similar size. Otherwise the other block (if non-empty) is written to
 * the data file as is, and the current block goes to the stt file when it has
 * fewer than minRow rows or to the data file when it does not.
 *
 * Both buffers are reset on success. Returns 0 on success or the error code
 * of the failing writer call.
 */
static int32_t tsdbMergeToDataTableEnd(SMerger *merger) {
  // nothing buffered (e.g. before the first table)
  if (merger->ctx->bData[0].nRow + merger->ctx->bData[1].nRow == 0) return 0;

  int32_t code = 0;
  int32_t lino = 0;
  int32_t cidx = merger->ctx->bDataIdx;
  int32_t pidx = (cidx + 1) % 2;
  int32_t numRow = (merger->ctx->bData[pidx].nRow + merger->ctx->bData[cidx].nRow) / 2;

  if (merger->ctx->bData[pidx].nRow > 0 && numRow >= merger->minRow) {
    ASSERT(merger->ctx->bData[pidx].nRow == merger->maxRow);

    SRowInfo row[1] = {{
        .suid = merger->ctx->tbid->suid,
        .uid = merger->ctx->tbid->uid,
        .row = tsdbRowFromBlockData(merger->ctx->bData + pidx, 0),
    }};

    // first half of the combined rows, all taken from the other block
    for (int32_t i = 0; i < numRow; i++) {
      row->row.iRow = i;

      code = tsdbDataFileWriteTSData(merger->dataWriter, row);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbDataFileFlushTSDataBlock(merger->dataWriter);
    TSDB_CHECK_CODE(code, lino, _exit);

    // second half: the remainder of the other block ...
    for (int32_t i = numRow; i < merger->ctx->bData[pidx].nRow; i++) {
      row->row.iRow = i;
      code = tsdbDataFileWriteTSData(merger->dataWriter, row);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // ... followed by every row of the current block
    row->row = tsdbRowFromBlockData(merger->ctx->bData + cidx, 0);
    for (int32_t i = 0; i < merger->ctx->bData[cidx].nRow; i++) {
      row->row.iRow = i;
      code = tsdbDataFileWriteTSData(merger->dataWriter, row);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  } else {
    // the other block is written as is; it must be bData[pidx], the block
    // whose row count was just checked (writing bData[cidx] here would drop
    // the other block and emit the current block twice)
    if (merger->ctx->bData[pidx].nRow > 0) {
      code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData + pidx);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    if (merger->ctx->bData[cidx].nRow < merger->minRow) {
      code = tsdbSttFileWriteTSDataBlock(merger->sttWriter, merger->ctx->bData + cidx);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData + cidx);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); i++) {
    tBlockDataReset(merger->ctx->bData + i);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
169 170

/*
 * Prepare the buffers for the table identified by ctx->tbid: refresh the
 * table schema, make bData[0] the current block and initialise every block
 * buffer with the table id and schema.
 *
 * Returns 0 on success or the error code of the failing call.
 */
static int32_t tsdbMergeToDataTableBegin(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbUpdateSkmTb(merger->tsdb, merger->ctx->tbid, merger->skmTb);
  TSDB_CHECK_CODE(code, lino, _exit);

  merger->ctx->bDataIdx = 0;
  for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); i++) {
    // initialise block i; passing the array base here would initialise
    // bData[0] repeatedly and leave the other buffer untouched
    code = tBlockDataInit(merger->ctx->bData + i, merger->ctx->tbid, merger->skmTb->pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
190
/*
 * Drain the iterator merger into the data file.
 *
 * Rows are buffered per table in the two ctx->bData blocks. A change of uid
 * ends the previous table and begins the next one. A row whose version is
 * not above compactVersion and whose timestamp equals the last buffered
 * timestamp updates that row in place; any other row is appended. When the
 * current block has reached maxRow, the other block is written out, cleared
 * and becomes the current block before the row is appended.
 *
 * Returns 0 on success or the error code of the failing call.
 */
static int32_t tsdbMergeToDataLevel(SMerger *merger) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SRowInfo *row;

  while ((row = tsdbIterMergerGet(merger->iterMerger)) != NULL) {
    // table switch
    if (row->uid != merger->ctx->tbid->uid) {
      code = tsdbMergeToDataTableEnd(merger);
      TSDB_CHECK_CODE(code, lino, _exit);

      merger->ctx->tbid->suid = row->suid;
      merger->ctx->tbid->uid = row->uid;

      code = tsdbMergeToDataTableBegin(merger);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    TSDBKEY key[1] = {TSDBROW_KEY(&row->row)};

    int32_t cur = merger->ctx->bDataIdx;
    int32_t nRow = merger->ctx->bData[cur].nRow;
    bool    sameTs = (nRow > 0) && (merger->ctx->bData[cur].aTSKEY[nRow - 1] == key->ts);

    if (key->version <= merger->compactVersion && sameTs) {
      // same timestamp as the last buffered row: update it
      code = tBlockDataUpdateRow(merger->ctx->bData + cur, &row->row, NULL);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      if (nRow >= merger->maxRow) {
        // current block is full: write out the other block and reuse it
        int32_t other = (cur + 1) % 2;

        code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData + other);
        TSDB_CHECK_CODE(code, lino, _exit);

        tBlockDataClear(merger->ctx->bData + other);

        merger->ctx->bDataIdx = other;
      }

      code = tBlockDataAppendRow(merger->ctx->bData + merger->ctx->bDataIdx, &row->row, NULL, row->uid);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbIterMergerNext(merger->iterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // flush what is left of the last table
  code = tsdbMergeToDataTableEnd(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Drain the iterator merger into the stt writer, one row at a time.
 *
 * Returns 0 on success or the error code of the failing write/advance call.
 */
static int32_t tsdbMergeToUpperLevel(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  for (SRowInfo *row = tsdbIterMergerGet(merger->iterMerger); row != NULL;
       row = tsdbIterMergerGet(merger->iterMerger)) {
    code = tsdbSttFileWriteTSData(merger->sttWriter, row);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(merger->iterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vid:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
267 268 269
/*
 * Select the stt levels of the current file set that take part in the merge
 * and open a reader on the first file of each of them.
 *
 * Levels are visited in array order while their level number matches the
 * running ctx->level counter and their first file has at least sttTrigger
 * segments. Each selected file gets a REMOVE operation queued in fopArr.
 *
 * On return:
 *   - ctx->toData is true only when every level in the array was selected;
 *   - ctx->lvl is the level that stopped the scan because its first file is
 *     below sttTrigger, or NULL in every other case.
 *
 * Returns 0 on success or the error code of the failing call.
 */
static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t i = 0;

  merger->ctx->level = 0;
  merger->ctx->toData = true;

  for (; i < TARRAY2_SIZE(merger->ctx->fset->lvlArr); ++i) {
    merger->ctx->lvl = TARRAY2_GET(merger->ctx->fset->lvlArr, i);

    // gap in the level sequence or an empty level: output goes to a new stt file
    if (merger->ctx->lvl->level != merger->ctx->level || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 0) {
      merger->ctx->toData = false;
      merger->ctx->lvl = NULL;
      break;
    }

    ASSERT(merger->ctx->lvl->level == 0 || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 1);

    merger->ctx->fobj = TARRAY2_FIRST(merger->ctx->lvl->fobjArr);

    // this level is not due yet: it receives the merged output
    if (merger->ctx->fobj->f->stt->nseg < merger->sttTrigger) {
      merger->ctx->toData = false;
      break;
    }

    merger->ctx->level++;

    // add remove operation
    STFileOp op = {
        .optype = TSDB_FOP_REMOVE,
        .fid = merger->ctx->fset->fid,
        .of = merger->ctx->fobj->f[0],
    };
    code = TARRAY2_APPEND(merger->fopArr, op);
    TSDB_CHECK_CODE(code, lino, _exit);

    // open the reader
    SSttFileReader      *reader;
    SSttFileReaderConfig config[1] = {{
        .tsdb = merger->tsdb,
        .szPage = merger->szPage,
        .file[0] = merger->ctx->fobj->f[0],
    }};
    code = tsdbSttFileReaderOpen(merger->ctx->fobj->fname, config, &reader);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(merger->sttReaderArr, reader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // every level was consumed: there is no existing level to write to
  if (i >= TARRAY2_SIZE(merger->ctx->fset->lvlArr)) {
    merger->ctx->lvl = NULL;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Open one stt iterator per segment reader of every opened stt file reader,
 * collect them in iterArr and open the iterator merger on top of them.
 *
 * Returns 0 on success or the error code of the failing call.
 */
static int32_t tsdbMergeFileSetBeginOpenIter(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  SSttFileReader *fileReader;
  TARRAY2_FOREACH(merger->sttReaderArr, fileReader) {
    const TSttSegReaderArray *segArr;

    code = tsdbSttFileReaderGetSegReader(fileReader, &segArr);
    TSDB_CHECK_CODE(code, lino, _exit);

    SSttSegReader *seg;
    TARRAY2_FOREACH(segArr, seg) {
      STsdbIter      *iter;
      STsdbIterConfig iterCfg[1] = {{
          .type = TSDB_ITER_TYPE_STT,
          .sttReader = seg,
      }};

      code = tsdbIterOpen(iterCfg, &iter);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = TARRAY2_APPEND(merger->iterArr, iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // merge view over all segment iterators
  code = tsdbIterMergerOpen(merger->iterArr, &merger->iterMerger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Open the writers for the current file set.
 *
 * The stt writer is always opened: on the existing file ctx->fobj when
 * ctx->lvl is set, otherwise on a new stt file at level ctx->level placed on
 * a disk allocated for the file set's storage level. The data file writer is
 * opened in addition when ctx->toData is true.
 *
 * Returns 0 on success, TSDB_CODE_FS_NO_VALID_DISK when no disk can be
 * allocated, or the error code of the failing open call.
 */
static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  // settings shared by both stt writer variants; the file is filled in below
  SSttFileWriterConfig sttConfig[1] = {{
      .tsdb = merger->tsdb,
      .maxRow = merger->maxRow,
      .szPage = merger->szPage,
      .cmprAlg = merger->cmprAlg,
      .compactVersion = merger->compactVersion,
  }};

  if (merger->ctx->lvl != NULL) {
    // to existing level
    sttConfig->file = merger->ctx->fobj->f[0];
  } else {
    // to new level
    SDiskID did[1];
    int32_t level = tsdbFidLevel(merger->ctx->fset->fid, &merger->tsdb->keepCfg, merger->ctx->now);
    if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, level, did) < 0) {
      code = TSDB_CODE_FS_NO_VALID_DISK;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    sttConfig->file.type = TSDB_FTYPE_STT;
    sttConfig->file.did = did[0];
    sttConfig->file.fid = merger->ctx->fset->fid;
    sttConfig->file.cid = merger->cid;
    sttConfig->file.size = 0;
    sttConfig->file.stt[0].level = merger->ctx->level;
    sttConfig->file.stt[0].nseg = 0;
  }

  code = tsdbSttFileWriterOpen(sttConfig, &merger->sttWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (merger->ctx->toData) {
    SDiskID dataDid;
    int32_t level = tsdbFidLevel(merger->ctx->fset->fid, &merger->tsdb->keepCfg, merger->ctx->now);

    if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, level, &dataDid) < 0) {
      code = TSDB_CODE_FS_NO_VALID_DISK;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    SDataFileWriterConfig dataConfig[1] = {{
        .tsdb = merger->tsdb,
        .cmprAlg = merger->cmprAlg,
        .maxRow = merger->maxRow,
        .szPage = merger->szPage,
        .fid = merger->ctx->fset->fid,
        .cid = merger->cid,
        .did = dataDid,
        .compactVersion = merger->compactVersion,
    }};

    // hand over the files the file set already has, per file type
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      if (merger->ctx->fset->farr[ftype] == NULL) {
        dataConfig->files[ftype].exist = false;
        continue;
      }
      dataConfig->files[ftype].exist = true;
      dataConfig->files[ftype].file = merger->ctx->fset->farr[ftype]->f[0];
    }

    code = tsdbDataFileWriterOpen(dataConfig, &merger->dataWriter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
456 457

/*
 * Set up the merge of the file set in ctx->fset: reset the per-table state,
 * then open the readers, the iterators and the writers, in that order.
 *
 * Expects that no reader, iterator or writer is left over from a previous
 * file set. Returns 0 on success or the error code of the failing step.
 */
static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  // nothing may be left open from the previous file set
  ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0);
  ASSERT(TARRAY2_SIZE(merger->iterArr) == 0);
  ASSERT(merger->iterMerger == NULL);
  ASSERT(merger->sttWriter == NULL);
  ASSERT(merger->dataWriter == NULL);

  // start without a current table and with empty buffers
  merger->ctx->tbid[0].suid = 0;
  merger->ctx->tbid[0].uid = 0;
  merger->ctx->bDataIdx = 0;
  for (int32_t iBlock = 0; iBlock < ARRAY_SIZE(merger->ctx->bData); ++iBlock) {
    tBlockDataReset(&merger->ctx->bData[iBlock]);
  }

  // readers -> iterators -> writers
  code = tsdbMergeFileSetBeginOpenReader(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbMergeFileSetBeginOpenIter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbMergeFileSetBeginOpenWriter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Close the stt writer and, when the merge targeted the data file, the data
 * file writer as well. Both close calls receive fopArr.
 *
 * Returns 0 on success or the error code of the failing close call.
 */
static int32_t tsdbMergeFileSetEndCloseWriter(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbSttFileWriterClose(&merger->sttWriter, 0, merger->fopArr);
  TSDB_CHECK_CODE(code, lino, _exit);

  // the data writer only exists for merges into the data file
  if (!merger->ctx->toData) goto _exit;

  code = tsdbDataFileWriterClose(&merger->dataWriter, 0, merger->fopArr);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
512 513

/*
 * Close the iterator merger, then close every iterator and empty iterArr.
 * Always returns 0.
 */
static int32_t tsdbMergeFileSetEndCloseIter(SMerger *merger) {
  // the merger sits on top of the iterators, so it goes first
  tsdbIterMergerClose(&merger->iterMerger);

  TARRAY2_CLEAR(merger->iterArr, tsdbIterClose);

  return 0;
}

/*
 * Close every stt file reader and empty sttReaderArr.
 * Always returns 0.
 */
static int32_t tsdbMergeFileSetEndCloseReader(SMerger *merger) {
  TARRAY2_CLEAR(merger->sttReaderArr, tsdbSttFileReaderClose);

  return 0;
}

/*
 * Tear down the merge of one file set: close the writers, then the
 * iterators, then the readers.
 *
 * Returns 0 on success or the error code of the first failing step; later
 * steps are skipped in that case.
 */
static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  // writers -> iterators -> readers
  code = tsdbMergeFileSetEndCloseWriter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbMergeFileSetEndCloseIter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbMergeFileSetEndCloseReader(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
544

H
Hongze Cheng 已提交
545 546 547
/*
 * Merge the stt files of one file set.
 *
 * Opens readers, iterators and writers for `fset`, moves the rows either to
 * the data file (ctx->toData) or to an stt file, and closes everything again.
 *
 * Returns 0 on success or the error code of the failing step. The error code
 * must be propagated: the caller checks it to stop the run, and returning 0
 * here would let it carry on and commit the file operations queued so far
 * even though this file set was not merged.
 */
static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
  int32_t code = 0;
  int32_t lino = 0;

  merger->ctx->fset = fset;
  code = tsdbMergeFileSetBegin(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // do merge
  if (merger->ctx->toData) {
    code = tsdbMergeToDataLevel(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    code = tsdbMergeToUpperLevel(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbMergeFileSetEnd(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(merger->tsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(merger->tsdb->pVnode), __func__, fset->fid);
  }
  return code;
}

H
Hongze Cheng 已提交
574
/*
 * Walk the file-set snapshot and merge every file set whose first stt level
 * is level 0, is non-empty and whose first file has at least sttTrigger
 * segments. The merger is opened lazily before the first such file set and
 * closed at the end if it was opened.
 *
 * Returns 0 on success or the error code of the failing step.
 */
static int32_t tsdbDoMerge(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  STFileSet *fset;
  TARRAY2_FOREACH(merger->fsetArr, fset) {
    // only file sets with a populated level 0 are candidates
    SSttLvl *lvl = NULL;
    if (TARRAY2_SIZE(fset->lvlArr) > 0) {
      lvl = TARRAY2_FIRST(fset->lvlArr);
    }
    if (lvl == NULL) continue;
    if (lvl->level != 0) continue;
    if (TARRAY2_SIZE(lvl->fobjArr) == 0) continue;

    // not enough segments accumulated yet
    STFileObj *fobj = TARRAY2_FIRST(lvl->fobjArr);
    if (fobj->f->stt->nseg < merger->sttTrigger) continue;

    // first file set that needs work: open the merger
    if (!merger->ctx->opened) {
      code = tsdbMergerOpen(merger);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbMergeFileSet(merger, fset);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (merger->ctx->opened) {
    code = tsdbMergerClose(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(merger->tsdb->pVnode), __func__);
  }
  return code;
}

/*
 * Entry point: merge the stt files of `tsdb`.
 *
 * Takes a copy snapshot of the file sets, runs the merge over it and destroys
 * the snapshot again. The snapshot is destroyed on every path once it has
 * been created, so a failing merge does not leak it.
 *
 * Returns 0 on success or the error code of the failing step.
 */
int32_t tsdbMerge(STsdb *tsdb) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(tsdb->pVnode);

  // every member not named here (including fsetArr) starts out zeroed
  SMerger merger[1] = {{
      .tsdb = tsdb,
      .sttTrigger = tsdb->pVnode->config.sttTrigger,
  }};

  code = tsdbFSCreateCopySnapshot(tsdb->pFS, &merger->fsetArr);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbDoMerge(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  // release the snapshot whether or not the merge succeeded
  if (merger->fsetArr != NULL) {
    tsdbFSDestroyCopySnapshot(&merger->fsetArr);
  }

  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  } else if (merger->ctx->opened) {
    tsdbDebug("vgId:%d %s done", vid, __func__);
  }
  return code;
}