tsdbMerge.c 10.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "inc/tsdbMerge.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef struct {
H
Hongze Cheng 已提交
19 20 21 22 23 24 25 26 27 28 29 30 31
  STsdb        *tsdb;
  TFileSetArray fsetArr[1];
  int32_t       sttTrigger;
  int32_t       maxRow;
  int32_t       minRow;
  int32_t       szPage;
  int8_t        cmprAlg;
  int64_t       compactVersion;
  int64_t       cid;
  SSkmInfo      skmTb;
  SSkmInfo      skmRow;
  uint8_t      *aBuf[5];

H
Hongze Cheng 已提交
32
  // context
H
Hongze Cheng 已提交
33
  struct {
H
Hongze Cheng 已提交
34 35 36
    bool opened;

    STFileSet *fset;
H
Hongze Cheng 已提交
37 38 39 40 41
    bool       toData;
    int32_t    level;
    SRowInfo  *row;
    SBlockData bData;
  } ctx[1];
H
Hongze Cheng 已提交
42

H
Hongze Cheng 已提交
43
  // reader
H
Hongze Cheng 已提交
44 45 46 47 48
  TSttFileReaderArray sttReaderArr[1];
  SDataFileReader    *dataReader;
  TTsdbIterArray      iterArr[1];
  SIterMerger        *iterMerger;
  TFileOpArray        fopArr[1];
H
Hongze Cheng 已提交
49
  // writer
H
Hongze Cheng 已提交
50 51
  SSttFileWriter  *sttWriter;
  SDataFileWriter *dataWriter;
H
Hongze Cheng 已提交
52 53
} SMerger;

H
Hongze Cheng 已提交
54
// Lazily initialize the merger: copy the relevant vnode configuration values,
// allocate an id into merger->cid and mark the context as opened.
// Always returns 0.
static int32_t tsdbMergerOpen(SMerger *merger) {
  SVnode *pVnode = merger->tsdb->pVnode;

  merger->maxRow = pVnode->config.tsdbCfg.maxRows;
  merger->minRow = pVnode->config.tsdbCfg.minRows;
  merger->szPage = pVnode->config.tsdbPageSize;
  merger->cmprAlg = pVnode->config.tsdbCfg.compression;
  merger->compactVersion = INT64_MAX;

  // NOTE(review): any result reported by tsdbFSAllocEid() is not inspected here.
  tsdbFSAllocEid(merger->tsdb->pFS, &merger->cid);

  merger->ctx->opened = true;
  return 0;
}

H
Hongze Cheng 已提交
65
// Submit the file operations collected in merger->fopArr to the file system
// (edit begin + commit) and release the operation array.
//
// Returns 0 on success, otherwise the error code of the failing step. The
// operation array is released on both paths so a failed edit does not leak it.
//
// TODO: the close logic is still incomplete (carried over from the original).
static int32_t tsdbMergerClose(SMerger *merger) {
  int32_t       code = 0;
  int32_t       lino = 0;
  int32_t       vid = TD_VID(merger->tsdb->pVnode);
  STFileSystem *fs = merger->tsdb->pFS;

  // edit file system
  code = tsdbFSEditBegin(fs, merger->fopArr, TSDB_FEDIT_MERGE);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbFSEditCommit(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  // clear the merge
  TARRAY2_FREE(merger->fopArr);

  if (code) {
    // NOTE(review): if the edit was begun but the commit failed, an explicit
    // abort of the edit may be required here - confirm against the FS API.
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
89

H
Hongze Cheng 已提交
90
// Placeholder for advancing the merge to its next row. It currently does
// nothing to the merger and reports success.
static int32_t tsdbMergeNextRow(SMerger *merger) {
  // TODO
  int32_t code = 0;
  return code;
}
H
Hongze Cheng 已提交
94

H
Hongze Cheng 已提交
95
// Flush the rows buffered in merger->ctx->bData and reset the buffer.
//
// A buffer holding fewer than merger->minRow rows is written to the stt file
// writer. Returns 0 when the buffer is empty or the flush succeeded, otherwise
// the error code of the failed write (the buffer is not reset in that case).
static int32_t tsdbMergeToDataWriteTSDataBlock(SMerger *merger) {
  SBlockData *bData = &merger->ctx->bData;

  // nothing buffered, nothing to do
  if (bData->nRow == 0) return 0;

  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  if (bData->nRow < merger->minRow) {
    code = tsdbSttFileWriteTSDataBlock(merger->sttWriter, bData);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    // TODO: write the block through merger->dataWriter.
    // NOTE(review): the data-file write is not wired up yet, so a block with
    // at least minRow rows is reset below without having been written.
  }

  tBlockDataReset(bData);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
117 118 119 120 121 122 123 124 125
// Drain the merge row stream into block data, flushing a block whenever the
// table/schema changes, the block reaches merger->maxRow rows, or the stream
// ends (merger->ctx->row == NULL).
//
// Returns 0 on success, otherwise the error code of the first failing step.
static int32_t tsdbMergeToData(SMerger *merger) {
  int32_t     code = 0;
  int32_t     lino = 0;
  int32_t     vid = TD_VID(merger->tsdb->pVnode);
  SBlockData *bData = &merger->ctx->bData;

  for (;;) {
    code = tsdbMergeNextRow(merger);
    TSDB_CHECK_CODE(code, lino, _exit);

    SRowInfo *row = merger->ctx->row;

    // end of stream: flush what is buffered and stop
    if (row == NULL) {
      code = tsdbMergeToDataWriteTSDataBlock(merger);
      TSDB_CHECK_CODE(code, lino, _exit);
      break;
    }

    // Compare the buffered block's (suid, uid) with the row's (suid, uid).
    // The second argument must be the block's uid; passing its suid twice
    // makes the uid comparison meaningless.
    if (!TABLE_SAME_SCHEMA(bData->suid, bData->uid, row->suid, row->uid)) {
      // the row does not match the buffered block: flush, then re-initialize
      // the block with the schema looked up for the row's table id
      code = tsdbMergeToDataWriteTSDataBlock(merger);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tsdbUpdateSkmTb(merger->tsdb, (TABLEID *)row, &merger->skmTb);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tBlockDataInit(bData, (TABLEID *)row, merger->skmTb.pTSchema, NULL, 0);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tBlockDataAppendRow(bData, &row->row, NULL, row->uid);
    TSDB_CHECK_CODE(code, lino, _exit);

    // block is full: flush it
    if (bData->nRow >= merger->maxRow) {
      code = tsdbMergeToDataWriteTSDataBlock(merger);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  }
  return code;
}

// Drain the merge row stream into the stt file writer, one row at a time,
// until the stream ends (merger->ctx->row == NULL).
//
// Returns 0 on success, otherwise the error code of the first failing step.
static int32_t tsdbMergeToUpperLevel(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  for (;;) {
    code = tsdbMergeNextRow(merger);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (merger->ctx->row == NULL) {
      break;
    }

    code = tsdbSttFileWriteTSData(merger->sttWriter, merger->ctx->row);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    // use the same "vgId:" prefix as the other functions in this file
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
182 183 184 185
// Prepare the merger for the file set in merger->ctx->fset.
//
// Walks the stt levels in array order, expecting levels 0, 1, 2, ... For every
// level whose first file has at least merger->sttTrigger segments, a reader is
// opened on that file and a TSDB_FOP_REMOVE operation is queued for it. The
// walk stops at the first level number that is not the expected one (a new stt
// file will be created) or at the first level below the trigger (that level's
// file becomes the writer target and ctx->toData is cleared).
//
// Afterwards the stt file writer is opened and, if ctx->toData is still set,
// the data file writer as well.
//
// Returns 0 on success, otherwise the error code of the first failing step.
static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    vid = TD_VID(merger->tsdb->pVnode);
  STFileSet *fset = merger->ctx->fset;

  // prepare the merger file set
  // Both start as NULL so that an empty level array cannot leave them
  // indeterminate when they are inspected after the loop.
  SSttLvl   *lvl = NULL;
  STFileObj *fobj = NULL;

  merger->ctx->toData = true;
  merger->ctx->level = 0;

  TARRAY2_FOREACH(fset->lvlArr, lvl) {
    // level number is not the expected one: no existing file to write into
    if (lvl->level != merger->ctx->level) {
      lvl = NULL;
      break;
    }

    fobj = TARRAY2_GET(lvl->fobjArr, 0);
    if (fobj->f->stt->nseg < merger->sttTrigger) {
      // this level is below the trigger: merge into it instead of the data file
      merger->ctx->toData = false;
      break;
    }

    ASSERT(lvl->level == 0 || TARRAY2_SIZE(lvl->fobjArr) == 1);
    merger->ctx->level++;

    // open the reader
    SSttFileReader      *reader;
    SSttFileReaderConfig readerConfig = {
        .tsdb = merger->tsdb,
        // TODO
    };
    code = tsdbSttFReaderOpen(fobj->fname, &readerConfig, &reader);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(merger->sttReaderArr, reader);
    TSDB_CHECK_CODE(code, lino, _exit);

    // add the operation
    STFileOp op = {
        .fid = fobj->f->fid,
        .optype = TSDB_FOP_REMOVE,
        .of = fobj->f[0],
    };
    code = TARRAY2_APPEND(merger->fopArr, op);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // open stt file writer
  {
    SSttFileWriterConfig writerConfig = {
        .tsdb = merger->tsdb,
        .maxRow = merger->maxRow,
        .szPage = merger->szPage,
        .cmprAlg = merger->cmprAlg,
        .skmTb = &merger->skmTb,
        .skmRow = &merger->skmRow,
        .aBuf = merger->aBuf,
    };

    if (lvl) {
      // A non-NULL lvl implies fobj was assigned in the loop above.
      // NOTE(review): when the loop ran to completion, fobj is the file of the
      // last level, which has also been queued for removal - confirm that
      // writing into it is intended.
      writerConfig.file = fobj->f[0];
    } else {
      writerConfig.file = (STFile){
          .type = TSDB_FTYPE_STT,
          .did = {.level = 0, .id = 0},
          .fid = fset->fid,
          .cid = merger->cid,
          .size = 0,
          .stt = {{.level = merger->ctx->level, .nseg = 0}},
      };
    }

    code = tsdbSttFileWriterOpen(&writerConfig, &merger->sttWriter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // open data file writer
  if (merger->ctx->toData) {
    SDataFileWriterConfig dataConfig = {
        .tsdb = merger->tsdb,
        // TODO
    };
    code = tsdbDataFileWriterOpen(&dataConfig, &merger->dataWriter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  }
  return code;
}
// Finish merging one file set: close the stt file writer and, when it reports
// a file operation other than TSDB_FOP_NONE, queue that operation in
// merger->fopArr.
//
// Returns 0 on success, otherwise the error code of the first failing step.
static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  vid = TD_VID(merger->tsdb->pVnode);
  STFileOp sttOp;

  code = tsdbSttFileWriterClose(&merger->sttWriter, 0, &sttOp);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (sttOp.optype != TSDB_FOP_NONE) {
    code = TARRAY2_APPEND(merger->fopArr, sttOp);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (merger->ctx->toData) {
    // TODO: close the data file writer and queue its operation
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  }
  return code;
}
// Merge a single file set: open the merger on first use, set up readers and
// writers for the set, run the merge into either the data file or the stt
// writer (depending on ctx->toData), then finish the set.
//
// Returns 0 on success, otherwise the error code of the first failing step so
// that the caller can stop instead of continuing after a failed merge.
static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
  int32_t code = 0;
  int32_t lino = 0;

  if (!merger->ctx->opened) {
    code = tsdbMergerOpen(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  merger->ctx->fset = fset;

  code = tsdbMergeFileSetBegin(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // do merge
  if (merger->ctx->toData) {
    code = tsdbMergeToData(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    code = tsdbMergeToUpperLevel(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbMergeFileSetEnd(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(merger->tsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(merger->tsdb->pVnode), __func__, fset->fid);
  }
  return code;
}

H
Hongze Cheng 已提交
344
// Walk the snapshot in merger->fsetArr and merge every file set whose first
// stt level is level 0 and whose first file in that level has at least
// merger->sttTrigger segments. If any merge opened the merger, close it at
// the end.
//
// Returns 0 on success, otherwise the error code of the first failing step.
static int32_t tsdbDoMerge(SMerger *merger) {
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    vid = TD_VID(merger->tsdb->pVnode);
  STFileSet *fset;

  TARRAY2_FOREACH(merger->fsetArr, fset) {
    // skip file sets without a usable level-0 entry
    if (TARRAY2_SIZE(fset->lvlArr) == 0) continue;

    SSttLvl *first = TARRAY2_FIRST(fset->lvlArr);
    if (first == NULL) continue;
    if (first->level != 0) continue;
    if (TARRAY2_SIZE(first->fobjArr) == 0) continue;

    // skip file sets that have not reached the trigger yet
    STFileObj *head = TARRAY2_FIRST(first->fobjArr);
    if (head->f->stt->nseg < merger->sttTrigger) continue;

    code = tsdbMergeFileSet(merger, fset);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (merger->ctx->opened) {
    code = tsdbMergerClose(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  } else {
    tsdbDebug("vgId:%d %s done", vid, __func__);
  }
  return code;
}

// Entry point of the merge: take a snapshot of the file sets, merge the ones
// that qualify, then release the snapshot.
//
// Returns 0 on success, otherwise the error code of the first failing step.
// The snapshot is released whether or not the merge itself succeeded.
int32_t tsdbMerge(STsdb *tsdb) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(tsdb->pVnode);

  SMerger merger[1] = {{
      .tsdb = tsdb,
      .fsetArr = {TARRAY2_INITIALIZER},
      .sttTrigger = tsdb->pVnode->config.sttTrigger,
  }};

  code = tsdbFSCopySnapshot(tsdb->pFS, merger->fsetArr);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbDoMerge(merger);

  // Release the snapshot before checking the merge result, so a failed merge
  // does not leak it.
  tsdbFSClearSnapshot(merger->fsetArr);
  TARRAY2_FREE(merger->fsetArr);

  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  } else if (merger->ctx->opened) {
    tsdbDebug("vgId:%d %s done", vid, __func__);
  }
  return code;
}