tsdbMerge.c 19.3 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdbMerge.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef struct {
H
Hongze Cheng 已提交
19 20
  STsdb         *tsdb;
  TFileSetArray *fsetArr;
H
Hongze Cheng 已提交
21 22 23 24 25 26 27 28 29 30

  int32_t  sttTrigger;
  int32_t  maxRow;
  int32_t  minRow;
  int32_t  szPage;
  int8_t   cmprAlg;
  int64_t  compactVersion;
  int64_t  cid;
  SSkmInfo skmTb[1];
  SSkmInfo skmRow[1];
H
Hongze Cheng 已提交
31

H
Hongze Cheng 已提交
32
  // context
H
Hongze Cheng 已提交
33
  struct {
H
Hongze Cheng 已提交
34
    bool       opened;
H
Hongze Cheng 已提交
35
    int64_t    now;
H
Hongze Cheng 已提交
36
    STFileSet *fset;
H
Hongze Cheng 已提交
37 38
    bool       toData;
    int32_t    level;
H
Hongze Cheng 已提交
39 40
    SSttLvl   *lvl;
    STFileObj *fobj;
H
Hongze Cheng 已提交
41
    TABLEID    tbid[1];
H
fix  
Hongze Cheng 已提交
42 43
    int32_t    blockDataIdx;
    SBlockData blockData[2];
H
Hongze Cheng 已提交
44
  } ctx[1];
H
Hongze Cheng 已提交
45

H
Hongze Cheng 已提交
46 47
  TFileOpArray fopArr[1];

H
Hongze Cheng 已提交
48
  // reader
H
Hongze Cheng 已提交
49
  TSttFileReaderArray sttReaderArr[1];
H
Hongze Cheng 已提交
50
  // iter
H
Hongze Cheng 已提交
51 52 53 54
  TTsdbIterArray dataIterArr[1];
  SIterMerger   *dataIterMerger;
  TTsdbIterArray tombIterArr[1];
  SIterMerger   *tombIterMerger;
H
Hongze Cheng 已提交
55
  // writer
H
Hongze Cheng 已提交
56 57
  SSttFileWriter  *sttWriter;
  SDataFileWriter *dataWriter;
H
Hongze Cheng 已提交
58 59
} SMerger;

H
Hongze Cheng 已提交
60
/*
 * Lazily initialise the merger the first time a file set needs merging.
 *
 * Copies the row/page/compression settings out of the vnode configuration,
 * records the current time, allocates a new id for the files this run will
 * create and marks the merger as opened.
 *
 * Always returns 0.
 */
static int32_t tsdbMergerOpen(SMerger *merger) {
  STsdb  *tsdb = merger->tsdb;
  SVnode *pVnode = tsdb->pVnode;

  merger->ctx->now = taosGetTimestampMs();

  // settings taken from the vnode configuration
  merger->maxRow = pVnode->config.tsdbCfg.maxRows;
  merger->minRow = pVnode->config.tsdbCfg.minRows;
  merger->szPage = pVnode->config.tsdbPageSize;
  merger->cmprAlg = pVnode->config.tsdbCfg.compression;

  merger->compactVersion = INT64_MAX;
  merger->cid = tsdbFSAllocEid(tsdb->pFS);

  merger->ctx->opened = true;
  return 0;
}

H
Hongze Cheng 已提交
72
/*
 * Finish a merge run: apply the accumulated file operations to the file
 * system and release everything the merger owns in memory.
 *
 * The file-system edit is started with tsdbFSEditBegin() and committed with
 * tsdbFSEditCommit() while holding the tsdb write lock.
 *
 * The in-memory resources (iterator/reader/file-op arrays, the two block
 * buffers and both schemas) are released on the success path AND on the
 * error path; the merger is not usable after this call either way.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbMergerClose(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  SVnode *pVnode = merger->tsdb->pVnode;

  // edit file system
  code = tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE);
  TSDB_CHECK_CODE(code, lino, _exit);

  // the commit runs under the write lock; unlock before looking at the result
  // so that there is a single unlock for both outcomes
  taosThreadRwlockWrlock(&merger->tsdb->rwLock);
  code = tsdbFSEditCommit(merger->tsdb->pFS);
  taosThreadRwlockUnlock(&merger->tsdb->rwLock);
  TSDB_CHECK_CODE(code, lino, _exit);

  // every per-file-set resource must already have been closed
  ASSERT(merger->dataWriter == NULL);
  ASSERT(merger->sttWriter == NULL);
  ASSERT(merger->dataIterMerger == NULL);
  ASSERT(TARRAY2_SIZE(merger->dataIterArr) == 0);
  ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0);

_exit:
  // clear the merge (also on failure, otherwise the buffers below leak)
  TARRAY2_DESTROY(merger->tombIterArr, NULL);
  TARRAY2_DESTROY(merger->dataIterArr, NULL);
  TARRAY2_DESTROY(merger->sttReaderArr, NULL);
  TARRAY2_DESTROY(merger->fopArr, NULL);
  for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->blockData); i++) {
    tBlockDataDestroy(merger->ctx->blockData + i);
  }
  tDestroyTSchema(merger->skmTb->pTSchema);
  tDestroyTSchema(merger->skmRow->pTSchema);

  if (code) {
    TSDB_ERROR_LOG(TD_VID(pVnode), lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
111

H
Hongze Cheng 已提交
112
/*
 * Flush the rows still buffered for the table that was being merged.
 *
 * Two buffers are in play: `curr` (index ctx->blockDataIdx, the one being
 * appended to) and `prev` (the other one, filled before `curr`).
 *
 *  - Nothing buffered: returns 0 immediately.
 *  - `prev` holds rows and the average of both buffers reaches minRow:
 *    the first `numRow` rows of `prev` are written row by row, the data
 *    writer is flushed, and then the rest of `prev` followed by all of
 *    `curr` is written row by row (presumably this yields two evenly sized
 *    blocks -- confirm against tsdbDataFileFlush()).
 *  - Otherwise: `prev` (if it holds rows) is written as a block to the data
 *    file, and `curr` goes to the stt file when it has fewer than minRow
 *    rows, else to the data file.
 *
 * Both buffers are reset on success.
 *
 * Returns 0 on success, otherwise the error code of the failing write.
 */
static int32_t tsdbMergeToDataTableEnd(SMerger *merger) {
  if (merger->ctx->blockData[0].nRow + merger->ctx->blockData[1].nRow == 0) return 0;

  int32_t     code = 0;
  int32_t     lino = 0;
  int32_t     cidx = merger->ctx->blockDataIdx;
  int32_t     pidx = (cidx + 1) % 2;
  SBlockData *curr = merger->ctx->blockData + cidx;
  SBlockData *prev = merger->ctx->blockData + pidx;
  int32_t     numRow = (prev->nRow + curr->nRow) / 2;

  if (prev->nRow > 0 && numRow >= merger->minRow) {
    ASSERT(prev->nRow == merger->maxRow);

    SRowInfo row[1] = {{
        .suid = merger->ctx->tbid->suid,
        .uid = merger->ctx->tbid->uid,
        .row = tsdbRowFromBlockData(prev, 0),
    }};

    // first half: leading rows of the previous buffer
    for (int32_t i = 0; i < numRow; i++) {
      row->row.iRow = i;

      code = tsdbDataFileWriteRow(merger->dataWriter, row);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbDataFileFlush(merger->dataWriter);
    TSDB_CHECK_CODE(code, lino, _exit);

    // second half: remaining rows of the previous buffer ...
    for (int32_t i = numRow; i < prev->nRow; i++) {
      row->row.iRow = i;
      code = tsdbDataFileWriteRow(merger->dataWriter, row);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // ... followed by every row of the current buffer
    row->row = tsdbRowFromBlockData(curr, 0);
    for (int32_t i = 0; i < curr->nRow; i++) {
      row->row.iRow = i;
      code = tsdbDataFileWriteRow(merger->dataWriter, row);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  } else {
    // the previous buffer holds the older rows, so it must be written first;
    // it is `prev` that is guarded here and `prev` that has to be written
    if (prev->nRow > 0) {
      code = tsdbDataFileWriteBlockData(merger->dataWriter, prev);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    if (curr->nRow < merger->minRow) {
      // too few rows for a data block: keep them in the stt file
      code = tsdbSttFileWriteBlockData(merger->sttWriter, curr);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      code = tsdbDataFileWriteBlockData(merger->dataWriter, curr);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->blockData); i++) {
    tBlockDataReset(merger->ctx->blockData + i);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
176 177

/*
 * Prepare the block buffers for the table identified by ctx->tbid.
 *
 * Refreshes the table schema via tsdbUpdateSkmTb(), makes buffer 0 the
 * active one and (re)initialises both buffers for that table and schema.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbMergeToDataTableBegin(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbUpdateSkmTb(merger->tsdb, merger->ctx->tbid, merger->skmTb);
  TSDB_CHECK_CODE(code, lino, _exit);

  merger->ctx->blockDataIdx = 0;

  SBlockData *bData = merger->ctx->blockData;
  SBlockData *bDataEnd = bData + ARRAY_SIZE(merger->ctx->blockData);
  for (; bData < bDataEnd; bData++) {
    code = tBlockDataInit(bData, merger->ctx->tbid, merger->skmTb->pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
197
/*
 * Merge every row and tombstone record delivered by the iterators into the
 * data file (with the stt writer as overflow target).
 *
 * Rows are consumed in iterator order. A change of uid closes the previous
 * table (tsdbMergeToDataTableEnd) and opens the next one
 * (tsdbMergeToDataTableBegin). A row whose timestamp equals the last
 * buffered timestamp, and whose version is not above compactVersion,
 * updates that buffered row instead of being appended. When the active
 * buffer is full the other buffer is written out, cleared and made active.
 *
 * Tombstone records go to the stt writer when it is opened, otherwise to
 * the data writer.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbMergeToDataLevel(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  // ---- time-series rows ----
  SRowInfo *row;
  while ((row = tsdbIterMergerGetData(merger->dataIterMerger)) != NULL) {
    // table boundary: flush the old table, set up the new one
    if (row->uid != merger->ctx->tbid->uid) {
      code = tsdbMergeToDataTableEnd(merger);
      TSDB_CHECK_CODE(code, lino, _exit);

      merger->ctx->tbid->suid = row->suid;
      merger->ctx->tbid->uid = row->uid;

      code = tsdbMergeToDataTableBegin(merger);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    TSDBKEY     key[1] = {TSDBROW_KEY(&row->row)};
    SBlockData *active = merger->ctx->blockData + merger->ctx->blockDataIdx;

    // same timestamp as the last buffered row (only checked when a row exists)
    bool sameTs = (active->nRow > 0) && (active->aTSKEY[active->nRow - 1] == key->ts);

    if (key->version <= merger->compactVersion && sameTs) {
      // overwrite the last buffered row
      code = tBlockDataUpdateRow(active, &row->row, NULL);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      if (active->nRow >= merger->maxRow) {
        // active buffer is full: write out the other one and swap roles
        int32_t     other = (merger->ctx->blockDataIdx + 1) % 2;
        SBlockData *spare = merger->ctx->blockData + other;

        code = tsdbDataFileWriteBlockData(merger->dataWriter, spare);
        TSDB_CHECK_CODE(code, lino, _exit);

        tBlockDataClear(spare);

        merger->ctx->blockDataIdx = other;
        active = spare;
      }

      code = tBlockDataAppendRow(active, &row->row, NULL, row->uid);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbIterMergerNext(merger->dataIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // flush whatever is still buffered for the last table
  code = tsdbMergeToDataTableEnd(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // ---- tombstone records ----
  for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(merger->tombIterMerger)) != NULL;) {
    code = tsdbSttFileWriterIsOpened(merger->sttWriter)
               ? tsdbSttFileWriteTombRecord(merger->sttWriter, record)
               : tsdbDataFileWriteTombRecord(merger->dataWriter, record);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(merger->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Merge into an stt level: copy every row, then every tombstone record,
 * from the merged iterators to the stt writer.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbMergeToUpperLevel(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  // rows
  for (SRowInfo *row; (row = tsdbIterMergerGetData(merger->dataIterMerger)) != NULL;) {
    code = tsdbSttFileWriteRow(merger->sttWriter, row);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(merger->dataIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // tombstone records
  for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(merger->tombIterMerger)) != NULL;) {
    code = tsdbSttFileWriteTombRecord(merger->sttWriter, record);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(merger->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vid:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
301 302 303
/*
 * Decide which stt levels of the current file set take part in the merge
 * and open a reader on each of them.
 *
 * Levels are walked in array order, expecting level numbers 0, 1, 2, ...
 * A level is consumed when its first file has at least sttTrigger segments:
 * a REMOVE operation for that file is queued and a reader is opened on it.
 * The walk stops at the first level that
 *   - has an unexpected level number or no files (ctx->lvl = NULL,
 *     toData = false: output goes to a new stt file), or
 *   - has fewer than sttTrigger segments (ctx->lvl stays set,
 *     toData = false: output goes to that existing level).
 * If every level was consumed, ctx->lvl = NULL and toData stays true.
 *
 * ctx->level ends up as the number of levels consumed.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
  int32_t    code = 0;
  int32_t    lino = 0;
  STFileSet *fset = merger->ctx->fset;
  int32_t    i = 0;

  merger->ctx->toData = true;
  merger->ctx->level = 0;

  for (; i < TARRAY2_SIZE(fset->lvlArr); ++i) {
    merger->ctx->lvl = TARRAY2_GET(fset->lvlArr, i);

    // gap in the level sequence, or an empty level: merge into a new stt file
    if (merger->ctx->lvl->level != merger->ctx->level || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 0) {
      merger->ctx->toData = false;
      merger->ctx->lvl = NULL;
      break;
    }

    ASSERT(merger->ctx->lvl->level == 0 || TARRAY2_SIZE(merger->ctx->lvl->fobjArr) == 1);

    merger->ctx->fobj = TARRAY2_FIRST(merger->ctx->lvl->fobjArr);

    // this level still has room: it becomes the merge target
    if (merger->ctx->fobj->f->stt->nseg < merger->sttTrigger) {
      merger->ctx->toData = false;
      break;
    }

    // level is full: it is consumed by the merge
    merger->ctx->level++;

    // queue removal of the source file
    STFileOp op = {
        .optype = TSDB_FOP_REMOVE,
        .fid = fset->fid,
        .of = merger->ctx->fobj->f[0],
    };
    code = TARRAY2_APPEND(merger->fopArr, op);
    TSDB_CHECK_CODE(code, lino, _exit);

    // open a reader on the source file
    SSttFileReader      *reader;
    SSttFileReaderConfig config[1] = {{
        .tsdb = merger->tsdb,
        .szPage = merger->szPage,
        .file[0] = merger->ctx->fobj->f[0],
    }};
    code = tsdbSttFileReaderOpen(merger->ctx->fobj->fname, config, &reader);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(merger->sttReaderArr, reader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // ran past the last level without stopping: there is no target level
  if (i >= TARRAY2_SIZE(fset->lvlArr)) {
    merger->ctx->lvl = NULL;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Open the iterators feeding the merge.
 *
 * For every segment reader of every opened stt reader, one row iterator
 * (TSDB_ITER_TYPE_STT) and one tombstone iterator (TSDB_ITER_TYPE_STT_TOMB)
 * are created and collected in dataIterArr / tombIterArr. Afterwards one
 * iterator merger is opened over each of the two arrays.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbMergeFileSetBeginOpenIter(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  SSttFileReader *sttReader;
  TARRAY2_FOREACH(merger->sttReaderArr, sttReader) {
    const TSttSegReaderArray *segReaderArr;

    code = tsdbSttFileReaderGetSegReader(sttReader, &segReaderArr);
    TSDB_CHECK_CODE(code, lino, _exit);

    SSttSegReader *segReader;
    TARRAY2_FOREACH(segReaderArr, segReader) {
      STsdbIter      *iter;
      STsdbIterConfig dataConfig[1] = {{
          .type = TSDB_ITER_TYPE_STT,
          .sttReader = segReader,
      }};
      STsdbIterConfig tombConfig[1] = {{
          .type = TSDB_ITER_TYPE_STT_TOMB,
          .sttReader = segReader,
      }};

      // row iterator
      code = tsdbIterOpen(dataConfig, &iter);
      TSDB_CHECK_CODE(code, lino, _exit);
      code = TARRAY2_APPEND(merger->dataIterArr, iter);
      TSDB_CHECK_CODE(code, lino, _exit);

      // tombstone iterator
      code = tsdbIterOpen(tombConfig, &iter);
      TSDB_CHECK_CODE(code, lino, _exit);
      code = TARRAY2_APPEND(merger->tombIterArr, iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  code = tsdbIterMergerOpen(merger->dataIterArr, &merger->dataIterMerger, false);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbIterMergerOpen(merger->tombIterArr, &merger->tombIterMerger, true);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Open the writers for the current file set.
 *
 * An stt writer is always opened: on the existing target level's file when
 * ctx->lvl is set, otherwise on a brand-new stt file (level ctx->level,
 * zero segments) placed on a disk obtained from tfsAllocDisk().
 * When ctx->toData is set a data-file writer is opened as well, seeded with
 * the data files the file set already has.
 *
 * Returns 0 on success, TSDB_CODE_FS_NO_VALID_DISK when no disk can be
 * allocated, otherwise the error code of the failing open.
 */
static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
  int32_t    code = 0;
  int32_t    lino = 0;
  STFileSet *fset = merger->ctx->fset;

  // ---- stt writer ----
  SSttFileWriterConfig sttConfig[1] = {{
      .tsdb = merger->tsdb,
      .maxRow = merger->maxRow,
      .szPage = merger->szPage,
      .cmprAlg = merger->cmprAlg,
      .compactVersion = merger->compactVersion,
  }};

  if (merger->ctx->lvl != NULL) {
    // to existing level
    sttConfig->file = merger->ctx->fobj->f[0];
  } else {
    // to new level
    SDiskID did[1];
    int32_t level = tsdbFidLevel(fset->fid, &merger->tsdb->keepCfg, merger->ctx->now);
    if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, level, did) < 0) {
      code = TSDB_CODE_FS_NO_VALID_DISK;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    sttConfig->file.type = TSDB_FTYPE_STT;
    sttConfig->file.did = did[0];
    sttConfig->file.fid = fset->fid;
    sttConfig->file.cid = merger->cid;
    sttConfig->file.size = 0;
    sttConfig->file.stt->level = merger->ctx->level;
    sttConfig->file.stt->nseg = 0;
  }

  code = tsdbSttFileWriterOpen(sttConfig, &merger->sttWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // ---- data writer (only when merging into the data file) ----
  if (merger->ctx->toData) {
    SDiskID did;
    int32_t level = tsdbFidLevel(fset->fid, &merger->tsdb->keepCfg, merger->ctx->now);

    if (tfsAllocDisk(merger->tsdb->pVnode->pTfs, level, &did) < 0) {
      code = TSDB_CODE_FS_NO_VALID_DISK;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    SDataFileWriterConfig dataConfig[1] = {{
        .tsdb = merger->tsdb,
        .cmprAlg = merger->cmprAlg,
        .maxRow = merger->maxRow,
        .szPage = merger->szPage,
        .fid = fset->fid,
        .cid = merger->cid,
        .did = did,
        .compactVersion = merger->compactVersion,
    }};

    // hand over the files that already exist in this file set
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
      STFileObj *fobj = fset->farr[ftype];

      if (fobj == NULL) {
        dataConfig->files[ftype].exist = false;
        continue;
      }
      dataConfig->files[ftype].exist = true;
      dataConfig->files[ftype].file = fobj->f[0];
    }

    code = tsdbDataFileWriterOpen(dataConfig, &merger->dataWriter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
500 501

/*
 * Set the merger up for the file set stored in ctx->fset.
 *
 * Checks that nothing is left open from a previous file set, clears the
 * current table id and both block buffers, then opens readers, iterators
 * and writers in that order.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  // nothing may be left over from the previous file set
  ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0);
  ASSERT(TARRAY2_SIZE(merger->dataIterArr) == 0);
  ASSERT(merger->dataIterMerger == NULL);
  ASSERT(merger->sttWriter == NULL);
  ASSERT(merger->dataWriter == NULL);

  // reset the per-table state
  merger->ctx->tbid->suid = 0;
  merger->ctx->tbid->uid = 0;
  merger->ctx->blockDataIdx = 0;

  SBlockData *bData = merger->ctx->blockData;
  SBlockData *bDataEnd = bData + ARRAY_SIZE(merger->ctx->blockData);
  while (bData < bDataEnd) {
    tBlockDataReset(bData++);
  }

  // readers first, iterators on top of them, writers last
  code = tsdbMergeFileSetBeginOpenReader(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbMergeFileSetBeginOpenIter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbMergeFileSetBeginOpenWriter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Close the writers of the current file set, passing fopArr so the writers
 * can record their file operations in it.
 *
 * The stt writer is always closed; the data writer only when ctx->toData
 * is set.
 *
 * Returns 0 on success, otherwise the error code of the failing close.
 */
static int32_t tsdbMergeFileSetEndCloseWriter(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbSttFileWriterClose(&merger->sttWriter, 0, merger->fopArr);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (!merger->ctx->toData) goto _exit;

  code = tsdbDataFileWriterClose(&merger->dataWriter, 0, merger->fopArr);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
556 557

/*
 * Close both iterator mergers and every iterator opened for the current
 * file set; the iterator arrays are emptied but kept for reuse.
 *
 * Always returns 0.
 */
static int32_t tsdbMergeFileSetEndCloseIter(SMerger *merger) {
  // tombstone side
  tsdbIterMergerClose(&merger->tombIterMerger);
  TARRAY2_CLEAR(merger->tombIterArr, tsdbIterClose);

  // row side
  tsdbIterMergerClose(&merger->dataIterMerger);
  TARRAY2_CLEAR(merger->dataIterArr, tsdbIterClose);

  return 0;
}

/*
 * Close every stt reader opened for the current file set and empty the
 * reader array.
 *
 * Always returns 0.
 */
static int32_t tsdbMergeFileSetEndCloseReader(SMerger *merger) {
  TARRAY2_CLEAR(merger->sttReaderArr, tsdbSttFileReaderClose);

  return 0;
}

/*
 * Tear down what tsdbMergeFileSetBegin() set up, in reverse order:
 * writers, then iterators, then readers.
 *
 * Returns 0 on success, otherwise the error code of the first failing
 * step (later steps are then not executed).
 */
static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  // writers
  code = tsdbMergeFileSetEndCloseWriter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // iterators
  code = tsdbMergeFileSetEndCloseIter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // readers
  code = tsdbMergeFileSetEndCloseReader(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}
H
Hongze Cheng 已提交
589

H
Hongze Cheng 已提交
590 591 592
/*
 * Merge one file set.
 *
 * Stores `fset` in the merger context, opens readers/iterators/writers,
 * runs the merge into the data file (ctx->toData) or into an stt level,
 * and closes everything again.
 *
 * merger: merger state; must already be opened by the caller.
 * fset:   file set to merge.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 * The error is propagated so that the caller does not go on to commit the
 * file operations of a merge that did not complete.
 */
static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
  int32_t code = 0;
  int32_t lino = 0;

  merger->ctx->fset = fset;
  code = tsdbMergeFileSetBegin(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // do merge
  if (merger->ctx->toData) {
    code = tsdbMergeToDataLevel(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    code = tsdbMergeToUpperLevel(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbMergeFileSetEnd(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(merger->tsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(merger->tsdb->pVnode), __func__, fset->fid);
  }
  // report the failure instead of a hard-coded 0
  return code;
}

H
Hongze Cheng 已提交
619
/*
 * Walk the file-set snapshot and merge every file set that needs it.
 *
 * A file set qualifies when its first stt level is level 0, that level has
 * at least one file, and the first file has at least sttTrigger segments.
 * The merger is opened lazily on the first qualifying file set and, if it
 * was opened, closed after the walk.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbDoMerge(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  STFileSet *fset;
  TARRAY2_FOREACH(merger->fsetArr, fset) {
    // no stt level at all
    if (TARRAY2_SIZE(fset->lvlArr) == 0) continue;

    // first level must be a non-empty level 0
    SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
    if (lvl == NULL || lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) == 0) continue;

    // not enough segments yet to be worth merging
    STFileObj *fobj = TARRAY2_FIRST(lvl->fobjArr);
    if (fobj->f->stt->nseg < merger->sttTrigger) continue;

    if (!merger->ctx->opened) {
      code = tsdbMergerOpen(merger);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbMergeFileSet(merger, fset);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (merger->ctx->opened) {
    code = tsdbMergerClose(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(merger->tsdb->pVnode), __func__);
  }
  return code;
}

H
Hongze Cheng 已提交
654
int32_t tsdbMerge(void *arg) {
H
Hongze Cheng 已提交
655 656
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
657
  STsdb  *tsdb = (STsdb *)arg;
H
Hongze Cheng 已提交
658 659 660 661 662 663

  SMerger merger[1] = {{
      .tsdb = tsdb,
      .sttTrigger = tsdb->pVnode->config.sttTrigger,
  }};

H
Hongze Cheng 已提交
664 665
  ASSERT(merger->sttTrigger > 1);

H
Hongze Cheng 已提交
666
  code = tsdbFSCreateCopySnapshot(tsdb->pFS, &merger->fsetArr);
H
Hongze Cheng 已提交
667 668 669 670 671
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbDoMerge(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
672
  tsdbFSDestroyCopySnapshot(&merger->fsetArr);
H
Hongze Cheng 已提交
673

H
Hongze Cheng 已提交
674 675
_exit:
  if (code) {
H
Hongze Cheng 已提交
676
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
677
  } else if (merger->ctx->opened) {
H
Hongze Cheng 已提交
678
    tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
H
Hongze Cheng 已提交
679
  }
H
Hongze Cheng 已提交
680
  return code;
H
Hongze Cheng 已提交
681
}