tsdbMerge.c 9.8 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "inc/tsdbMerge.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef struct {
H
Hongze Cheng 已提交
19 20 21 22 23 24 25 26 27 28
  STsdb   *tsdb;
  int32_t  maxRow;
  int32_t  minRow;
  int32_t  szPage;
  int8_t   cmprAlg;
  int64_t  compactVersion;
  int64_t  cid;
  SSkmInfo skmTb;
  SSkmInfo skmRow;
  uint8_t *aBuf[5];
H
Hongze Cheng 已提交
29
  // context
H
Hongze Cheng 已提交
30 31 32 33 34 35 36 37
  struct {
    bool       opened;
    bool       toData;
    int32_t    level;
    STFileSet *fset;
    SRowInfo  *row;
    SBlockData bData;
  } ctx[1];
H
Hongze Cheng 已提交
38
  // reader
H
Hongze Cheng 已提交
39
  TARRAY2(SSttFileReader *) sttReaderArr[1];
H
Hongze Cheng 已提交
40
  SDataFileReader *dataReader;
H
Hongze Cheng 已提交
41 42
  TTsdbIterArray   iterArr[1];
  SIterMerger     *iterMerger;
H
Hongze Cheng 已提交
43
  // writer
H
Hongze Cheng 已提交
44 45
  SSttFileWriter  *sttWriter;
  SDataFileWriter *dataWriter;
H
Hongze Cheng 已提交
46 47
  // operations
  TFileOpArray fopArr;
H
Hongze Cheng 已提交
48 49
} SMerger;

H
Hongze Cheng 已提交
50
// Initialize the merger's file operation array and mark the merger as opened.
// Always returns 0.
static int32_t tsdbMergerOpen(SMerger *merger) {
  TARRAY2_INIT(&merger->fopArr);
  merger->ctx->opened = true;
  return 0;
}

H
Hongze Cheng 已提交
56
// Finish a merge pass: submit the collected file operations to the file
// system as a TSDB_FEDIT_MERGE edit and commit it.
//
// The file operation array is released on every path. Returns 0 on success,
// otherwise the error code of the step that failed.
static int32_t tsdbMergerClose(SMerger *merger) {
  // TODO
  int32_t       code = 0;
  int32_t       lino = 0;
  int32_t       vid = TD_VID(merger->tsdb->pVnode);
  STFileSystem *fs = merger->tsdb->pFS;

  // edit file system
  code = tsdbFSEditBegin(fs, &merger->fopArr, TSDB_FEDIT_MERGE);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbFSEditCommit(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  // clear the merge; done after the label so a failed edit does not leak the array
  TARRAY2_FREE(&merger->fopArr);

  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
80

H
Hongze Cheng 已提交
81
// Placeholder for advancing the merge to its next row. The body is not
// implemented yet: it leaves the merger untouched and reports success.
static int32_t tsdbMergeNextRow(SMerger *merger) {
  // TODO
  int32_t code = 0;
  return code;
}
H
Hongze Cheng 已提交
85

H
Hongze Cheng 已提交
86
// Flush the block accumulated in merger->ctx->bData and reset it.
//
// An empty block is a no-op. A block with fewer than minRow rows is written
// through the stt writer. Returns 0 on success or the writer's error code;
// on error the block is left as it was.
static int32_t tsdbMergeToDataWriteTSDataBlock(SMerger *merger) {
  SBlockData *bData = &merger->ctx->bData;

  if (bData->nRow == 0) return 0;

  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  if (bData->nRow < merger->minRow) {
    code = tsdbSttFileWriteTSDataBlock(merger->sttWriter, bData);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    // TODO: write the block through merger->dataWriter
    // (tsdbDataFWriteTSDataBlock). Until then a block of this size is reset
    // below without having been written anywhere.
  }

  tBlockDataReset(bData);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
108 109 110 111 112 113 114 115 116
// Merge loop used when the output goes to the data file.
//
// Rows are pulled one at a time with tsdbMergeNextRow() and appended to
// merger->ctx->bData. The block is flushed when the incoming row does not
// share the block's schema, when the block reaches maxRow rows, and once more
// when no row is left (merger->ctx->row == NULL), which ends the loop.
//
// Returns 0 on success or the first error code encountered.
static int32_t tsdbMergeToData(SMerger *merger) {
  int32_t     code = 0;
  int32_t     lino = 0;
  int32_t     vid = TD_VID(merger->tsdb->pVnode);
  SBlockData *bData = &merger->ctx->bData;

  for (;;) {
    code = tsdbMergeNextRow(merger);
    TSDB_CHECK_CODE(code, lino, _exit);

    SRowInfo *row = merger->ctx->row;
    if (row == NULL) {
      // no more rows: flush whatever is still buffered
      code = tsdbMergeToDataWriteTSDataBlock(merger);
      TSDB_CHECK_CODE(code, lino, _exit);
      break;
    }

    // Compare the block's (suid, uid) with the row's (suid, uid). The block's
    // uid must be passed as the second argument, not its suid a second time.
    if (!TABLE_SAME_SCHEMA(bData->suid, bData->uid, row->suid, row->uid)) {
      code = tsdbMergeToDataWriteTSDataBlock(merger);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tsdbUpdateSkmTb(merger->tsdb, (TABLEID *)row, &merger->skmTb);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tBlockDataInit(bData, (TABLEID *)row, merger->skmTb.pTSchema, NULL, 0);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tBlockDataAppendRow(bData, &row->row, NULL, row->uid);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (bData->nRow >= merger->maxRow) {
      code = tsdbMergeToDataWriteTSDataBlock(merger);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  }
  return code;
}

// Merge loop used when the output stays in stt files: every row produced by
// tsdbMergeNextRow() is handed to the stt writer until no row is left
// (merger->ctx->row == NULL).
//
// Returns 0 on success or the first error code encountered.
static int32_t tsdbMergeToUpperLevel(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  for (;;) {
    code = tsdbMergeNextRow(merger);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (merger->ctx->row == NULL) {
      break;
    }

    code = tsdbSttFileWriteTSData(merger->sttWriter, merger->ctx->row);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    // same "vgId:" prefix as the other error logs in this file
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
173 174 175 176
// Prepare the merge of merger->ctx->fset.
//
// The stt levels of the file set are walked starting at level 0. For every
// consecutive level whose first file has reached sttTrigger segments, a reader
// is opened on that file and a TSDB_FOP_REMOVE operation is recorded for it.
// The walk stops at the first level that is missing or whose first file is
// still below the trigger; in the latter case the merge does not go to the
// data file (ctx->toData = false) and the stt writer is opened on that
// existing file. In every other case the stt writer is opened on a new stt
// file at level ctx->level. The data file writer is opened when ctx->toData
// is set.
//
// Returns 0 on success or the first error code encountered.
// NOTE(review): readers opened before a failure are not closed here; confirm
// that the caller's cleanup covers merger->sttReaderArr.
static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    vid = TD_VID(merger->tsdb->pVnode);
  STFileSet *fset = merger->ctx->fset;

  // prepare the merger file set
  SSttLvl   *lvl;
  STFileObj *fobj;
  // Existing stt file to append to. This is tracked separately from the loop
  // cursor: after the loop runs to completion the cursor still points at the
  // last level, whose file has just been scheduled for removal, and with an
  // empty level array the cursor is never assigned at all.
  STFileObj *dstObj = NULL;

  merger->ctx->toData = true;
  merger->ctx->level = 0;

  TARRAY2_FOREACH(&fset->lvlArr, lvl) {
    if (lvl->level != merger->ctx->level) {
      // gap in the levels: the output becomes a new file at ctx->level
      break;
    }

    fobj = TARRAY2_GET(&lvl->farr, 0);
    if (fobj->f->stt->nseg < merger->tsdb->pVnode->config.sttTrigger) {
      merger->ctx->toData = false;
      dstObj = fobj;
      break;
    }

    ASSERT(lvl->level == 0 || TARRAY2_SIZE(&lvl->farr) == 1);
    merger->ctx->level++;

    // open the reader
    SSttFileReader      *reader;
    SSttFileReaderConfig config = {
        .tsdb = merger->tsdb,
        // TODO
    };
    code = tsdbSttFReaderOpen(fobj->fname, &config, &reader);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(merger->sttReaderArr, reader);
    TSDB_CHECK_CODE(code, lino, _exit);

    // add the operation
    STFileOp op = {
        .fid = fobj->f->fid,
        .optype = TSDB_FOP_REMOVE,
        .of = fobj->f[0],
    };
    code = TARRAY2_APPEND(&merger->fopArr, op);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // open stt file writer
  {
    SSttFileWriterConfig config = {
        .tsdb = merger->tsdb,
        .maxRow = merger->maxRow,
        .szPage = merger->szPage,
        .cmprAlg = merger->cmprAlg,
        .skmTb = &merger->skmTb,
        .skmRow = &merger->skmRow,
        .aBuf = merger->aBuf,
    };

    if (dstObj != NULL) {
      // append to the existing, not yet full stt file
      config.file = dstObj->f[0];
    } else {
      // create a new stt file at the level reached by the walk
      config.file = (STFile){
          .type = TSDB_FTYPE_STT,
          .did = {.level = 0, .id = 0},
          .fid = fset->fid,
          .cid = merger->cid,
          .size = 0,
          .stt = {{.level = merger->ctx->level, .nseg = 0}},
      };
    }

    code = tsdbSttFileWriterOpen(&config, &merger->sttWriter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // open data file writer
  if (merger->ctx->toData) {
    SDataFileWriterConfig config = {
        .tsdb = merger->tsdb,
        // TODO
    };
    code = tsdbDataFileWriterOpen(&config, &merger->dataWriter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  }
  return code;
}
// Finish the merge of the current file set: close the stt writer and, when
// the close reports a file operation other than TSDB_FOP_NONE, record that
// operation in merger->fopArr.
//
// Returns 0 on success or the first error code encountered.
static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  vid = TD_VID(merger->tsdb->pVnode);
  STFileOp sttOp;

  code = tsdbSttFileWriterClose(&merger->sttWriter, 0, &sttOp);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (sttOp.optype != TSDB_FOP_NONE) {
    code = TARRAY2_APPEND(&merger->fopArr, sttOp);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (merger->ctx->toData) {
    // TODO: close the data file writer (tsdbDataFWriterClose) and check its
    // result once that call is available.
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  }
  return code;
}
// Merge one file set.
//
// The merger is opened on first use. The file set is then prepared with
// tsdbMergeFileSetBegin(), merged either to the data file or to an stt file
// depending on ctx->toData, and finished with tsdbMergeFileSetEnd().
//
// Returns 0 on success, otherwise the error code of the step that failed, so
// the caller can stop instead of continuing after a failed merge.
static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  if (!merger->ctx->opened) {
    code = tsdbMergerOpen(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  merger->ctx->fset = fset;

  code = tsdbMergeFileSetBegin(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // do merge
  if (merger->ctx->toData) {
    code = tsdbMergeToData(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    code = tsdbMergeToUpperLevel(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbMergeFileSetEnd(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d", vid, __func__, fset->fid);
  }
  return code;
}

H
Hongze Cheng 已提交
335
// Merge the stt files of a TSDB instance.
//
// Every file set in the file system's current state is examined. A set is
// merged when it has a level-0 stt level whose first file has reached the
// vnode's sttTrigger segment count. If at least one set was merged, the
// merger is closed, which commits the collected file operations.
//
// Returns 0 on success or the first error code encountered.
int32_t tsdbMerge(STsdb *tsdb) {
  int32_t code = 0;
  int32_t lino = 0;

  int32_t       vid = TD_VID(tsdb->pVnode);
  STFileSystem *fs = tsdb->pFS;
  STFileSet    *fset;
  STFileObj    *fobj;
  int32_t       sttTrigger = tsdb->pVnode->config.sttTrigger;

  // Zero-initialize the whole merger so that the fields the merge steps read
  // (row limits, cid, reader array, block data, writers, ...) start from a
  // defined state instead of stack garbage; ctx->opened starts out false.
  // NOTE(review): maxRow/minRow/szPage/cmprAlg/cid are still never set to
  // real values here and need to be configured.
  SMerger merger[1] = {{
      .tsdb = tsdb,
  }};

  // loop to merge each file set
  TARRAY2_FOREACH(&fs->cstate, fset) {
    SSttLvl *lvl0 = tsdbTFileSetGetLvl(fset, 0);
    if (lvl0 == NULL) {
      continue;
    }

    ASSERT(TARRAY2_SIZE(&lvl0->farr) > 0);

    fobj = TARRAY2_GET(&lvl0->farr, 0);

    if (fobj->f->stt->nseg >= sttTrigger) {
      code = tsdbMergeFileSet(merger, fset);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // end the merge
  if (merger->ctx->opened) {
    code = tsdbMergerClose(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  } else if (merger->ctx->opened) {
    tsdbDebug("vgId:%d %s done", vid, __func__);
  }
  return code;
}