/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdbCommit2.h"

// extern dependencies
typedef struct {
H
Hongze Cheng 已提交
20 21
  STsdb         *tsdb;
  TFileSetArray *fsetArr;
H
Hongze Cheng 已提交
22 23
  TFileOpArray   fopArray[1];

H
Hongze Cheng 已提交
24 25
  // SSkmInfo skmTb[1];
  // SSkmInfo skmRow[1];
H
Hongze Cheng 已提交
26

H
Hongze Cheng 已提交
27 28 29 30 31
  int32_t minutes;
  int8_t  precision;
  int32_t minRow;
  int32_t maxRow;
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
32 33
  int32_t sttTrigger;
  int32_t szPage;
H
Hongze Cheng 已提交
34 35 36
  int64_t compactVersion;

  struct {
H
Hongze Cheng 已提交
37
    int64_t    cid;
H
Hongze Cheng 已提交
38 39
    int64_t    now;
    TSKEY      nextKey;
40
    TSKEY      maxDelKey;
H
Hongze Cheng 已提交
41 42
    int32_t    fid;
    int32_t    expLevel;
H
Hongze Cheng 已提交
43
    SDiskID    did;
H
Hongze Cheng 已提交
44 45 46 47
    TSKEY      minKey;
    TSKEY      maxKey;
    STFileSet *fset;
    TABLEID    tbid[1];
H
Hongze Cheng 已提交
48
    bool       hasTSData;
H
Hongze Cheng 已提交
49 50
  } ctx[1];

51
  // reader
H
Hongze Cheng 已提交
52
  TSttFileReaderArray sttReaderArray[1];
53 54 55 56 57 58

  // iter
  TTsdbIterArray dataIterArray[1];
  SIterMerger   *dataIterMerger;
  TTsdbIterArray tombIterArray[1];
  SIterMerger   *tombIterMerger;
H
Hongze Cheng 已提交
59

H
Hongze Cheng 已提交
60
  // writer
H
Hongze Cheng 已提交
61
  SFSetWriter *writer;
H
Hongze Cheng 已提交
62
} SCommitter2;
H
Hongze Cheng 已提交
63

64
/*
 * Open the file-set writer for the file set described by committer->ctx.
 *
 * The writer is limited to STT output unless sttTrigger is 1; in that case
 * every file already present in the current file set is passed to the writer
 * through config.files[].
 *
 * Returns 0 on success, otherwise the code returned by tsdbFSetWriterOpen().
 */
static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
  int32_t    code = 0;
  int32_t    lino = 0;
  STFileSet *curFset = committer->ctx->fset;
  bool       sttOnly = (committer->sttTrigger != 1);

  SFSetWriterConfig config = {
      .tsdb = committer->tsdb,
      .toSttOnly = sttOnly,
      .compactVersion = committer->compactVersion,
      .minRow = committer->minRow,
      .maxRow = committer->maxRow,
      .szPage = committer->szPage,
      .cmprAlg = committer->cmprAlg,
      .fid = committer->ctx->fid,
      .cid = committer->ctx->cid,
      .did = committer->ctx->did,
      .level = 0,
  };

  if (!sttOnly && curFset != NULL) {
    // describe the files that already exist in this file set
    for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ftype++) {
      if (curFset->farr[ftype] == NULL) {
        continue;
      }
      config.files[ftype].exist = true;
      config.files[ftype].file = curFset->farr[ftype]->f[0];
    }
  }

  code = tsdbFSetWriterOpen(&config, &committer->writer);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Close the file-set writer (second argument 0) and let it record its file
 * operations in committer->fopArray. Returns the writer's close code.
 */
static int32_t tsdbCommitCloseWriter(SCommitter2 *committer) {
  int32_t code = tsdbFSetWriterClose(&committer->writer, 0, committer->fopArray);
  return code;
}

/*
 * Write every row of the data merger that belongs to the current file set.
 *
 * - Rows of a table for which metaGetInfo() fails are skipped table-wise.
 * - When a row's timestamp is beyond ctx->maxKey, ctx->nextKey is lowered to
 *   that timestamp (if smaller) and the rest of the table is skipped.
 * - ctx->hasTSData is set as soon as one row has been written.
 *
 * Returns 0 on success or the first error code from the merger / writer.
 */
static int32_t tsdbCommitTSData(SCommitter2 *committer) {
  int32_t      code = 0;
  int32_t      lino = 0;
  int64_t      numOfRow = 0;
  SMetaInfo    info;
  TABLEID     *tbid = committer->ctx->tbid;
  SIterMerger *merger = committer->dataIterMerger;
  SRowInfo    *row = NULL;

  committer->ctx->hasTSData = false;
  tbid->suid = 0;
  tbid->uid = 0;

  while ((row = tsdbIterMergerGetData(merger)) != NULL) {
    // first row of a new table: remember it and check that the table still exists
    if (row->uid != tbid->uid) {
      tbid->suid = row->suid;
      tbid->uid = row->uid;

      if (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(merger, tbid);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }

    int64_t ts = TSDBROW_TS(&row->row);
    if (ts > committer->ctx->maxKey) {
      // row belongs to a later file set
      if (ts < committer->ctx->nextKey) {
        committer->ctx->nextKey = ts;
      }
      code = tsdbIterMergerSkipTableData(merger, tbid);
      TSDB_CHECK_CODE(code, lino, _exit);
      continue;
    }

    committer->ctx->hasTSData = true;
    numOfRow++;

    code = tsdbFSetWriteRow(committer->writer, row);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(merger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d fid:%d commit %" PRId64 " rows", TD_VID(committer->tsdb->pVnode), committer->ctx->fid, numOfRow);
  }
  return code;
}

/*
 * Write the tomb (delete) records that overlap the current file set.
 *
 * If the file set does not exist yet and no time-series row was written for
 * it, nothing is written; ctx->nextKey is set to maxKey + 1 while maxKey is
 * still below ctx->maxDelKey, otherwise to TSKEY_MAX.
 *
 * Otherwise each record from the tomb merger is handled as follows:
 * - records of a table for which metaGetInfo() fails are skipped table-wise;
 * - records entirely outside [minKey, maxKey] are not written;
 * - records reaching past maxKey lower ctx->nextKey to maxKey + 1 and are
 *   clipped before being written.
 *
 * Returns 0 on success or the first error code from the merger / writer.
 */
static int32_t tsdbCommitTombData(SCommitter2 *committer) {
  int32_t   code = 0;
  int32_t   lino = 0;
  int64_t   numRecord = 0;
  SMetaInfo info;

  if (committer->ctx->fset == NULL && !committer->ctx->hasTSData) {
    if (committer->ctx->maxKey < committer->ctx->maxDelKey) {
      committer->ctx->nextKey = committer->ctx->maxKey + 1;
    } else {
      committer->ctx->nextKey = TSKEY_MAX;
    }
    return 0;
  }

  committer->ctx->tbid->suid = 0;
  committer->ctx->tbid->uid = 0;
  for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) {
    if (record->uid != committer->ctx->tbid->uid) {
      committer->ctx->tbid->suid = record->suid;
      committer->ctx->tbid->uid = record->uid;

      if (metaGetInfo(committer->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) {
        // Skip on the merger this loop iterates. Skipping on the data merger
        // left the tomb merger in place, so the same record came back on the
        // next iteration and was written for a table that no longer exists.
        code = tsdbIterMergerSkipTableData(committer->tombIterMerger, committer->ctx->tbid);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }

    if (record->ekey < committer->ctx->minKey) {
      goto _next;
    } else if (record->skey > committer->ctx->maxKey) {
      // NOTE(review): skey > maxKey here, so this assignment leaves maxKey
      // unchanged; it looks like nextKey may have been intended - confirm.
      committer->ctx->maxKey = TMIN(record->skey, committer->ctx->maxKey);
      goto _next;
    }

    TSKEY maxKey = committer->ctx->maxKey;
    if (record->ekey > committer->ctx->maxKey) {
      maxKey = committer->ctx->maxKey + 1;
    }

    if (record->ekey > committer->ctx->maxKey && committer->ctx->nextKey > maxKey) {
      committer->ctx->nextKey = maxKey;
    }

    // clip the record before writing it
    record->skey = TMAX(record->skey, committer->ctx->minKey);
    record->ekey = TMIN(record->ekey, maxKey);

    numRecord++;
    code = tsdbFSetWriteTombRecord(committer->writer, record);
    TSDB_CHECK_CODE(code, lino, _exit);

  _next:
    code = tsdbIterMergerNext(committer->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d fid:%d commit %" PRId64 " tomb records", TD_VID(committer->tsdb->pVnode), committer->ctx->fid,
              numRecord);
  }
  return code;
}

/*
 * Open a reader for every STT file of the current file set and schedule each
 * of those files for removal (TSDB_FOP_REMOVE) in committer->fopArray.
 *
 * Nothing is opened when there is no current file set, when sttTrigger is
 * greater than 1, or when the file set has no STT level.
 *
 * Returns 0 on success or the first failing open / append code.
 */
static int32_t tsdbCommitOpenReader(SCommitter2 *committer) {
  int32_t    code = 0;
  int32_t    lino = 0;
  STFileSet *curFset = committer->ctx->fset;

  ASSERT(TARRAY2_SIZE(committer->sttReaderArray) == 0);

  if (curFset == NULL) {
    return 0;
  }
  if (committer->sttTrigger > 1) {
    return 0;
  }
  if (TARRAY2_SIZE(curFset->lvlArr) == 0) {
    return 0;
  }

  // exactly one level, and it must be level 0
  ASSERT(TARRAY2_SIZE(curFset->lvlArr) == 1);
  SSttLvl *lvl = TARRAY2_FIRST(curFset->lvlArr);
  ASSERT(lvl->level == 0);

  STFileObj *fobj = NULL;
  TARRAY2_FOREACH(lvl->fobjArr, fobj) {
    SSttFileReader      *reader;
    SSttFileReaderConfig readerCfg = {
        .tsdb = committer->tsdb,
        .szPage = committer->szPage,
        .file = fobj->f[0],
    };

    code = tsdbSttFileReaderOpen(fobj->fname, &readerCfg, &reader);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(committer->sttReaderArray, reader);
    TSDB_CHECK_CODE(code, lino, _exit);

    // the file being read is scheduled for removal
    STFileOp removeOp = {
        .optype = TSDB_FOP_REMOVE,
        .fid = fobj->f->fid,
        .of = fobj->f[0],
    };

    code = TARRAY2_APPEND(committer->fopArray, removeOp);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Close every STT reader held by the committer and empty the reader array.
 * Always returns 0.
 */
static int32_t tsdbCommitCloseReader(SCommitter2 *committer) {
  int32_t code = 0;

  TARRAY2_CLEAR(committer->sttReaderArray, tsdbSttFileReaderClose);

  return code;
}

/*
 * Create the data and tomb iterators for the current file set and open one
 * merger over each group.
 *
 * Iterators are opened, in order, on the immutable memtable (data, then tomb)
 * and then on every opened STT reader (data, then tomb). The same config
 * object is reused for all of them, so fields set for an earlier iterator
 * stay set for later ones.
 *
 * Returns 0 on success or the first failing open / append code.
 */
static int32_t tsdbCommitOpenIter(SCommitter2 *committer) {
  int32_t code = 0;
  int32_t lino = 0;

  ASSERT(TARRAY2_SIZE(committer->dataIterArray) == 0);
  ASSERT(committer->dataIterMerger == NULL);
  ASSERT(TARRAY2_SIZE(committer->tombIterArray) == 0);
  ASSERT(committer->tombIterMerger == NULL);

  STsdbIter      *newIter;
  STsdbIterConfig iterCfg = {0};

  // memtable: data
  iterCfg.type = TSDB_ITER_TYPE_MEMT;
  iterCfg.memt = committer->tsdb->imem;
  iterCfg.from->ts = committer->ctx->minKey;
  iterCfg.from->version = VERSION_MIN;

  code = tsdbIterOpen(&iterCfg, &newIter);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = TARRAY2_APPEND(committer->dataIterArray, newIter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // memtable: tomb
  iterCfg.type = TSDB_ITER_TYPE_MEMT_TOMB;
  iterCfg.memt = committer->tsdb->imem;

  code = tsdbIterOpen(&iterCfg, &newIter);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = TARRAY2_APPEND(committer->tombIterArray, newIter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // STT files: one data iterator and one tomb iterator per reader
  SSttFileReader *reader;
  TARRAY2_FOREACH(committer->sttReaderArray, reader) {
    iterCfg.type = TSDB_ITER_TYPE_STT;
    iterCfg.sttReader = reader;

    code = tsdbIterOpen(&iterCfg, &newIter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(committer->dataIterArray, newIter);
    TSDB_CHECK_CODE(code, lino, _exit);

    iterCfg.type = TSDB_ITER_TYPE_STT_TOMB;
    iterCfg.sttReader = reader;

    code = tsdbIterOpen(&iterCfg, &newIter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(committer->tombIterArray, newIter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // mergers: the data merger is opened with `false`, the tomb merger with `true`
  code = tsdbIterMergerOpen(committer->dataIterArray, &committer->dataIterMerger, false);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbIterMergerOpen(committer->tombIterArray, &committer->tombIterMerger, true);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
  }
  return code;
}

/*
 * Close both mergers first, then close and drop every iterator of the tomb
 * and data arrays. Always returns 0.
 */
static int32_t tsdbCommitCloseIter(SCommitter2 *committer) {
  int32_t code = 0;

  // mergers
  tsdbIterMergerClose(&committer->tombIterMerger);
  tsdbIterMergerClose(&committer->dataIterMerger);

  // iterators
  TARRAY2_CLEAR(committer->tombIterArray, tsdbIterClose);
  TARRAY2_CLEAR(committer->dataIterArray, tsdbIterClose);

  return code;
}

/*
 * Prepare the committer for the file set that contains ctx->nextKey.
 *
 * Derives fid, expLevel and the [minKey, maxKey] range from nextKey, allocates
 * a disk for the file set, looks the file set up in the snapshot (ctx->fset is
 * NULL when it does not exist yet), then opens the STT readers, the iterators
 * and the writer. On success ctx->nextKey is reset to TSKEY_MAX so the commit
 * steps can lower it to the next key that still needs committing.
 *
 * Returns 0 on success or the first error code. The error is propagated to the
 * caller; previously this function always returned 0, so a failed setup was
 * followed by use of an unopened writer / iterators.
 */
static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *tsdb = committer->tsdb;

  committer->ctx->fid = tsdbKeyFid(committer->ctx->nextKey, committer->minutes, committer->precision);
  committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now);
  tsdbFidKeyRange(committer->ctx->fid, committer->minutes, committer->precision, &committer->ctx->minKey,
                  &committer->ctx->maxKey);
  code = tfsAllocDisk(committer->tsdb->pVnode->pTfs, committer->ctx->expLevel, &committer->ctx->did);
  TSDB_CHECK_CODE(code, lino, _exit);
  tfsMkdirRecurAt(committer->tsdb->pVnode->pTfs, committer->tsdb->path, committer->ctx->did);

  // ctx->fset temporarily points at a stack key for the search; it is replaced
  // by the search result (or NULL) before anything else reads it.
  STFileSet fset = {.fid = committer->ctx->fid};
  committer->ctx->fset = &fset;
  STFileSet **fsetPtr = TARRAY2_SEARCH(committer->fsetArr, &committer->ctx->fset, tsdbTFileSetCmprFn, TD_EQ);
  committer->ctx->fset = (fsetPtr == NULL) ? NULL : *fsetPtr;
  committer->ctx->tbid->suid = 0;
  committer->ctx->tbid->uid = 0;

  ASSERT(TARRAY2_SIZE(committer->dataIterArray) == 0);
  ASSERT(committer->dataIterMerger == NULL);
  ASSERT(committer->writer == NULL);

  code = tsdbCommitOpenReader(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbCommitOpenIter(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbCommitOpenWriter(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // reset nextKey
  committer->ctx->nextKey = TSKEY_MAX;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", TD_VID(tsdb->pVnode),
              __func__, committer->ctx->fid, committer->ctx->minKey, committer->ctx->maxKey, committer->ctx->expLevel);
  }
  return code;
}

/*
 * Finish the current file set: close the writer, then the iterators, then the
 * STT readers. Stops at the first step that fails and returns its code.
 */
static int32_t tsdbCommitFileSetEnd(SCommitter2 *committer) {
  int32_t code = 0;
  int32_t lino = 0;

  // writer
  code = tsdbCommitCloseWriter(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // iterators and mergers
  code = tsdbCommitCloseIter(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // STT readers
  code = tsdbCommitCloseReader(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
    return code;
  }

  tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid);
  return code;
}

/*
 * Commit one file set: begin, write time-series rows, write tomb records, end.
 * Stops at the first failing step and returns its code.
 */
static int32_t tsdbCommitFileSet(SCommitter2 *committer) {
  int32_t code = 0;
  int32_t lino = 0;

  // set up readers, iterators and writer for the file set
  code = tsdbCommitFileSetBegin(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // rows first, tomb records second
  code = tsdbCommitTSData(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbCommitTombData(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // tear everything down again
  code = tsdbCommitFileSetEnd(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
    return code;
  }

  tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid);
  return code;
}

/*
 * Initialise `committer` for a commit of tsdb->imem.
 *
 * Zeroes the structure, takes a copy snapshot of the file sets, copies the
 * configuration from `tsdb` and `info`, allocates the edit id and computes the
 * first key to commit (ctx->nextKey):
 *   - start from the memtable's minKey;
 *   - if the memtable holds deletes, lower it to the smallest delete sKey;
 *   - finally raise it to min(memtable minKey, minKey of the first file set)
 *     if it ended up below that bound.
 * ctx->maxDelKey becomes the upper key bound of the last file set in the
 * snapshot, or TSKEY_MIN when the snapshot is empty.
 *
 * Returns 0 on success or the snapshot creation error.
 */
static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *committer) {
  int32_t code = 0;
  int32_t lino = 0;

  memset(committer, 0, sizeof(*committer));
  committer->tsdb = tsdb;

  code = tsdbFSCreateCopySnapshot(tsdb->pFS, &committer->fsetArr);
  TSDB_CHECK_CODE(code, lino, _exit);

  // configuration
  committer->minutes = tsdb->keepCfg.days;
  committer->precision = tsdb->keepCfg.precision;
  committer->minRow = info->info.config.tsdbCfg.minRows;
  committer->maxRow = info->info.config.tsdbCfg.maxRows;
  committer->cmprAlg = info->info.config.tsdbCfg.compression;
  committer->sttTrigger = info->info.config.sttTrigger;
  committer->szPage = info->info.config.tsdbPageSize;
  committer->compactVersion = INT64_MAX;
  committer->ctx->cid = tsdbFSAllocEid(tsdb->pFS);
  committer->ctx->now = taosGetTimestampSec();

  // smallest key touched by the memtable: rows first, then delete ranges
  TSKEY startKey = tsdb->imem->minKey;
  if (tsdb->imem->nDel > 0) {
    SRBTreeIter treeIter[1] = {tRBTreeIterCreate(tsdb->imem->tbDataTree, 1)};

    for (SRBTreeNode *node = tRBTreeIterNext(treeIter); node; node = tRBTreeIterNext(treeIter)) {
      STbData *tbData = TCONTAINER_OF(node, STbData, rbtn);

      for (SDelData *del = tbData->pHead; del; del = del->pNext) {
        if (del->sKey < startKey) {
          startKey = del->sKey;
        }
      }
    }
  }

  // key bounds of the existing file sets
  committer->ctx->maxDelKey = TSKEY_MIN;
  TSKEY fsMinKey = TSKEY_MAX;
  TSKEY fsMaxKey = TSKEY_MIN;  // written by tsdbFidKeyRange() but not read afterwards
  if (TARRAY2_SIZE(committer->fsetArr) > 0) {
    STFileSet *edge = TARRAY2_LAST(committer->fsetArr);
    tsdbFidKeyRange(edge->fid, committer->minutes, committer->precision, &fsMinKey, &committer->ctx->maxDelKey);

    edge = TARRAY2_FIRST(committer->fsetArr);
    tsdbFidKeyRange(edge->fid, committer->minutes, committer->precision, &fsMinKey, &fsMaxKey);
  }

  // never start below both the memtable's rows and the first file set
  TSKEY lowerBound = TMIN(tsdb->imem->minKey, fsMinKey);
  if (startKey < lowerBound) {
    startKey = lowerBound;
  }
  committer->ctx->nextKey = startKey;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
  }
  return code;
}

/*
 * Finish a commit run and release everything the committer owns.
 *
 * committer: committer opened with tsdbOpenCommitter(); writer and mergers
 *            must already be closed.
 * eno:       0 when all file sets were committed successfully. A non-zero
 *            value is not supported yet (asserts).
 *
 * With eno == 0 the collected file operations are submitted through
 * tsdbFSEditBegin(). The arrays and the file-set snapshot are released whether
 * or not that call succeeds; previously a failure skipped the cleanup and
 * leaked them, and sttReaderArray was destroyed twice.
 *
 * Returns 0 on success or the code from tsdbFSEditBegin().
 */
static int32_t tsdbCloseCommitter(SCommitter2 *committer, int32_t eno) {
  int32_t code = 0;
  int32_t lino = 0;

  if (eno == 0) {
    code = tsdbFSEditBegin(committer->tsdb->pFS, committer->fopArray, TSDB_FEDIT_COMMIT);
    if (code) {
      lino = __LINE__;  // remember where it failed, but still fall through to the cleanup
    }
  } else {
    // TODO
    ASSERT(0);
  }

  // release resources on both the success and the failure path
  ASSERT(committer->writer == NULL);
  ASSERT(committer->dataIterMerger == NULL);
  ASSERT(committer->tombIterMerger == NULL);
  TARRAY2_DESTROY(committer->dataIterArray, NULL);
  TARRAY2_DESTROY(committer->tombIterArray, NULL);
  TARRAY2_DESTROY(committer->sttReaderArray, NULL);
  TARRAY2_DESTROY(committer->fopArray, NULL);
  tsdbFSDestroyCopySnapshot(&committer->fsetArr);

  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s, eid:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, lino,
              tstrerror(code), committer->ctx->cid);
  } else {
    tsdbDebug("vgId:%d %s done, eid:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->cid);
  }
  return code;
}

/*
 * Under the write lock, move the active memtable (tsdb->mem) into the
 * immutable slot (tsdb->imem) and leave tsdb->mem NULL. The immutable slot
 * must be empty on entry. Always returns 0.
 */
int32_t tsdbPreCommit(STsdb *tsdb) {
  taosThreadRwlockWrlock(&tsdb->rwLock);

  ASSERT(tsdb->imem == NULL);
  tsdb->imem = tsdb->mem;
  tsdb->mem = NULL;

  taosThreadRwlockUnlock(&tsdb->rwLock);

  return 0;
}

/*
 * Commit the immutable memtable of `tsdb` to files.
 *
 * A NULL `tsdb` is a no-op. If the memtable holds neither rows nor deletes it
 * is detached (under the write lock) and unreferenced. Otherwise a committer
 * is opened, file sets are committed one after another until ctx->nextKey
 * reaches TSKEY_MAX, and the committer is closed.
 *
 * NOTE(review): when opening the committer or committing a file set fails,
 * control goes straight to `_exit` and tsdbCloseCommitter() is not called -
 * confirm whether the committer's resources are released elsewhere.
 *
 * Returns 0 on success or the first error code.
 */
int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
  if (tsdb == NULL) {
    return 0;
  }

  int32_t    code = 0;
  int32_t    lino = 0;
  SMemTable *imem = tsdb->imem;
  int64_t    nRow = imem->nRow;
  int64_t    nDel = imem->nDel;

  if (nRow != 0 || nDel != 0) {
    SCommitter2 committer[1];

    code = tsdbOpenCommitter(tsdb, info, committer);
    TSDB_CHECK_CODE(code, lino, _exit);

    // each pass commits one file set and updates nextKey
    while (committer->ctx->nextKey != TSKEY_MAX) {
      code = tsdbCommitFileSet(committer);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbCloseCommitter(committer, code);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    // nothing to write: drop the empty memtable
    taosThreadRwlockWrlock(&tsdb->rwLock);
    tsdb->imem = NULL;
    taosThreadRwlockUnlock(&tsdb->rwLock);
    tsdbUnrefMemTable(imem, NULL, true);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
  } else {
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(tsdb->pVnode), __func__, nRow, nDel);
  }
  return code;
}

/*
 * Make a prepared commit effective.
 *
 * Does nothing when there is no immutable memtable. Otherwise, under the
 * write lock, the pending file-system edit is committed and, on success, the
 * immutable memtable is detached; it is unreferenced after the lock has been
 * released. On failure the memtable stays attached.
 *
 * Returns 0 on success or the code from tsdbFSEditCommit().
 */
int32_t tsdbCommitCommit(STsdb *tsdb) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SMemTable *pMemTable = tsdb->imem;

  if (pMemTable == NULL) {
    goto _exit;
  }

  taosThreadRwlockWrlock(&tsdb->rwLock);
  code = tsdbFSEditCommit(tsdb->pFS);
  if (code == 0) {
    tsdb->imem = NULL;
  }
  taosThreadRwlockUnlock(&tsdb->rwLock);
  TSDB_CHECK_CODE(code, lino, _exit);

  tsdbUnrefMemTable(pMemTable, NULL, true);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
  } else {
    tsdbInfo("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
  }
  return code;
}

/*
 * Abort a pending commit: when an immutable memtable is present, the pending
 * file-system edit is aborted through tsdbFSEditAbort(). The memtable itself
 * is left untouched by this function.
 *
 * Returns 0 on success or the code from tsdbFSEditAbort().
 */
int32_t tsdbCommitAbort(STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pTsdb->imem != NULL) {
    code = tsdbFSEditAbort(pTsdb->pFS);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}