tsdbCommit2.c 16.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdbCommit2.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// extern dependencies
H
Hongze Cheng 已提交
19
typedef struct {
H
Hongze Cheng 已提交
20 21
  STsdb         *tsdb;
  TFileSetArray *fsetArr;
H
Hongze Cheng 已提交
22 23
  TFileOpArray   fopArray[1];

H
Hongze Cheng 已提交
24 25
  // SSkmInfo skmTb[1];
  // SSkmInfo skmRow[1];
H
Hongze Cheng 已提交
26

H
Hongze Cheng 已提交
27 28 29 30 31
  int32_t minutes;
  int8_t  precision;
  int32_t minRow;
  int32_t maxRow;
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
32 33
  int32_t sttTrigger;
  int32_t szPage;
H
Hongze Cheng 已提交
34 35 36
  int64_t compactVersion;

  struct {
H
Hongze Cheng 已提交
37
    int64_t    cid;
H
Hongze Cheng 已提交
38 39 40 41
    int64_t    now;
    TSKEY      nextKey;
    int32_t    fid;
    int32_t    expLevel;
H
Hongze Cheng 已提交
42
    SDiskID    did;
H
Hongze Cheng 已提交
43 44 45 46
    TSKEY      minKey;
    TSKEY      maxKey;
    STFileSet *fset;
    TABLEID    tbid[1];
H
Hongze Cheng 已提交
47
    bool       hasTSData;
H
Hongze Cheng 已提交
48 49
  } ctx[1];

50
  // reader
H
Hongze Cheng 已提交
51
  SSttFileReader *sttReader;
52 53 54 55 56 57

  // iter
  TTsdbIterArray dataIterArray[1];
  SIterMerger   *dataIterMerger;
  TTsdbIterArray tombIterArray[1];
  SIterMerger   *tombIterMerger;
H
Hongze Cheng 已提交
58

H
Hongze Cheng 已提交
59
  // writer
H
Hongze Cheng 已提交
60
  SFSetWriter *writer;
H
Hongze Cheng 已提交
61
} SCommitter2;
H
Hongze Cheng 已提交
62

63
static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
H
Hongze Cheng 已提交
64
  int32_t code = 0;
H
Hongze Cheng 已提交
65
  int32_t lino = 0;
H
Hongze Cheng 已提交
66

H
Hongze Cheng 已提交
67
  SFSetWriterConfig config = {
H
Hongze Cheng 已提交
68
      .tsdb = committer->tsdb,
H
Hongze Cheng 已提交
69 70 71
      .toSttOnly = true,
      .compactVersion = committer->compactVersion,
      .minRow = committer->minRow,
H
Hongze Cheng 已提交
72
      .maxRow = committer->maxRow,
H
Hongze Cheng 已提交
73
      .szPage = committer->szPage,
H
Hongze Cheng 已提交
74
      .cmprAlg = committer->cmprAlg,
H
Hongze Cheng 已提交
75 76
      .fid = committer->ctx->fid,
      .cid = committer->ctx->cid,
H
Hongze Cheng 已提交
77
      .did = committer->ctx->did,
H
Hongze Cheng 已提交
78
      .level = 0,
H
Hongze Cheng 已提交
79
  };
H
Hongze Cheng 已提交
80

H
Hongze Cheng 已提交
81
  if (committer->sttTrigger == 1) {
H
Hongze Cheng 已提交
82
    config.toSttOnly = false;
H
Hongze Cheng 已提交
83 84 85 86 87 88 89 90 91

    if (committer->ctx->fset) {
      for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ftype++) {
        if (committer->ctx->fset->farr[ftype] != NULL) {
          config.files[ftype].exist = true;
          config.files[ftype].file = committer->ctx->fset->farr[ftype]->f[0];
        }
      }
    }
H
Hongze Cheng 已提交
92
  }
H
Hongze Cheng 已提交
93

H
Hongze Cheng 已提交
94 95 96
  code = tsdbFSetWriterOpen(&config, &committer->writer);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
97 98 99
_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
100 101
  }
  return code;
H
Hongze Cheng 已提交
102 103
}

104
static int32_t tsdbCommitCloseWriter(SCommitter2 *committer) {
H
Hongze Cheng 已提交
105
  return tsdbFSetWriterClose(&committer->writer, 0, committer->fopArray);
H
Hongze Cheng 已提交
106 107 108
}

static int32_t tsdbCommitTSData(SCommitter2 *committer) {
H
Hongze Cheng 已提交
109 110 111
  int32_t   code = 0;
  int32_t   lino = 0;
  SMetaInfo info;
H
Hongze Cheng 已提交
112

H
Hongze Cheng 已提交
113 114
  committer->ctx->hasTSData = false;

115 116
  committer->ctx->tbid->suid = 0;
  committer->ctx->tbid->uid = 0;
H
Hongze Cheng 已提交
117 118
  for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->dataIterMerger)) != NULL;) {
    if (row->uid != committer->ctx->tbid->uid) {
H
fix bug  
Hongze Cheng 已提交
119 120 121
      committer->ctx->tbid->suid = row->suid;
      committer->ctx->tbid->uid = row->uid;

H
Hongze Cheng 已提交
122
      if (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) {
H
fix bug  
Hongze Cheng 已提交
123
        code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid);
H
Hongze Cheng 已提交
124 125 126 127 128
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }

H
Hongze Cheng 已提交
129 130 131
    int64_t ts = TSDBROW_TS(&row->row);
    if (ts > committer->ctx->maxKey) {
      committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts);
H
fix bug  
Hongze Cheng 已提交
132
      code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid);
H
Hongze Cheng 已提交
133 134 135 136
      TSDB_CHECK_CODE(code, lino, _exit);
      continue;
    }

H
Hongze Cheng 已提交
137 138
    committer->ctx->hasTSData = true;

H
Hongze Cheng 已提交
139
    code = tsdbFSetWriteRow(committer->writer, row);
H
Hongze Cheng 已提交
140
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
141 142

    code = tsdbIterMergerNext(committer->dataIterMerger);
H
Hongze Cheng 已提交
143 144 145
    TSDB_CHECK_CODE(code, lino, _exit);
  }

146 147 148 149 150 151 152 153
_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
  }
  return code;
}

static int32_t tsdbCommitTombData(SCommitter2 *committer) {
154 155 156
  int32_t   code = 0;
  int32_t   lino = 0;
  SMetaInfo info;
157

H
Hongze Cheng 已提交
158 159 160 161
  if (committer->ctx->fset == NULL && !committer->ctx->hasTSData) {
    return 0;
  }

162 163
  committer->ctx->tbid->suid = 0;
  committer->ctx->tbid->uid = 0;
H
Hongze Cheng 已提交
164
  for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) {
165 166 167 168 169 170 171 172 173 174 175
    if (record->uid != committer->ctx->tbid->uid) {
      committer->ctx->tbid->suid = record->suid;
      committer->ctx->tbid->uid = record->uid;

      if (metaGetInfo(committer->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }

H
Hongze Cheng 已提交
176
    if (record->ekey < committer->ctx->minKey) {
H
Hongze Cheng 已提交
177
      goto _next;
H
Hongze Cheng 已提交
178 179
    } else if (record->skey > committer->ctx->maxKey) {
      committer->ctx->maxKey = TMIN(record->skey, committer->ctx->maxKey);
H
Hongze Cheng 已提交
180
      goto _next;
H
Hongze Cheng 已提交
181 182 183 184 185 186 187 188 189
    }

    if (record->ekey > committer->ctx->maxKey) {
      committer->ctx->maxKey = committer->ctx->maxKey + 1;
    }

    record->skey = TMAX(record->skey, committer->ctx->minKey);
    record->ekey = TMIN(record->ekey, committer->ctx->maxKey);

H
Hongze Cheng 已提交
190 191
    code = tsdbFSetWriteTombRecord(committer->writer, record);
    TSDB_CHECK_CODE(code, lino, _exit);
192

H
Hongze Cheng 已提交
193
  _next:
H
Hongze Cheng 已提交
194 195
    code = tsdbIterMergerNext(committer->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
196
  }
H
Hongze Cheng 已提交
197 198 199 200

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
201 202 203 204
  }
  return code;
}

205
static int32_t tsdbCommitOpenReader(SCommitter2 *committer) {
H
Hongze Cheng 已提交
206 207 208
  int32_t code = 0;
  int32_t lino = 0;

209
  ASSERT(committer->sttReader == NULL);
H
Hongze Cheng 已提交
210

211 212 213 214 215 216
  if (committer->ctx->fset == NULL                        //
      || committer->sttTrigger > 1                        //
      || TARRAY2_SIZE(committer->ctx->fset->lvlArr) == 0  //
  ) {
    return 0;
  }
H
Hongze Cheng 已提交
217

218
  ASSERT(TARRAY2_SIZE(committer->ctx->fset->lvlArr) == 1);
H
Hongze Cheng 已提交
219

220
  SSttLvl *lvl = TARRAY2_FIRST(committer->ctx->fset->lvlArr);
H
Hongze Cheng 已提交
221

222
  ASSERT(lvl->level == 0);
H
Hongze Cheng 已提交
223

224 225
  if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
    return 0;
H
Hongze Cheng 已提交
226 227
  }

228
  ASSERT(TARRAY2_SIZE(lvl->fobjArr) == 1);
H
Hongze Cheng 已提交
229

230
  STFileObj *fobj = TARRAY2_FIRST(lvl->fobjArr);
H
Hongze Cheng 已提交
231

232 233 234 235 236 237
  SSttFileReaderConfig config = {
      .tsdb = committer->tsdb,
      .szPage = committer->szPage,
      .file = fobj->f[0],
  };
  code = tsdbSttFileReaderOpen(fobj->fname, &config, &committer->sttReader);
H
Hongze Cheng 已提交
238 239
  TSDB_CHECK_CODE(code, lino, _exit);

240 241 242 243 244 245 246
  STFileOp op = {
      .optype = TSDB_FOP_REMOVE,
      .fid = fobj->f->fid,
      .of = fobj->f[0],
  };

  code = TARRAY2_APPEND(committer->fopArray, op);
H
Hongze Cheng 已提交
247 248 249 250 251 252 253 254 255
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
  }
  return code;
}

256
static int32_t tsdbCommitCloseReader(SCommitter2 *committer) { return tsdbSttFileReaderClose(&committer->sttReader); }
H
Hongze Cheng 已提交
257

258
static int32_t tsdbCommitOpenIter(SCommitter2 *committer) {
H
Hongze Cheng 已提交
259 260 261
  int32_t code = 0;
  int32_t lino = 0;

262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
  ASSERT(TARRAY2_SIZE(committer->dataIterArray) == 0);
  ASSERT(committer->dataIterMerger == NULL);
  ASSERT(TARRAY2_SIZE(committer->tombIterArray) == 0);
  ASSERT(committer->tombIterMerger == NULL);

  STsdbIter      *iter;
  STsdbIterConfig config = {0};

  // mem data iter
  config.type = TSDB_ITER_TYPE_MEMT;
  config.memt = committer->tsdb->imem;
  config.from->ts = committer->ctx->minKey;
  config.from->version = VERSION_MIN;

  code = tsdbIterOpen(&config, &iter);
H
Hongze Cheng 已提交
277 278
  TSDB_CHECK_CODE(code, lino, _exit);

279 280
  code = TARRAY2_APPEND(committer->dataIterArray, iter);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
281

282 283 284
  // mem tomb iter
  config.type = TSDB_ITER_TYPE_MEMT_TOMB;
  config.memt = committer->tsdb->imem;
H
Hongze Cheng 已提交
285

286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
  code = tsdbIterOpen(&config, &iter);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = TARRAY2_APPEND(committer->tombIterArray, iter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // STT
  if (committer->sttReader) {
    // data iter
    config.type = TSDB_ITER_TYPE_STT;
    config.sttReader = committer->sttReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(committer->dataIterArray, iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    // tomb iter
    config.type = TSDB_ITER_TYPE_STT_TOMB;
    config.sttReader = committer->sttReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(committer->tombIterArray, iter);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
313 314
  }

315 316 317 318 319
  // open merger
  code = tsdbIterMergerOpen(committer->dataIterArray, &committer->dataIterMerger, false);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbIterMergerOpen(committer->tombIterArray, &committer->tombIterMerger, true);
H
Hongze Cheng 已提交
320 321
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
322 323
_exit:
  if (code) {
H
Hongze Cheng 已提交
324
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
325 326 327 328
  }
  return code;
}

329 330 331 332 333 334 335 336
static int32_t tsdbCommitCloseIter(SCommitter2 *committer) {
  tsdbIterMergerClose(&committer->tombIterMerger);
  tsdbIterMergerClose(&committer->dataIterMerger);
  TARRAY2_CLEAR(committer->tombIterArray, tsdbIterClose);
  TARRAY2_CLEAR(committer->dataIterArray, tsdbIterClose);
  return 0;
}

H
Hongze Cheng 已提交
337 338 339 340 341 342
static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *tsdb = committer->tsdb;

  committer->ctx->fid = tsdbKeyFid(committer->ctx->nextKey, committer->minutes, committer->precision);
H
Hongze Cheng 已提交
343
  committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now);
H
Hongze Cheng 已提交
344 345
  tsdbFidKeyRange(committer->ctx->fid, committer->minutes, committer->precision, &committer->ctx->minKey,
                  &committer->ctx->maxKey);
H
Hongze Cheng 已提交
346 347
  code = tfsAllocDisk(committer->tsdb->pVnode->pTfs, committer->ctx->expLevel, &committer->ctx->did);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
348
  tfsMkdirRecurAt(committer->tsdb->pVnode->pTfs, committer->tsdb->path, committer->ctx->did);
H
Hongze Cheng 已提交
349 350
  STFileSet fset = {.fid = committer->ctx->fid};
  committer->ctx->fset = &fset;
H
Hongze Cheng 已提交
351 352
  STFileSet **fsetPtr = TARRAY2_SEARCH(committer->fsetArr, &committer->ctx->fset, tsdbTFileSetCmprFn, TD_EQ);
  committer->ctx->fset = (fsetPtr == NULL) ? NULL : *fsetPtr;
H
Hongze Cheng 已提交
353 354
  committer->ctx->tbid->suid = 0;
  committer->ctx->tbid->uid = 0;
H
Hongze Cheng 已提交
355

356 357
  ASSERT(TARRAY2_SIZE(committer->dataIterArray) == 0);
  ASSERT(committer->dataIterMerger == NULL);
H
Hongze Cheng 已提交
358
  ASSERT(committer->writer == NULL);
H
Hongze Cheng 已提交
359

360 361 362 363 364 365
  code = tsdbCommitOpenReader(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbCommitOpenIter(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
366 367
  code = tsdbCommitOpenWriter(committer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
368

H
Hongze Cheng 已提交
369 370 371
  // reset nextKey
  committer->ctx->nextKey = TSKEY_MAX;

H
Hongze Cheng 已提交
372 373
_exit:
  if (code) {
H
Hongze Cheng 已提交
374
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
375
  } else {
H
Hongze Cheng 已提交
376 377
    tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", TD_VID(tsdb->pVnode),
              __func__, committer->ctx->fid, committer->ctx->minKey, committer->ctx->maxKey, committer->ctx->expLevel);
H
Hongze Cheng 已提交
378
  }
H
Hongze Cheng 已提交
379
  return 0;
H
Hongze Cheng 已提交
380 381
}

H
Hongze Cheng 已提交
382
static int32_t tsdbCommitFileSetEnd(SCommitter2 *committer) {
H
Hongze Cheng 已提交
383
  int32_t code = 0;
H
Hongze Cheng 已提交
384
  int32_t lino = 0;
H
Hongze Cheng 已提交
385

386 387
  code = tsdbCommitCloseWriter(committer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
388

389
  code = tsdbCommitCloseIter(committer);
H
Hongze Cheng 已提交
390
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
391

392 393
  code = tsdbCommitCloseReader(committer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
394

H
Hongze Cheng 已提交
395 396
_exit:
  if (code) {
H
Hongze Cheng 已提交
397
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
398
  } else {
H
Hongze Cheng 已提交
399
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid);
H
Hongze Cheng 已提交
400 401 402 403
  }
  return code;
}

H
Hongze Cheng 已提交
404
static int32_t tsdbCommitFileSet(SCommitter2 *committer) {
H
Hongze Cheng 已提交
405 406 407
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
408
  // fset commit start
H
Hongze Cheng 已提交
409
  code = tsdbCommitFileSetBegin(committer);
H
Hongze Cheng 已提交
410
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
411

H
Hongze Cheng 已提交
412
  // commit fset
H
Hongze Cheng 已提交
413
  code = tsdbCommitTSData(committer);
H
Hongze Cheng 已提交
414 415
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
416
  code = tsdbCommitTombData(committer);
H
Hongze Cheng 已提交
417 418 419
  TSDB_CHECK_CODE(code, lino, _exit);

  // fset commit end
H
Hongze Cheng 已提交
420
  code = tsdbCommitFileSetEnd(committer);
H
Hongze Cheng 已提交
421
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
422 423 424

_exit:
  if (code) {
H
Hongze Cheng 已提交
425
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
426
  } else {
H
Hongze Cheng 已提交
427
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid);
H
Hongze Cheng 已提交
428 429 430 431
  }
  return code;
}

H
Hongze Cheng 已提交
432
static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *committer) {
H
Hongze Cheng 已提交
433
  int32_t code = 0;
H
Hongze Cheng 已提交
434
  int32_t lino = 0;
H
Hongze Cheng 已提交
435

H
Hongze Cheng 已提交
436
  memset(committer, 0, sizeof(committer[0]));
H
Hongze Cheng 已提交
437

H
Hongze Cheng 已提交
438
  committer->tsdb = tsdb;
H
Hongze Cheng 已提交
439 440
  code = tsdbFSCreateCopySnapshot(tsdb->pFS, &committer->fsetArr);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
441 442 443 444 445 446
  committer->minutes = tsdb->keepCfg.days;
  committer->precision = tsdb->keepCfg.precision;
  committer->minRow = info->info.config.tsdbCfg.minRows;
  committer->maxRow = info->info.config.tsdbCfg.maxRows;
  committer->cmprAlg = info->info.config.tsdbCfg.compression;
  committer->sttTrigger = info->info.config.sttTrigger;
H
Hongze Cheng 已提交
447 448 449
  committer->szPage = info->info.config.tsdbPageSize;
  committer->compactVersion = INT64_MAX;
  committer->ctx->cid = tsdbFSAllocEid(tsdb->pFS);
H
Hongze Cheng 已提交
450
  committer->ctx->now = taosGetTimestampSec();
H
Hongze Cheng 已提交
451

H
Hongze Cheng 已提交
452
  committer->ctx->nextKey = tsdb->imem->minKey;
H
Hongze Cheng 已提交
453 454 455
  if (tsdb->imem->nDel > 0) {
    SRBTreeIter iter[1] = {tRBTreeIterCreate(tsdb->imem->tbDataTree, 1)};

H
Hongze Cheng 已提交
456 457 458 459 460 461 462 463 464 465 466
    for (SRBTreeNode *node = tRBTreeIterNext(iter); node; node = tRBTreeIterNext(iter)) {
      STbData *tbData = TCONTAINER_OF(node, STbData, rbtn);

      for (SDelData *delData = tbData->pHead; delData; delData = delData->pNext) {
        if (delData->sKey < committer->ctx->nextKey) {
          committer->ctx->nextKey = delData->sKey;
        }
      }
    }
  }

H
Hongze Cheng 已提交
467 468
_exit:
  if (code) {
H
Hongze Cheng 已提交
469
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
470
  } else {
H
Hongze Cheng 已提交
471
    tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
H
Hongze Cheng 已提交
472 473 474 475
  }
  return code;
}

H
Hongze Cheng 已提交
476
static int32_t tsdbCloseCommitter(SCommitter2 *committer, int32_t eno) {
H
Hongze Cheng 已提交
477
  int32_t code = 0;
H
Hongze Cheng 已提交
478
  int32_t lino = 0;
H
Hongze Cheng 已提交
479

H
Hongze Cheng 已提交
480
  if (eno == 0) {
H
Hongze Cheng 已提交
481
    code = tsdbFSEditBegin(committer->tsdb->pFS, committer->fopArray, TSDB_FEDIT_COMMIT);
H
Hongze Cheng 已提交
482
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
483
  } else {
H
Hongze Cheng 已提交
484 485
    // TODO
    ASSERT(0);
H
Hongze Cheng 已提交
486
  }
H
Hongze Cheng 已提交
487

H
Hongze Cheng 已提交
488
  ASSERT(committer->writer == NULL);
489 490 491 492
  ASSERT(committer->dataIterMerger == NULL);
  ASSERT(committer->tombIterMerger == NULL);
  TARRAY2_DESTROY(committer->dataIterArray, NULL);
  TARRAY2_DESTROY(committer->tombIterArray, NULL);
H
Hongze Cheng 已提交
493
  TARRAY2_DESTROY(committer->fopArray, NULL);
H
Hongze Cheng 已提交
494
  tsdbFSDestroyCopySnapshot(&committer->fsetArr);
H
Hongze Cheng 已提交
495

H
Hongze Cheng 已提交
496
_exit:
H
Hongze Cheng 已提交
497
  if (code) {
H
Hongze Cheng 已提交
498 499
    tsdbError("vgId:%d %s failed at line %d since %s, eid:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, lino,
              tstrerror(code), committer->ctx->cid);
H
Hongze Cheng 已提交
500
  } else {
H
Hongze Cheng 已提交
501
    tsdbDebug("vgId:%d %s done, eid:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->cid);
H
Hongze Cheng 已提交
502
  }
H
Hongze Cheng 已提交
503 504 505
  return code;
}

H
Hongze Cheng 已提交
506 507 508 509 510 511
int32_t tsdbPreCommit(STsdb *tsdb) {
  taosThreadRwlockWrlock(&tsdb->rwLock);
  ASSERT(tsdb->imem == NULL);
  tsdb->imem = tsdb->mem;
  tsdb->mem = NULL;
  taosThreadRwlockUnlock(&tsdb->rwLock);
H
Hongze Cheng 已提交
512 513 514
  return 0;
}

H
Hongze Cheng 已提交
515 516
int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
  if (!tsdb) return 0;
H
Hongze Cheng 已提交
517

H
Hongze Cheng 已提交
518 519 520
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
521 522 523
  SMemTable *imem = tsdb->imem;
  int64_t    nRow = imem->nRow;
  int64_t    nDel = imem->nDel;
H
Hongze Cheng 已提交
524

H
Hongze Cheng 已提交
525
  if (nRow == 0 && nDel == 0) {
H
Hongze Cheng 已提交
526 527 528
    taosThreadRwlockWrlock(&tsdb->rwLock);
    tsdb->imem = NULL;
    taosThreadRwlockUnlock(&tsdb->rwLock);
H
Hongze Cheng 已提交
529
    tsdbUnrefMemTable(imem, NULL, true);
H
Hongze Cheng 已提交
530
  } else {
H
Hongze Cheng 已提交
531
    SCommitter2 committer[1];
H
Hongze Cheng 已提交
532

H
Hongze Cheng 已提交
533
    code = tsdbOpenCommitter(tsdb, info, committer);
H
Hongze Cheng 已提交
534
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
535

H
Hongze Cheng 已提交
536 537
    while (committer->ctx->nextKey != TSKEY_MAX) {
      code = tsdbCommitFileSet(committer);
H
Hongze Cheng 已提交
538
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
539
    }
H
Hongze Cheng 已提交
540

H
Hongze Cheng 已提交
541
    code = tsdbCloseCommitter(committer, code);
H
Hongze Cheng 已提交
542 543
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
544 545 546

_exit:
  if (code) {
H
Hongze Cheng 已提交
547
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
548
  } else {
H
Hongze Cheng 已提交
549
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(tsdb->pVnode), __func__, nRow, nDel);
H
Hongze Cheng 已提交
550 551 552 553
  }
  return code;
}

H
Hongze Cheng 已提交
554
int32_t tsdbCommitCommit(STsdb *tsdb) {
H
Hongze Cheng 已提交
555 556
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
557

H
Hongze Cheng 已提交
558
  if (tsdb->imem == NULL) goto _exit;
H
Hongze Cheng 已提交
559

H
Hongze Cheng 已提交
560 561 562
  SMemTable *pMemTable = tsdb->imem;
  taosThreadRwlockWrlock(&tsdb->rwLock);
  code = tsdbFSEditCommit(tsdb->pFS);
H
Hongze Cheng 已提交
563
  if (code) {
H
Hongze Cheng 已提交
564
    taosThreadRwlockUnlock(&tsdb->rwLock);
H
Hongze Cheng 已提交
565 566
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
567 568
  tsdb->imem = NULL;
  taosThreadRwlockUnlock(&tsdb->rwLock);
H
Hongze Cheng 已提交
569
  tsdbUnrefMemTable(pMemTable, NULL, true);
H
Hongze Cheng 已提交
570 571 572

_exit:
  if (code) {
H
Hongze Cheng 已提交
573
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
574
  } else {
H
Hongze Cheng 已提交
575
    tsdbInfo("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
H
Hongze Cheng 已提交
576 577 578 579 580 581 582
  }
  return code;
}

int32_t tsdbCommitAbort(STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
583 584

  if (pTsdb->imem == NULL) goto _exit;
H
Hongze Cheng 已提交
585

H
Hongze Cheng 已提交
586
  code = tsdbFSEditAbort(pTsdb->pFS);
H
Hongze Cheng 已提交
587 588 589 590
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
591
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
592
  } else {
H
Hongze Cheng 已提交
593
    tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
H
Hongze Cheng 已提交
594 595 596
  }
  return code;
}