tsdbMerge.c 17.8 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdbMerge.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef struct {
H
Hongze Cheng 已提交
19 20
  STsdb         *tsdb;
  TFileSetArray *fsetArr;
H
Hongze Cheng 已提交
21

H
Hongze Cheng 已提交
22 23 24 25 26 27 28
  int32_t sttTrigger;
  int32_t maxRow;
  int32_t minRow;
  int32_t szPage;
  int8_t  cmprAlg;
  int64_t compactVersion;
  int64_t cid;
H
Hongze Cheng 已提交
29

H
Hongze Cheng 已提交
30
  // context
H
Hongze Cheng 已提交
31
  struct {
H
Hongze Cheng 已提交
32
    bool       opened;
H
Hongze Cheng 已提交
33
    int64_t    now;
H
Hongze Cheng 已提交
34
    STFileSet *fset;
H
Hongze Cheng 已提交
35 36
    bool       toData;
    int32_t    level;
H
Hongze Cheng 已提交
37
    SSttLvl   *lvl;
H
Hongze Cheng 已提交
38
    TABLEID    tbid[1];
H
Hongze Cheng 已提交
39
  } ctx[1];
H
Hongze Cheng 已提交
40

H
Hongze Cheng 已提交
41 42
  TFileOpArray fopArr[1];

H
Hongze Cheng 已提交
43
  // reader
H
Hongze Cheng 已提交
44
  TSttFileReaderArray sttReaderArr[1];
H
Hongze Cheng 已提交
45
  // iter
H
Hongze Cheng 已提交
46 47 48 49
  TTsdbIterArray dataIterArr[1];
  SIterMerger   *dataIterMerger;
  TTsdbIterArray tombIterArr[1];
  SIterMerger   *tombIterMerger;
H
Hongze Cheng 已提交
50
  // writer
H
Hongze Cheng 已提交
51
  SFSetWriter *writer;
H
Hongze Cheng 已提交
52 53
} SMerger;

H
Hongze Cheng 已提交
54
/*
 * Lazily initialise the merger the first time a mergeable file set is found.
 *
 * Captures the current time, copies the row/page/compression settings from the
 * vnode configuration, allocates an id from the file system and marks the
 * merger as opened. Always returns 0.
 */
static int32_t tsdbMergerOpen(SMerger *merger) {
  STsdb  *tsdb = merger->tsdb;
  SVnode *pVnode = tsdb->pVnode;

  merger->ctx->now = taosGetTimestampMs();

  // settings copied from the vnode configuration
  merger->maxRow = pVnode->config.tsdbCfg.maxRows;
  merger->minRow = pVnode->config.tsdbCfg.minRows;
  merger->szPage = pVnode->config.tsdbPageSize;
  merger->cmprAlg = pVnode->config.tsdbCfg.compression;

  merger->compactVersion = INT64_MAX;
  merger->cid = tsdbFSAllocEid(tsdb->pFS);

  merger->ctx->opened = true;
  return 0;
}

H
Hongze Cheng 已提交
66
/*
 * Finish a merge run: apply the accumulated file operations to the file
 * system and release the merger's arrays.
 *
 * The commit is performed while holding the TSDB write lock. All per-file-set
 * resources (writer, iterators, iterator mergers, readers) must already have
 * been released by tsdbMergeFileSetEnd; this is asserted before the arrays
 * are destroyed.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbMergerClose(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  SVnode *pVnode = merger->tsdb->pVnode;

  // edit file system
  code = tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE);
  TSDB_CHECK_CODE(code, lino, _exit);

  // the lock is released on both the success and the failure path
  taosThreadRwlockWrlock(&merger->tsdb->rwLock);
  code = tsdbFSEditCommit(merger->tsdb->pFS);
  taosThreadRwlockUnlock(&merger->tsdb->rwLock);
  TSDB_CHECK_CODE(code, lino, _exit);

  ASSERT(merger->writer == NULL);
  ASSERT(merger->dataIterMerger == NULL);
  ASSERT(merger->tombIterMerger == NULL);
  ASSERT(TARRAY2_SIZE(merger->dataIterArr) == 0);
  ASSERT(TARRAY2_SIZE(merger->tombIterArr) == 0);
  ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0);

  // clear the merge (tombIterArr included, otherwise its storage is leaked)
  TARRAY2_DESTROY(merger->tombIterArr, NULL);
  TARRAY2_DESTROY(merger->dataIterArr, NULL);
  TARRAY2_DESTROY(merger->sttReaderArr, NULL);
  TARRAY2_DESTROY(merger->fopArr, NULL);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(pVnode), lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
99

H
Hongze Cheng 已提交
100
#if 0
H
Hongze Cheng 已提交
101
/*
 * (Disabled code.) Flush the two buffered blocks of the current table.
 *
 * `cidx` is the block currently being filled and `pidx` the other one, which
 * holds the previously filled block, if any. When the previous block is
 * non-empty and the average of both row counts reaches minRow, the rows are
 * re-split: the first `numRow` rows are written and flushed, then the
 * remaining rows of both blocks are written. Otherwise the previous block is
 * written as-is and the current block goes to the stt writer when it has fewer
 * than minRow rows, or to the data writer when it does not.
 *
 * Returns 0 on success (or when both blocks are empty), otherwise an error code.
 */
static int32_t tsdbMergeToDataTableEnd(SMerger *merger) {
  if (merger->ctx->blockData[0].nRow + merger->ctx->blockData[1].nRow == 0) return 0;

  int32_t code = 0;
  int32_t lino = 0;
  int32_t cidx = merger->ctx->blockDataIdx;
  int32_t pidx = (cidx + 1) % 2;
  int32_t numRow = (merger->ctx->blockData[pidx].nRow + merger->ctx->blockData[cidx].nRow) / 2;

  if (merger->ctx->blockData[pidx].nRow > 0 && numRow >= merger->minRow) {
    ASSERT(merger->ctx->blockData[pidx].nRow == merger->maxRow);

    SRowInfo row[1] = {{
        .suid = merger->ctx->tbid->suid,
        .uid = merger->ctx->tbid->uid,
        .row = tsdbRowFromBlockData(merger->ctx->blockData + pidx, 0),
    }};

    // first half: rows [0, numRow) of the previous block
    for (int32_t i = 0; i < numRow; i++) {
      row->row.iRow = i;

      code = tsdbDataFileWriteRow(merger->dataWriter, row);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbDataFileFlush(merger->dataWriter);
    TSDB_CHECK_CODE(code, lino, _exit);

    // second half: the rest of the previous block ...
    for (int32_t i = numRow; i < merger->ctx->blockData[pidx].nRow; i++) {
      row->row.iRow = i;
      code = tsdbDataFileWriteRow(merger->dataWriter, row);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // ... followed by every row of the current block
    row->row = tsdbRowFromBlockData(merger->ctx->blockData + cidx, 0);
    for (int32_t i = 0; i < merger->ctx->blockData[cidx].nRow; i++) {
      row->row.iRow = i;
      code = tsdbDataFileWriteRow(merger->dataWriter, row);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  } else {
    if (merger->ctx->blockData[pidx].nRow > 0) {
      // write the PREVIOUS block; writing the current one here would drop the
      // previous block and emit the current block twice
      code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->blockData + pidx);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    if (merger->ctx->blockData[cidx].nRow < merger->minRow) {
      code = tsdbSttFileWriteBlockData(merger->sttWriter, merger->ctx->blockData + cidx);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->blockData + cidx);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->blockData); i++) {
    tBlockDataReset(merger->ctx->blockData + i);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
165 166

/*
 * (Disabled code.) Prepare the block buffers for the table in ctx->tbid.
 *
 * Refreshes the table schema, selects buffer 0 as the current block and
 * initialises every block buffer for the table.
 *
 * Returns 0 on success, otherwise the error code of the failing call.
 */
static int32_t tsdbMergeToDataTableBegin(SMerger *merger) {
  int32_t lino = 0;
  int32_t code = tsdbUpdateSkmTb(merger->tsdb, merger->ctx->tbid, merger->skmTb);
  TSDB_CHECK_CODE(code, lino, _exit);

  merger->ctx->blockDataIdx = 0;

  int32_t iBlock = 0;
  while (iBlock < ARRAY_SIZE(merger->ctx->blockData)) {
    code = tBlockDataInit(&merger->ctx->blockData[iBlock], merger->ctx->tbid, merger->skmTb->pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
    iBlock++;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
186
/*
 * (Disabled code.) Drain the data and tomb iterator mergers into the writers.
 *
 * Rows are buffered per table in two alternating blocks. A row whose version
 * is not above compactVersion and whose timestamp equals the last buffered
 * one updates that row in place; any other row is appended. When the current
 * block already holds maxRow rows, the other block is written out, cleared and
 * becomes the current one before the row is appended.
 *
 * Returns 0 on success, otherwise the error code of the failing call.
 */
static int32_t tsdbMergeToDataLevel(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  // data
  SRowInfo *row;
  while ((row = tsdbIterMergerGetData(merger->dataIterMerger)) != NULL) {
    // table boundary: close the previous table and start the new one
    if (row->uid != merger->ctx->tbid->uid) {
      code = tsdbMergeToDataTableEnd(merger);
      TSDB_CHECK_CODE(code, lino, _exit);

      merger->ctx->tbid->suid = row->suid;
      merger->ctx->tbid->uid = row->uid;

      code = tsdbMergeToDataTableBegin(merger);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    TSDBKEY key[1] = {TSDBROW_KEY(&row->row)};

    // read the index only now: starting a table resets it
    int32_t cur = merger->ctx->blockDataIdx;

    bool sameTs = key->version <= merger->compactVersion   //
                  && merger->ctx->blockData[cur].nRow > 0  //
                  && merger->ctx->blockData[cur].aTSKEY[merger->ctx->blockData[cur].nRow - 1] == key->ts;

    if (sameTs) {
      // update
      code = tBlockDataUpdateRow(merger->ctx->blockData + cur, &row->row, NULL);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      if (merger->ctx->blockData[cur].nRow >= merger->maxRow) {
        int32_t other = (cur + 1) % 2;

        code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->blockData + other);
        TSDB_CHECK_CODE(code, lino, _exit);

        tBlockDataClear(merger->ctx->blockData + other);

        // switch to next bData
        merger->ctx->blockDataIdx = other;
        cur = other;
      }

      code = tBlockDataAppendRow(merger->ctx->blockData + cur, &row->row, NULL, row->uid);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbIterMergerNext(merger->dataIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // flush whatever is still buffered for the last table
  code = tsdbMergeToDataTableEnd(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // tomb
  for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(merger->tombIterMerger)) != NULL;) {
    if (!tsdbSttFileWriterIsOpened(merger->sttWriter)) {
      code = tsdbDataFileWriteTombRecord(merger->dataWriter, record);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      code = tsdbSttFileWriteTombRecord(merger->sttWriter, record);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbIterMergerNext(merger->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * (Disabled code.) Copy every merged row and tomb record to the stt writer.
 *
 * Returns 0 on success, otherwise the error code of the failing call.
 */
static int32_t tsdbMergeToUpperLevel(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  // data
  for (SRowInfo *row; (row = tsdbIterMergerGetData(merger->dataIterMerger));) {
    code = tsdbSttFileWriteRow(merger->sttWriter, row);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(merger->dataIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // tomb
  for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(merger->tombIterMerger));) {
    code = tsdbSttFileWriteTombRecord(merger->sttWriter, record);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(merger->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vid:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
289
#endif
H
Hongze Cheng 已提交
290

H
Hongze Cheng 已提交
291 292 293
/*
 * Select the stt files of the current file set that take part in the merge
 * and open a reader on each of them.
 *
 * Levels are visited in array order and must be numbered 0, 1, 2, ... without
 * gaps. The walk stops at the first level whose number does not match or that
 * holds fewer than sttTrigger - 1 files; in that case ctx->toData is cleared.
 * If every level in the set is consumed, ctx->toData stays true. On return
 * ctx->level is the number of levels consumed.
 *
 * For every selected file a TSDB_FOP_REMOVE operation is queued in fopArr and
 * an stt reader is appended to sttReaderArr. At most sttTrigger files are
 * taken from each level.
 *
 * Returns 0 on success, otherwise the error code of the failing call.
 */
static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  merger->ctx->toData = true;
  merger->ctx->level = 0;

  for (int32_t i = 0;; ++i) {
    if (i >= TARRAY2_SIZE(merger->ctx->fset->lvlArr)) {
      // ran out of levels: everything was consumed
      merger->ctx->lvl = NULL;
      break;
    }

    merger->ctx->lvl = TARRAY2_GET(merger->ctx->fset->lvlArr, i);
    if (merger->ctx->lvl->level != merger->ctx->level ||
        TARRAY2_SIZE(merger->ctx->lvl->fobjArr) + 1 < merger->sttTrigger) {
      merger->ctx->toData = false;
      merger->ctx->lvl = NULL;
      break;
    }

    merger->ctx->level++;

    STFileObj *fobj;
    int32_t    numFile = 0;
    TARRAY2_FOREACH(merger->ctx->lvl->fobjArr, fobj) {
      if (numFile == merger->sttTrigger) {
        break;
      }

      // the merged file is removed from the file set when the edit is committed
      STFileOp op = {
          .optype = TSDB_FOP_REMOVE,
          .fid = merger->ctx->fset->fid,
          .of = fobj->f[0],
      };
      code = TARRAY2_APPEND(merger->fopArr, op);
      TSDB_CHECK_CODE(code, lino, _exit);

      SSttFileReader      *reader;
      SSttFileReaderConfig config = {
          .tsdb = merger->tsdb,
          .szPage = merger->szPage,
          .file[0] = fobj->f[0],
      };

      code = tsdbSttFileReaderOpen(fobj->fname, &config, &reader);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = TARRAY2_APPEND(merger->sttReaderArr, reader);
      TSDB_CHECK_CODE(code, lino, _exit);

      // count the file so that the per-level cap above actually takes effect
      numFile++;
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Open one data iterator and one tomb iterator on every stt reader, then
 * build the two iterator mergers on top of them.
 *
 * Returns 0 on success, otherwise the error code of the failing call.
 */
static int32_t tsdbMergeFileSetBeginOpenIter(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  SSttFileReader *sttReader;
  TARRAY2_FOREACH(merger->sttReaderArr, sttReader) {
    STsdbIter *iter;

    // data iter
    STsdbIterConfig dataCfg = {
        .type = TSDB_ITER_TYPE_STT,
        .sttReader = sttReader,
    };

    code = tsdbIterOpen(&dataCfg, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(merger->dataIterArr, iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    // tomb iter
    STsdbIterConfig tombCfg = {
        .type = TSDB_ITER_TYPE_STT_TOMB,
        .sttReader = sttReader,
    };

    code = tsdbIterOpen(&tombCfg, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(merger->tombIterArr, iter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // mergers over all iterators of each kind
  code = tsdbIterMergerOpen(merger->dataIterArr, &merger->dataIterMerger, false);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbIterMergerOpen(merger->tombIterArr, &merger->tombIterMerger, true);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Open the file-set writer that receives the merged output.
 *
 * A disk is allocated for the level derived from the file id, the keep
 * configuration and the time captured at open. The writer is restricted to
 * stt output unless ctx->toData is set; in that case the existing files of
 * the file set are described to the writer as well.
 *
 * Returns 0 on success, TSDB_CODE_FS_NO_VALID_DISK when no disk can be
 * allocated, otherwise the error code of tsdbFSetWriterOpen.
 */
static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    vid = TD_VID(merger->tsdb->pVnode);
  STFileSet *fset = merger->ctx->fset;

  // pick the target disk
  SDiskID did;
  int32_t diskLevel = tsdbFidLevel(fset->fid, &merger->tsdb->keepCfg, merger->ctx->now);
  if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, diskLevel, &did) < 0) {
    code = TSDB_CODE_FS_NO_VALID_DISK;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SFSetWriterConfig config = {
      .tsdb = merger->tsdb,
      .toSttOnly = !merger->ctx->toData,
      .compactVersion = merger->compactVersion,
      .minRow = merger->minRow,
      .maxRow = merger->maxRow,
      .szPage = merger->szPage,
      .cmprAlg = merger->cmprAlg,
      .fid = fset->fid,
      .cid = merger->cid,
      .did = did,
      .level = merger->ctx->level,
  };

  if (merger->ctx->toData) {
    // describe the files that already exist in the set
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      config.files[ftype].exist = (fset->farr[ftype] != NULL);
      if (config.files[ftype].exist) {
        config.files[ftype].file = fset->farr[ftype]->f[0];
      }
    }
  }

  code = tsdbFSetWriterOpen(&config, &merger->writer);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
442 443

/*
 * Set up everything needed to merge ctx->fset: readers, iterators and the
 * writer, in that order. The merger must not hold resources of a previous
 * file set when this is called.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  // nothing may be left over from a previous file set
  ASSERT(merger->writer == NULL);
  ASSERT(merger->dataIterMerger == NULL);
  ASSERT(TARRAY2_SIZE(merger->dataIterArr) == 0);
  ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0);

  // no table selected yet
  merger->ctx->tbid->uid = 0;
  merger->ctx->tbid->suid = 0;

  // step 1: readers
  code = tsdbMergeFileSetBeginOpenReader(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // step 2: iterators
  code = tsdbMergeFileSetBeginOpenIter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // step 3: writer
  code = tsdbMergeFileSetBeginOpenWriter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Close the file-set writer, passing fopArr so the writer can record its
 * file operations there. Returns the writer's close result.
 */
static int32_t tsdbMergeFileSetEndCloseWriter(SMerger *merger) {
  int32_t code = tsdbFSetWriterClose(&merger->writer, 0, merger->fopArr);
  return code;
}
H
Hongze Cheng 已提交
477 478

/*
 * Close both iterator mergers and every iterator they were built on.
 * Each merger is closed before the iterators in its array. Always returns 0.
 */
static int32_t tsdbMergeFileSetEndCloseIter(SMerger *merger) {
  // tomb side
  tsdbIterMergerClose(&merger->tombIterMerger);
  TARRAY2_CLEAR(merger->tombIterArr, tsdbIterClose);

  // data side
  tsdbIterMergerClose(&merger->dataIterMerger);
  TARRAY2_CLEAR(merger->dataIterArr, tsdbIterClose);

  return 0;
}

/*
 * Close every stt reader opened for the current file set and empty the
 * reader array. Always returns 0.
 */
static int32_t tsdbMergeFileSetEndCloseReader(SMerger *merger) {
  int32_t code = 0;
  TARRAY2_CLEAR(merger->sttReaderArr, tsdbSttFileReaderClose);
  return code;
}

/*
 * Tear down the per-file-set resources in reverse order of creation:
 * writer, then iterators, then readers.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
  int32_t lino = 0;
  int32_t code;

  // writer
  code = tsdbMergeFileSetEndCloseWriter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // iterators
  code = tsdbMergeFileSetEndCloseIter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // readers
  code = tsdbMergeFileSetEndCloseReader(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
510

H
Hongze Cheng 已提交
511 512 513
/*
 * Merge one file set.
 *
 * Opens readers, iterators and the writer for `fset`, streams every row and
 * then every tomb record from the iterator mergers into the writer, and
 * finally releases the per-file-set resources. Rows and tomb records of
 * tables for which metaGetInfo fails are skipped.
 *
 * Returns 0 on success, otherwise the error code of the failing step. The
 * error is propagated so that the caller can stop instead of continuing with
 * a half-processed file set.
 */
static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
  int32_t code = 0;
  int32_t lino = 0;

  merger->ctx->fset = fset;
  code = tsdbMergeFileSetBegin(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // data
  SMetaInfo info;
  SRowInfo *row;
  merger->ctx->tbid->suid = 0;
  merger->ctx->tbid->uid = 0;
  while ((row = tsdbIterMergerGetData(merger->dataIterMerger)) != NULL) {
    if (row->uid != merger->ctx->tbid->uid) {
      // first row of another table: skip all of its rows if the table lookup fails
      if (metaGetInfo(merger->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(merger->dataIterMerger, (TABLEID *)row);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }

      merger->ctx->tbid->uid = row->uid;
      merger->ctx->tbid->suid = row->suid;
    }

    code = tsdbFSetWriteRow(merger->writer, row);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(merger->dataIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // tomb
  STombRecord *record;
  merger->ctx->tbid->suid = 0;
  merger->ctx->tbid->uid = 0;
  while ((record = tsdbIterMergerGetTombRecord(merger->tombIterMerger)) != NULL) {
    if (record->uid != merger->ctx->tbid->uid) {
      merger->ctx->tbid->uid = record->uid;
      merger->ctx->tbid->suid = record->suid;

      // skip the tomb records of a table whose lookup fails
      if (metaGetInfo(merger->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(merger->tombIterMerger, merger->ctx->tbid);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }
    code = tsdbFSetWriteTombRecord(merger->writer, record);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(merger->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbMergeFileSetEnd(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(merger->tsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(merger->tsdb->pVnode), __func__, fset->fid);
  }
  // report the failure to the caller instead of always claiming success
  return code;
}

H
Hongze Cheng 已提交
577
/*
 * Walk the file-set snapshot and merge every set that qualifies.
 *
 * A set qualifies when its first stt level is level 0 and holds at least
 * sttTrigger files. The merger is opened on the first qualifying set and, if
 * it was opened at all, closed after the walk.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbDoMerge(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  STFileSet *fset;
  TARRAY2_FOREACH(merger->fsetArr, fset) {
    // decide whether this file set needs a merge
    bool needMerge = TARRAY2_SIZE(fset->lvlArr) != 0;
    if (needMerge) {
      SSttLvl *first = TARRAY2_FIRST(fset->lvlArr);
      needMerge = (first->level == 0) && (TARRAY2_SIZE(first->fobjArr) >= merger->sttTrigger);
    }
    if (!needMerge) {
      continue;
    }

    // open lazily, only once
    if (!merger->ctx->opened) {
      code = tsdbMergerOpen(merger);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbMergeFileSet(merger, fset);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (merger->ctx->opened) {
    code = tsdbMergerClose(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(merger->tsdb->pVnode), __func__);
  }
  return code;
}

H
Hongze Cheng 已提交
612
int32_t tsdbMerge(void *arg) {
H
Hongze Cheng 已提交
613 614
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
615
  STsdb  *tsdb = (STsdb *)arg;
H
Hongze Cheng 已提交
616 617 618 619 620 621

  SMerger merger[1] = {{
      .tsdb = tsdb,
      .sttTrigger = tsdb->pVnode->config.sttTrigger,
  }};

H
Hongze Cheng 已提交
622 623
  ASSERT(merger->sttTrigger > 1);

H
Hongze Cheng 已提交
624
  code = tsdbFSCreateCopySnapshot(tsdb->pFS, &merger->fsetArr);
H
Hongze Cheng 已提交
625 626 627 628 629
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbDoMerge(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
630
  tsdbFSDestroyCopySnapshot(&merger->fsetArr);
H
Hongze Cheng 已提交
631

H
Hongze Cheng 已提交
632 633
_exit:
  if (code) {
H
Hongze Cheng 已提交
634
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
635
  } else if (merger->ctx->opened) {
H
Hongze Cheng 已提交
636
    tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
H
Hongze Cheng 已提交
637
  }
H
Hongze Cheng 已提交
638
  return code;
H
Hongze Cheng 已提交
639
}