tsdbMerge.c 17.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "inc/tsdbMerge.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef struct {
H
Hongze Cheng 已提交
19 20
  STsdb         *tsdb;
  TFileSetArray *fsetArr;
H
Hongze Cheng 已提交
21 22 23 24 25 26 27 28 29 30

  int32_t  sttTrigger;
  int32_t  maxRow;
  int32_t  minRow;
  int32_t  szPage;
  int8_t   cmprAlg;
  int64_t  compactVersion;
  int64_t  cid;
  SSkmInfo skmTb[1];
  SSkmInfo skmRow[1];
H
Hongze Cheng 已提交
31

H
Hongze Cheng 已提交
32
  // context
H
Hongze Cheng 已提交
33
  struct {
H
Hongze Cheng 已提交
34
    bool       opened;
H
Hongze Cheng 已提交
35
    int64_t    now;
H
Hongze Cheng 已提交
36
    STFileSet *fset;
H
Hongze Cheng 已提交
37 38
    bool       toData;
    int32_t    level;
H
Hongze Cheng 已提交
39 40
    SSttLvl   *lvl;
    STFileObj *fobj;
H
Hongze Cheng 已提交
41 42 43
    TABLEID    tbid[1];
    int32_t    bDataIdx;
    SBlockData bData[2];
H
Hongze Cheng 已提交
44
  } ctx[1];
H
Hongze Cheng 已提交
45

H
Hongze Cheng 已提交
46 47
  TFileOpArray fopArr[1];

H
Hongze Cheng 已提交
48
  // reader
H
Hongze Cheng 已提交
49
  TSttFileReaderArray sttReaderArr[1];
H
Hongze Cheng 已提交
50 51 52
  // iter
  TTsdbIterArray iterArr[1];
  SIterMerger   *iterMerger;
H
Hongze Cheng 已提交
53
  // writer
H
Hongze Cheng 已提交
54 55
  SSttFileWriter  *sttWriter;
  SDataFileWriter *dataWriter;
H
Hongze Cheng 已提交
56 57
} SMerger;

H
Hongze Cheng 已提交
58
static int32_t tsdbMergerOpen(SMerger *merger) {
H
Hongze Cheng 已提交
59
  merger->ctx->now = taosGetTimestampMs();
H
Hongze Cheng 已提交
60 61 62 63 64
  merger->maxRow = merger->tsdb->pVnode->config.tsdbCfg.maxRows;
  merger->minRow = merger->tsdb->pVnode->config.tsdbCfg.minRows;
  merger->szPage = merger->tsdb->pVnode->config.tsdbPageSize;
  merger->cmprAlg = merger->tsdb->pVnode->config.tsdbCfg.compression;
  merger->compactVersion = INT64_MAX;
H
Hongze Cheng 已提交
65
  merger->cid = tsdbFSAllocEid(merger->tsdb->pFS);
H
Hongze Cheng 已提交
66
  merger->ctx->opened = true;
H
Hongze Cheng 已提交
67
  return 0;
H
Hongze Cheng 已提交
68 69
}

H
Hongze Cheng 已提交
70
static int32_t tsdbMergerClose(SMerger *merger) {
H
Hongze Cheng 已提交
71 72 73
  int32_t code = 0;
  int32_t lino = 0;
  SVnode *pVnode = merger->tsdb->pVnode;
H
Hongze Cheng 已提交
74 75

  // edit file system
H
Hongze Cheng 已提交
76
  code = tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE);
H
Hongze Cheng 已提交
77 78
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
79
  taosThreadRwlockWrlock(&merger->tsdb->rwLock);
H
Hongze Cheng 已提交
80
  code = tsdbFSEditCommit(merger->tsdb->pFS);
H
Hongze Cheng 已提交
81 82 83 84 85
  if (code) {
    taosThreadRwlockUnlock(&merger->tsdb->rwLock);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  taosThreadRwlockUnlock(&merger->tsdb->rwLock);
H
Hongze Cheng 已提交
86

H
Hongze Cheng 已提交
87 88 89 90 91 92
  ASSERT(merger->dataWriter == NULL);
  ASSERT(merger->sttWriter == NULL);
  ASSERT(merger->iterMerger == NULL);
  ASSERT(TARRAY2_SIZE(merger->iterArr) == 0);
  ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0);

H
Hongze Cheng 已提交
93
  // clear the merge
H
Hongze Cheng 已提交
94 95
  TARRAY2_FREE(merger->iterArr);
  TARRAY2_FREE(merger->sttReaderArr);
H
Hongze Cheng 已提交
96
  TARRAY2_FREE(merger->fopArr);
H
Hongze Cheng 已提交
97 98 99
  for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); i++) {
    tBlockDataDestroy(merger->ctx->bData + i);
  }
H
Hongze Cheng 已提交
100
  tDestroyTSchema(merger->skmTb->pTSchema);
H
Hongze Cheng 已提交
101
  tDestroyTSchema(merger->skmRow->pTSchema);
H
Hongze Cheng 已提交
102 103 104

_exit:
  if (code) {
H
Hongze Cheng 已提交
105
    TSDB_ERROR_LOG(TD_VID(pVnode), lino, code);
H
Hongze Cheng 已提交
106
  }
H
Hongze Cheng 已提交
107
  return code;
H
Hongze Cheng 已提交
108
}
H
Hongze Cheng 已提交
109

H
Hongze Cheng 已提交
110
static int32_t tsdbMergeToDataTableEnd(SMerger *merger) {
H
Hongze Cheng 已提交
111
  if (merger->ctx->bData[0].nRow + merger->ctx->bData[1].nRow == 0) return 0;
H
Hongze Cheng 已提交
112 113 114

  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
115 116
  int32_t cidx = merger->ctx->bDataIdx;
  int32_t pidx = (cidx + 1) % 2;
H
Hongze Cheng 已提交
117
  int32_t numRow = (merger->ctx->bData[pidx].nRow + merger->ctx->bData[cidx].nRow) / 2;
H
Hongze Cheng 已提交
118

H
Hongze Cheng 已提交
119
  if (merger->ctx->bData[pidx].nRow > 0 && numRow >= merger->minRow) {
H
Hongze Cheng 已提交
120 121 122 123 124 125 126 127 128 129 130 131 132 133
    ASSERT(merger->ctx->bData[pidx].nRow == merger->maxRow);

    SRowInfo row[1] = {{
        .suid = merger->ctx->tbid->suid,
        .uid = merger->ctx->tbid->uid,
        .row = tsdbRowFromBlockData(merger->ctx->bData + pidx, 0),
    }};

    for (int32_t i = 0; i < numRow; i++) {
      row->row.iRow = i;

      code = tsdbDataFileWriteTSData(merger->dataWriter, row);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
134

H
Hongze Cheng 已提交
135
    code = tsdbDataFileFlushTSDataBlock(merger->dataWriter);
H
Hongze Cheng 已提交
136
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
137 138 139 140 141 142 143 144 145 146 147 148 149

    for (int32_t i = numRow; i < merger->ctx->bData[pidx].nRow; i++) {
      row->row.iRow = i;
      code = tsdbDataFileWriteTSData(merger->dataWriter, row);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    row->row = tsdbRowFromBlockData(merger->ctx->bData + cidx, 0);
    for (int32_t i = 0; i < merger->ctx->bData[cidx].nRow; i++) {
      row->row.iRow = i;
      code = tsdbDataFileWriteTSData(merger->dataWriter, row);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
150
  } else {
H
Hongze Cheng 已提交
151 152 153 154
    if (merger->ctx->bData[pidx].nRow > 0) {
      code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData + cidx);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
155 156 157 158 159 160 161 162 163 164 165
    if (merger->ctx->bData[cidx].nRow < merger->minRow) {
      code = tsdbSttFileWriteTSDataBlock(merger->sttWriter, merger->ctx->bData + cidx);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData + cidx);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); i++) {
    tBlockDataReset(merger->ctx->bData + i);
H
Hongze Cheng 已提交
166 167 168 169
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
170
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
171 172 173
  }
  return code;
}
H
Hongze Cheng 已提交
174 175

static int32_t tsdbMergeToDataTableBegin(SMerger *merger) {
H
Hongze Cheng 已提交
176 177 178
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
179
  code = tsdbUpdateSkmTb(merger->tsdb, merger->ctx->tbid, merger->skmTb);
H
Hongze Cheng 已提交
180
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
181

H
Hongze Cheng 已提交
182
  merger->ctx->bDataIdx = 0;
H
Hongze Cheng 已提交
183
  for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); i++) {
H
fix bug  
Hongze Cheng 已提交
184
    code = tBlockDataInit(merger->ctx->bData + i, merger->ctx->tbid, merger->skmTb->pTSchema, NULL, 0);
H
Hongze Cheng 已提交
185 186
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
187

H
Hongze Cheng 已提交
188 189
_exit:
  if (code) {
H
Hongze Cheng 已提交
190
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
191 192 193 194
  }
  return code;
}

H
Hongze Cheng 已提交
195
static int32_t tsdbMergeToDataLevel(SMerger *merger) {
H
Hongze Cheng 已提交
196 197
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
198

H
Hongze Cheng 已提交
199 200
  for (SRowInfo *row; (row = tsdbIterMergerGet(merger->iterMerger)) != NULL;) {
    if (row->uid != merger->ctx->tbid->uid) {
H
Hongze Cheng 已提交
201
      code = tsdbMergeToDataTableEnd(merger);
H
Hongze Cheng 已提交
202 203
      TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
204 205 206
      merger->ctx->tbid->suid = row->suid;
      merger->ctx->tbid->uid = row->uid;

H
Hongze Cheng 已提交
207
      code = tsdbMergeToDataTableBegin(merger);
H
Hongze Cheng 已提交
208 209
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
210

H
Hongze Cheng 已提交
211
    TSDBKEY key[1] = {TSDBROW_KEY(&row->row)};
H
Hongze Cheng 已提交
212

H
Hongze Cheng 已提交
213 214 215 216 217 218
    if (key->version <= merger->compactVersion                 //
        && merger->ctx->bData[merger->ctx->bDataIdx].nRow > 0  //
        && merger->ctx->bData[merger->ctx->bDataIdx].aTSKEY[merger->ctx->bData[merger->ctx->bDataIdx].nRow - 1] ==
               key->ts) {
      // update
      code = tBlockDataUpdateRow(merger->ctx->bData + merger->ctx->bDataIdx, &row->row, NULL);
H
Hongze Cheng 已提交
219
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
220 221 222 223 224 225 226 227 228 229 230 231
    } else {
      if (merger->ctx->bData[merger->ctx->bDataIdx].nRow >= merger->maxRow) {
        int32_t idx = (merger->ctx->bDataIdx + 1) % 2;

        code = tsdbDataFileWriteTSDataBlock(merger->dataWriter, merger->ctx->bData + idx);
        TSDB_CHECK_CODE(code, lino, _exit);

        tBlockDataClear(merger->ctx->bData + idx);

        // switch to next bData
        merger->ctx->bDataIdx = idx;
      }
H
Hongze Cheng 已提交
232

H
Hongze Cheng 已提交
233 234
      code = tBlockDataAppendRow(merger->ctx->bData + merger->ctx->bDataIdx, &row->row, NULL, row->uid);
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
235
    }
H
Hongze Cheng 已提交
236 237 238

    code = tsdbIterMergerNext(merger->iterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
239 240
  }

H
Hongze Cheng 已提交
241 242 243
  code = tsdbMergeToDataTableEnd(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
244 245
_exit:
  if (code) {
H
Hongze Cheng 已提交
246
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
247 248 249 250 251 252 253 254 255
  }
  return code;
}

static int32_t tsdbMergeToUpperLevel(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

H
Hongze Cheng 已提交
256 257 258
  SRowInfo *row;
  while ((row = tsdbIterMergerGet(merger->iterMerger))) {
    code = tsdbSttFileWriteTSData(merger->sttWriter, row);
H
Hongze Cheng 已提交
259 260
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
261
    code = tsdbIterMergerNext(merger->iterMerger);
H
Hongze Cheng 已提交
262 263 264 265 266 267 268 269 270 271
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vid:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
272 273 274
static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
275

H
Hongze Cheng 已提交
276 277
  merger->ctx->toData = true;
  merger->ctx->level = 0;
H
Hongze Cheng 已提交
278 279 280 281 282 283 284 285 286 287

  // TARRAY2_FOREACH(merger->ctx->fset->lvlArr, merger->ctx->lvl) {

  for (int32_t i = 0;; ++i) {
    if (i >= TARRAY2_SIZE(merger->ctx->fset->lvlArr)) {
      merger->ctx->lvl = NULL;
      break;
    }

    merger->ctx->lvl = TARRAY2_GET(merger->ctx->fset->lvlArr, i);
H
Hongze Cheng 已提交
288 289 290
    if (merger->ctx->lvl->level != merger->ctx->level || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 0) {
      merger->ctx->toData = false;
      merger->ctx->lvl = NULL;
H
Hongze Cheng 已提交
291
      break;
H
Hongze Cheng 已提交
292
    }
H
Hongze Cheng 已提交
293

H
Hongze Cheng 已提交
294 295 296 297
    ASSERT(merger->ctx->lvl->level == 0 || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 1);

    merger->ctx->fobj = TARRAY2_FIRST(merger->ctx->lvl->fobjArr);
    if (merger->ctx->fobj->f->stt->nseg < merger->sttTrigger) {
H
Hongze Cheng 已提交
298
      merger->ctx->toData = false;
H
Hongze Cheng 已提交
299
      break;
H
Hongze Cheng 已提交
300
    } else {
H
Hongze Cheng 已提交
301
      merger->ctx->level++;
H
Hongze Cheng 已提交
302

H
Hongze Cheng 已提交
303
      // add remove operation
H
Hongze Cheng 已提交
304 305 306 307 308 309 310 311
      STFileOp op = {
          .optype = TSDB_FOP_REMOVE,
          .fid = merger->ctx->fset->fid,
          .of = merger->ctx->fobj->f[0],
      };
      code = TARRAY2_APPEND(merger->fopArr, op);
      TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
312
      // open the reader
H
Hongze Cheng 已提交
313
      SSttFileReader      *reader;
H
Hongze Cheng 已提交
314
      SSttFileReaderConfig config[1] = {{
H
Hongze Cheng 已提交
315
          .tsdb = merger->tsdb,
H
Hongze Cheng 已提交
316 317 318
          .szPage = merger->szPage,
          .file[0] = merger->ctx->fobj->f[0],
      }};
H
Hongze Cheng 已提交
319
      code = tsdbSttFileReaderOpen(merger->ctx->fobj->fname, config, &reader);
H
Hongze Cheng 已提交
320
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
321

H
Hongze Cheng 已提交
322
      code = TARRAY2_APPEND(merger->sttReaderArr, reader);
H
Hongze Cheng 已提交
323
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
324 325
    }
  }
H
Hongze Cheng 已提交
326

H
Hongze Cheng 已提交
327 328
_exit:
  if (code) {
H
Hongze Cheng 已提交
329
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
330 331 332 333 334 335 336 337 338 339 340 341 342
  }
  return code;
}

static int32_t tsdbMergeFileSetBeginOpenIter(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  SSttFileReader *sttReader;
  TARRAY2_FOREACH(merger->sttReaderArr, sttReader) {
    const TSttSegReaderArray *segReaderArr;

H
Hongze Cheng 已提交
343
    code = tsdbSttFileReaderGetSegReader(sttReader, &segReaderArr);
H
Hongze Cheng 已提交
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358
    TSDB_CHECK_CODE(code, lino, _exit);

    SSttSegReader *segReader;
    TARRAY2_FOREACH(segReaderArr, segReader) {
      STsdbIter *iter;

      STsdbIterConfig config[1] = {{
          .type = TSDB_ITER_TYPE_STT,
          .sttReader = segReader,
      }};

      code = tsdbIterOpen(config, &iter);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = TARRAY2_APPEND(merger->iterArr, iter);
H
Hongze Cheng 已提交
359
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
360
    }
H
Hongze Cheng 已提交
361
  }
H
Hongze Cheng 已提交
362

H
Hongze Cheng 已提交
363
  code = tsdbIterMergerOpen(merger->iterArr, &merger->iterMerger);
H
Hongze Cheng 已提交
364 365 366 367 368 369 370 371 372 373 374 375 376 377
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  }
  return code;
}

static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

H
Hongze Cheng 已提交
378 379
  if (merger->ctx->lvl) {
    // to existing level
H
Hongze Cheng 已提交
380
    SSttFileWriterConfig config[1] = {{
H
Hongze Cheng 已提交
381
        .tsdb = merger->tsdb,
H
Hongze Cheng 已提交
382 383 384
        .maxRow = merger->maxRow,
        .szPage = merger->szPage,
        .cmprAlg = merger->cmprAlg,
H
Hongze Cheng 已提交
385 386 387 388
        .compactVersion = merger->compactVersion,
        .file = merger->ctx->fobj->f[0],
    }};
    code = tsdbSttFileWriterOpen(config, &merger->sttWriter);
H
Hongze Cheng 已提交
389
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
390 391 392 393 394 395 396 397 398
  } else {
    SDiskID did[1];
    int32_t level = tsdbFidLevel(merger->ctx->fset->fid, &merger->tsdb->keepCfg, merger->ctx->now);
    if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, level, did) < 0) {
      code = TSDB_CODE_FS_NO_VALID_DISK;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // to new level
H
Hongze Cheng 已提交
399
    SSttFileWriterConfig config[1] = {{
H
Hongze Cheng 已提交
400
        .tsdb = merger->tsdb,
H
Hongze Cheng 已提交
401 402 403
        .maxRow = merger->maxRow,
        .szPage = merger->szPage,
        .cmprAlg = merger->cmprAlg,
H
Hongze Cheng 已提交
404
        .compactVersion = merger->compactVersion,
H
Hongze Cheng 已提交
405
        .file =
H
Hongze Cheng 已提交
406
            {
H
Hongze Cheng 已提交
407
                .type = TSDB_FTYPE_STT,
H
Hongze Cheng 已提交
408
                .did = did[0],
H
Hongze Cheng 已提交
409
                .fid = merger->ctx->fset->fid,
H
Hongze Cheng 已提交
410 411
                .cid = merger->cid,
                .size = 0,
H
Hongze Cheng 已提交
412 413 414 415
                .stt = {{
                    .level = merger->ctx->level,
                    .nseg = 0,
                }},
H
Hongze Cheng 已提交
416
            },
H
Hongze Cheng 已提交
417 418
    }};
    code = tsdbSttFileWriterOpen(config, &merger->sttWriter);
H
Hongze Cheng 已提交
419
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
420 421
  }

H
Hongze Cheng 已提交
422
  if (merger->ctx->toData) {
H
Hongze Cheng 已提交
423
    SDiskID did;
H
Hongze Cheng 已提交
424
    int32_t level = tsdbFidLevel(merger->ctx->fset->fid, &merger->tsdb->keepCfg, merger->ctx->now);
H
Hongze Cheng 已提交
425 426

    if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, level, &did) < 0) {
H
Hongze Cheng 已提交
427 428 429
      code = TSDB_CODE_FS_NO_VALID_DISK;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
430

H
Hongze Cheng 已提交
431
    SDataFileWriterConfig config[1] = {{
H
Hongze Cheng 已提交
432
        .tsdb = merger->tsdb,
H
Hongze Cheng 已提交
433
        .cmprAlg = merger->cmprAlg,
H
Hongze Cheng 已提交
434
        .maxRow = merger->maxRow,
H
Hongze Cheng 已提交
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451
        .szPage = merger->szPage,
        .fid = merger->ctx->fset->fid,
        .cid = merger->cid,
        .did = did,
        .compactVersion = merger->compactVersion,
    }};

    for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
      if (merger->ctx->fset->farr[i]) {
        config->files[i].exist = true;
        config->files[i].file = merger->ctx->fset->farr[i]->f[0];
      } else {
        config->files[i].exist = false;
      }
    }

    code = tsdbDataFileWriterOpen(config, &merger->dataWriter);
H
Hongze Cheng 已提交
452
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
453
  }
H
Hongze Cheng 已提交
454

H
Hongze Cheng 已提交
455 456
_exit:
  if (code) {
H
Hongze Cheng 已提交
457
    TSDB_ERROR_LOG(vid, lino, code);
H
Hongze Cheng 已提交
458 459 460
  }
  return code;
}
H
Hongze Cheng 已提交
461 462

static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
H
Hongze Cheng 已提交
463 464 465
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
466 467 468 469 470 471
  ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0);
  ASSERT(TARRAY2_SIZE(merger->iterArr) == 0);
  ASSERT(merger->iterMerger == NULL);
  ASSERT(merger->sttWriter == NULL);
  ASSERT(merger->dataWriter == NULL);

H
Hongze Cheng 已提交
472 473 474 475 476 477 478
  merger->ctx->tbid->suid = 0;
  merger->ctx->tbid->uid = 0;
  merger->ctx->bDataIdx = 0;
  for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); ++i) {
    tBlockDataReset(merger->ctx->bData + i);
  }

H
Hongze Cheng 已提交
479 480
  // open reader
  code = tsdbMergeFileSetBeginOpenReader(merger);
H
Hongze Cheng 已提交
481 482
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
483 484 485 486 487 488 489 490 491 492
  // open iterator
  code = tsdbMergeFileSetBeginOpenIter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // open writer
  code = tsdbMergeFileSetBeginOpenWriter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
493
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
494
  }
H
Hongze Cheng 已提交
495 496 497 498 499 500 501 502
  return code;
}

static int32_t tsdbMergeFileSetEndCloseWriter(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

H
Hongze Cheng 已提交
503
  code = tsdbSttFileWriterClose(&merger->sttWriter, 0, merger->fopArr);
H
Hongze Cheng 已提交
504 505
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
506
  if (merger->ctx->toData) {
H
Hongze Cheng 已提交
507
    code = tsdbDataFileWriterClose(&merger->dataWriter, 0, merger->fopArr);
H
Hongze Cheng 已提交
508 509 510
    TSDB_CHECK_CODE(code, lino, _exit);
  }

H
Hongze Cheng 已提交
511 512
_exit:
  if (code) {
H
Hongze Cheng 已提交
513
    TSDB_ERROR_LOG(vid, lino, code);
H
Hongze Cheng 已提交
514 515
  }
  return code;
H
Hongze Cheng 已提交
516
}
H
Hongze Cheng 已提交
517 518

static int32_t tsdbMergeFileSetEndCloseIter(SMerger *merger) {
H
Hongze Cheng 已提交
519
  tsdbIterMergerClose(&merger->iterMerger);
H
Hongze Cheng 已提交
520 521 522 523 524
  TARRAY2_CLEAR(merger->iterArr, tsdbIterClose);
  return 0;
}

static int32_t tsdbMergeFileSetEndCloseReader(SMerger *merger) {
H
Hongze Cheng 已提交
525
  TARRAY2_CLEAR(merger->sttReaderArr, tsdbSttFileReaderClose);
H
Hongze Cheng 已提交
526 527 528 529
  return 0;
}

static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
H
Hongze Cheng 已提交
530 531
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
532
  int32_t vid = TD_VID(merger->tsdb->pVnode);
H
Hongze Cheng 已提交
533

H
Hongze Cheng 已提交
534 535 536 537 538 539 540 541 542 543 544 545
  code = tsdbMergeFileSetEndCloseWriter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbMergeFileSetEndCloseIter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbMergeFileSetEndCloseReader(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
H
Hongze Cheng 已提交
546
  }
H
Hongze Cheng 已提交
547 548
  return code;
}
H
Hongze Cheng 已提交
549

H
Hongze Cheng 已提交
550 551 552
static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
553

H
Hongze Cheng 已提交
554
  merger->ctx->fset = fset;
H
Hongze Cheng 已提交
555 556 557
  code = tsdbMergeFileSetBegin(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
558
  // do merge
H
Hongze Cheng 已提交
559
  if (merger->ctx->toData) {
H
Hongze Cheng 已提交
560
    code = tsdbMergeToDataLevel(merger);
H
Hongze Cheng 已提交
561 562 563 564 565
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    code = tsdbMergeToUpperLevel(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
566 567 568 569

  code = tsdbMergeFileSetEnd(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
570 571
_exit:
  if (code) {
H
Hongze Cheng 已提交
572
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(merger->tsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
573
  } else {
H
Hongze Cheng 已提交
574
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(merger->tsdb->pVnode), __func__, fset->fid);
H
Hongze Cheng 已提交
575
  }
H
Hongze Cheng 已提交
576
  return 0;
H
Hongze Cheng 已提交
577 578
}

H
Hongze Cheng 已提交
579
static int32_t tsdbDoMerge(SMerger *merger) {
H
Hongze Cheng 已提交
580
  int32_t code = 0;
H
Hongze Cheng 已提交
581
  int32_t lino = 0;
H
Hongze Cheng 已提交
582

H
Hongze Cheng 已提交
583 584
  STFileSet *fset;
  TARRAY2_FOREACH(merger->fsetArr, fset) {
H
Hongze Cheng 已提交
585
    SSttLvl *lvl = TARRAY2_SIZE(fset->lvlArr) > 0 ? TARRAY2_FIRST(fset->lvlArr) : NULL;
H
Hongze Cheng 已提交
586
    if (!lvl || lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) == 0) continue;
H
Hongze Cheng 已提交
587

H
Hongze Cheng 已提交
588
    STFileObj *fobj = TARRAY2_FIRST(lvl->fobjArr);
H
Hongze Cheng 已提交
589
    if (fobj->f->stt->nseg < merger->sttTrigger) continue;
H
Hongze Cheng 已提交
590

H
Hongze Cheng 已提交
591 592 593 594 595
    if (!merger->ctx->opened) {
      code = tsdbMergerOpen(merger);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

H
Hongze Cheng 已提交
596 597
    code = tsdbMergeFileSet(merger, fset);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
598 599
  }

H
Hongze Cheng 已提交
600 601
  if (merger->ctx->opened) {
    code = tsdbMergerClose(merger);
H
Hongze Cheng 已提交
602 603
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
604

H
Hongze Cheng 已提交
605 606
_exit:
  if (code) {
H
Hongze Cheng 已提交
607
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
608
  } else {
H
Hongze Cheng 已提交
609
    tsdbDebug("vgId:%d %s done", TD_VID(merger->tsdb->pVnode), __func__);
H
Hongze Cheng 已提交
610 611 612 613 614 615 616 617 618 619 620 621 622 623
  }
  return code;
}

int32_t tsdbMerge(STsdb *tsdb) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(tsdb->pVnode);

  SMerger merger[1] = {{
      .tsdb = tsdb,
      .sttTrigger = tsdb->pVnode->config.sttTrigger,
  }};

H
Hongze Cheng 已提交
624
  code = tsdbFSCreateCopySnapshot(tsdb->pFS, &merger->fsetArr);
H
Hongze Cheng 已提交
625 626 627 628 629
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbDoMerge(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
630
  tsdbFSDestroyCopySnapshot(&merger->fsetArr);
H
Hongze Cheng 已提交
631

H
Hongze Cheng 已提交
632 633
_exit:
  if (code) {
H
Hongze Cheng 已提交
634 635
    TSDB_ERROR_LOG(vid, lino, code);
  } else if (merger->ctx->opened) {
H
Hongze Cheng 已提交
636
    tsdbDebug("vgId:%d %s done", vid, __func__);
H
Hongze Cheng 已提交
637
  }
H
Hongze Cheng 已提交
638
  return code;
H
Hongze Cheng 已提交
639
}