/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdbCommit2.h"

// extern dependencies
typedef struct {
H
Hongze Cheng 已提交
20 21
  STsdb         *tsdb;
  TFileSetArray *fsetArr;
H
Hongze Cheng 已提交
22 23
  TFileOpArray   fopArray[1];

H
Hongze Cheng 已提交
24 25
  // SSkmInfo skmTb[1];
  // SSkmInfo skmRow[1];
H
Hongze Cheng 已提交
26

H
Hongze Cheng 已提交
27 28 29 30 31
  int32_t minutes;
  int8_t  precision;
  int32_t minRow;
  int32_t maxRow;
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
32 33
  int32_t sttTrigger;
  int32_t szPage;
H
Hongze Cheng 已提交
34 35 36
  int64_t compactVersion;

  struct {
H
Hongze Cheng 已提交
37
    int64_t    cid;
H
Hongze Cheng 已提交
38 39
    int64_t    now;
    TSKEY      nextKey;
40
    TSKEY      maxDelKey;
H
Hongze Cheng 已提交
41 42
    int32_t    fid;
    int32_t    expLevel;
H
Hongze Cheng 已提交
43
    SDiskID    did;
H
Hongze Cheng 已提交
44 45 46 47
    TSKEY      minKey;
    TSKEY      maxKey;
    STFileSet *fset;
    TABLEID    tbid[1];
H
Hongze Cheng 已提交
48
    bool       hasTSData;
H
Hongze Cheng 已提交
49 50
  } ctx[1];

51
  // reader
H
Hongze Cheng 已提交
52
  TSttFileReaderArray sttReaderArray[1];
53 54 55 56 57 58

  // iter
  TTsdbIterArray dataIterArray[1];
  SIterMerger   *dataIterMerger;
  TTsdbIterArray tombIterArray[1];
  SIterMerger   *tombIterMerger;
H
Hongze Cheng 已提交
59

H
Hongze Cheng 已提交
60
  // writer
H
Hongze Cheng 已提交
61
  SFSetWriter *writer;
H
Hongze Cheng 已提交
62
} SCommitter2;
H
Hongze Cheng 已提交
63

64
/*
 * Open the file-set writer for the file set described by committer->ctx.
 *
 * The writer is configured as STT-only unless sttTrigger == 1; in that case
 * the files already present in the current file set (if any) are handed to
 * the writer as well.
 *
 * Returns 0 on success, otherwise the error code of tsdbFSetWriterOpen().
 */
static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
  int32_t code = 0;
  int32_t lino = 0;

  bool       useExistingFiles = (committer->sttTrigger == 1);
  STFileSet *fset = committer->ctx->fset;

  SFSetWriterConfig config = {
      .tsdb = committer->tsdb,
      .toSttOnly = !useExistingFiles,
      .compactVersion = committer->compactVersion,
      .minRow = committer->minRow,
      .maxRow = committer->maxRow,
      .szPage = committer->szPage,
      .cmprAlg = committer->cmprAlg,
      .fid = committer->ctx->fid,
      .cid = committer->ctx->cid,
      .did = committer->ctx->did,
      .level = 0,
  };

  if (useExistingFiles && fset != NULL) {
    for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
      if (fset->farr[ftype] == NULL) {
        continue;
      }
      config.files[ftype].exist = true;
      config.files[ftype].file = fset->farr[ftype]->f[0];
    }
  }

  code = tsdbFSetWriterOpen(&config, &committer->writer);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
  }
  return code;
}

105
/*
 * Close the file-set writer, passing committer->fopArray to
 * tsdbFSetWriterClose(). Returns that call's result unchanged.
 */
static int32_t tsdbCommitCloseWriter(SCommitter2 *committer) {
  int32_t code = tsdbFSetWriterClose(&committer->writer, 0, committer->fopArray);
  return code;
}

/*
 * Write every row of the data merger that belongs to the current file set.
 *
 * The remaining rows of a table are skipped when
 *   - metaGetInfo() fails for the table, or
 *   - the current row's timestamp is beyond ctx->maxKey; in that case
 *     ctx->nextKey is lowered to that timestamp first.
 *
 * ctx->hasTSData is set as soon as one row has been handed to the writer.
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbCommitTSData(SCommitter2 *committer) {
  int32_t   code = 0;
  int32_t   lino = 0;
  int64_t   numOfRow = 0;
  SMetaInfo info;
  TABLEID  *tbid = committer->ctx->tbid;
  SRowInfo *row = NULL;

  committer->ctx->hasTSData = false;

  tbid->suid = 0;
  tbid->uid = 0;

  while ((row = tsdbIterMergerGetData(committer->dataIterMerger)) != NULL) {
    bool skipTable = false;

    // first row of a different table: remember it and look it up in meta
    if (row->uid != tbid->uid) {
      tbid->suid = row->suid;
      tbid->uid = row->uid;
      skipTable = (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0);
    }

    if (!skipTable) {
      int64_t ts = TSDBROW_TS(&row->row);
      if (ts > committer->ctx->maxKey) {
        // row is outside this file set: record where to continue
        committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts);
        skipTable = true;
      }
    }

    if (skipTable) {
      code = tsdbIterMergerSkipTableData(committer->dataIterMerger, tbid);
      TSDB_CHECK_CODE(code, lino, _exit);
      continue;
    }

    committer->ctx->hasTSData = true;
    numOfRow++;

    code = tsdbFSetWriteRow(committer->writer, row);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(committer->dataIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d fid:%d commit %" PRId64 " rows", TD_VID(committer->tsdb->pVnode), committer->ctx->fid, numOfRow);
  }
  return code;
}

/*
 * Write the tomb (delete) records that overlap the current file set.
 *
 * Called after tsdbCommitTSData() for the same file set. When there is no
 * existing file set for ctx->fid and no row was written, nothing is written
 * here either and only ctx->nextKey is advanced.
 *
 * For each record taken from the tomb merger:
 *   - records of a table for which metaGetInfo() fails are skipped as a whole;
 *   - records entirely outside [ctx->minKey, ctx->maxKey] are not written;
 *   - records reaching past ctx->maxKey lower ctx->nextKey to maxKey + 1;
 *   - the record is clamped and handed to the file-set writer.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbCommitTombData(SCommitter2 *committer) {
  int32_t   code = 0;
  int32_t   lino = 0;
  int64_t   numRecord = 0;
  SMetaInfo info;

  if (committer->ctx->fset == NULL && !committer->ctx->hasTSData) {
    // NOTE(review): the else branch overwrites a nextKey that tsdbCommitTSData()
    // may have lowered for rows beyond maxKey -- confirm this is intended.
    if (committer->ctx->maxKey < committer->ctx->maxDelKey) {
      committer->ctx->nextKey = committer->ctx->maxKey + 1;
    } else {
      committer->ctx->nextKey = TSKEY_MAX;
    }
    return 0;
  }

  committer->ctx->tbid->suid = 0;
  committer->ctx->tbid->uid = 0;
  for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) {
    if (record->uid != committer->ctx->tbid->uid) {
      committer->ctx->tbid->suid = record->suid;
      committer->ctx->tbid->uid = record->uid;

      if (metaGetInfo(committer->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) {
        // Skip on the TOMB merger: this loop iterates tombIterMerger, so that
        // is the merger that has to move past the table. Skipping on the data
        // merger left this record in place, and it was then re-fetched and
        // written because tbid already matched.
        code = tsdbIterMergerSkipTableData(committer->tombIterMerger, committer->ctx->tbid);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }

    if (record->ekey < committer->ctx->minKey) {
      goto _next;
    } else if (record->skey > committer->ctx->maxKey) {
      // NOTE(review): in this branch skey > maxKey, so TMIN() yields maxKey and
      // the assignment changes nothing; possibly nextKey was meant -- confirm.
      committer->ctx->maxKey = TMIN(record->skey, committer->ctx->maxKey);
      goto _next;
    }

    TSKEY maxKey = committer->ctx->maxKey;
    if (record->ekey > committer->ctx->maxKey) {
      maxKey = committer->ctx->maxKey + 1;
    }

    // the record continues into a later file set: make sure that one is visited
    if (record->ekey > committer->ctx->maxKey && committer->ctx->nextKey > maxKey) {
      committer->ctx->nextKey = maxKey;
    }

    record->skey = TMAX(record->skey, committer->ctx->minKey);
    record->ekey = TMIN(record->ekey, maxKey);

    numRecord++;
    code = tsdbFSetWriteTombRecord(committer->writer, record);
    TSDB_CHECK_CODE(code, lino, _exit);

  _next:
    code = tsdbIterMergerNext(committer->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d fid:%d commit %" PRId64 " tomb records", TD_VID(committer->tsdb->pVnode), committer->ctx->fid,
              numRecord);
  }
  return code;
}

225
/*
 * Open a reader for every STT file of the current file set and register a
 * TSDB_FOP_REMOVE operation for each of those files.
 *
 * Nothing is opened when there is no current file set, when sttTrigger > 1,
 * or when the file set has no STT levels.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbCommitOpenReader(SCommitter2 *committer) {
  int32_t code = 0;
  int32_t lino = 0;

  ASSERT(TARRAY2_SIZE(committer->sttReaderArray) == 0);

  if (committer->ctx->fset == NULL) {
    return 0;
  }
  if (committer->sttTrigger > 1) {
    return 0;
  }
  if (TARRAY2_SIZE(committer->ctx->fset->lvlArr) == 0) {
    return 0;
  }

  SSttLvl *lvl;
  TARRAY2_FOREACH(committer->ctx->fset->lvlArr, lvl) {
    STFileObj *fobj = NULL;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      SSttFileReader      *sttReader;
      SSttFileReaderConfig config = {
          .tsdb = committer->tsdb,
          .szPage = committer->szPage,
          .file = fobj->f[0],
      };

      // open the reader and keep it for the iterators
      code = tsdbSttFileReaderOpen(fobj->fname, &config, &sttReader);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = TARRAY2_APPEND(committer->sttReaderArray, sttReader);
      TSDB_CHECK_CODE(code, lino, _exit);

      // schedule removal of the file that is being read
      STFileOp op = {
          .optype = TSDB_FOP_REMOVE,
          .fid = fobj->f->fid,
          .of = fobj->f[0],
      };

      code = TARRAY2_APPEND(committer->fopArray, op);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
  }
  return code;
}

H
Hongze Cheng 已提交
274 275 276 277
/*
 * Empty committer->sttReaderArray, invoking tsdbSttFileReaderClose on its
 * entries. Always returns 0.
 */
static int32_t tsdbCommitCloseReader(SCommitter2 *committer) {
  int32_t code = 0;

  TARRAY2_CLEAR(committer->sttReaderArray, tsdbSttFileReaderClose);

  return code;
}
H
Hongze Cheng 已提交
278

279
/*
 * Build the data and tomb iterators for the current file set and open one
 * merger over each group.
 *
 * Sources: the immutable memtable (starting at ctx->minKey / VERSION_MIN) and
 * every STT reader previously opened in committer->sttReaderArray.
 *
 * The same STsdbIterConfig instance is reused and updated field by field
 * between tsdbIterOpen() calls, so values set for an earlier iterator stay in
 * place unless they are explicitly overwritten.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbCommitOpenIter(SCommitter2 *committer) {
  int32_t code = 0;
  int32_t lino = 0;

  // nothing from a previous file set may be left open
  ASSERT(TARRAY2_SIZE(committer->dataIterArray) == 0);
  ASSERT(committer->dataIterMerger == NULL);
  ASSERT(TARRAY2_SIZE(committer->tombIterArray) == 0);
  ASSERT(committer->tombIterMerger == NULL);

  STsdbIterConfig config = {0};
  STsdbIter      *iter;

  // ---- memtable: row data ----
  config.type = TSDB_ITER_TYPE_MEMT;
  config.memt = committer->tsdb->imem;
  config.from->ts = committer->ctx->minKey;
  config.from->version = VERSION_MIN;

  code = tsdbIterOpen(&config, &iter);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = TARRAY2_APPEND(committer->dataIterArray, iter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // ---- memtable: tomb records ----
  config.type = TSDB_ITER_TYPE_MEMT_TOMB;
  config.memt = committer->tsdb->imem;

  code = tsdbIterOpen(&config, &iter);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = TARRAY2_APPEND(committer->tombIterArray, iter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // ---- STT files: one data and one tomb iterator per reader ----
  SSttFileReader *sttReader;
  TARRAY2_FOREACH(committer->sttReaderArray, sttReader) {
    config.type = TSDB_ITER_TYPE_STT;
    config.sttReader = sttReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(committer->dataIterArray, iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    config.type = TSDB_ITER_TYPE_STT_TOMB;
    config.sttReader = sttReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(committer->tombIterArray, iter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // ---- mergers ----
  code = tsdbIterMergerOpen(committer->dataIterArray, &committer->dataIterMerger, false);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbIterMergerOpen(committer->tombIterArray, &committer->tombIterMerger, true);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
  }
  return code;
}

351 352 353 354 355 356 357 358
/*
 * Close both mergers (tomb first, then data) and afterwards empty both
 * iterator arrays with tsdbIterClose. Always returns 0.
 */
static int32_t tsdbCommitCloseIter(SCommitter2 *committer) {
  int32_t code = 0;

  // mergers go first, then the iterators they were opened over
  tsdbIterMergerClose(&committer->tombIterMerger);
  tsdbIterMergerClose(&committer->dataIterMerger);

  TARRAY2_CLEAR(committer->tombIterArray, tsdbIterClose);
  TARRAY2_CLEAR(committer->dataIterArray, tsdbIterClose);

  return code;
}

H
Hongze Cheng 已提交
359 360 361 362 363 364
/*
 * Prepare the committer for the file set that contains ctx->nextKey.
 *
 * Derives fid, expLevel and the [minKey, maxKey] range from ctx->nextKey,
 * allocates a disk for that level, looks up the matching file set in the
 * snapshot (ctx->fset is NULL when there is none), opens the readers,
 * iterators and writer, and finally resets ctx->nextKey to TSKEY_MAX so the
 * commit steps can lower it again.
 *
 * Returns 0 on success or the first error code encountered. The error is
 * propagated to the caller; previously it was logged and then reported as
 * success, letting the commit continue with an unopened writer.
 */
static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *tsdb = committer->tsdb;

  committer->ctx->fid = tsdbKeyFid(committer->ctx->nextKey, committer->minutes, committer->precision);
  committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now);
  tsdbFidKeyRange(committer->ctx->fid, committer->minutes, committer->precision, &committer->ctx->minKey,
                  &committer->ctx->maxKey);
  code = tfsAllocDisk(committer->tsdb->pVnode->pTfs, committer->ctx->expLevel, &committer->ctx->did);
  TSDB_CHECK_CODE(code, lino, _exit);
  // result intentionally not checked here, as in the original code
  tfsMkdirRecurAt(committer->tsdb->pVnode->pTfs, committer->tsdb->path, committer->ctx->did);

  // ctx->fset temporarily points at a stack key for the search only; it is
  // replaced by the search result (or NULL) before anything else reads it
  STFileSet fset = {.fid = committer->ctx->fid};
  committer->ctx->fset = &fset;
  STFileSet **fsetPtr = TARRAY2_SEARCH(committer->fsetArr, &committer->ctx->fset, tsdbTFileSetCmprFn, TD_EQ);
  committer->ctx->fset = (fsetPtr == NULL) ? NULL : *fsetPtr;
  committer->ctx->tbid->suid = 0;
  committer->ctx->tbid->uid = 0;

  ASSERT(TARRAY2_SIZE(committer->dataIterArray) == 0);
  ASSERT(committer->dataIterMerger == NULL);
  ASSERT(committer->writer == NULL);

  code = tsdbCommitOpenReader(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbCommitOpenIter(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbCommitOpenWriter(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // reset nextKey
  committer->ctx->nextKey = TSKEY_MAX;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", TD_VID(tsdb->pVnode),
              __func__, committer->ctx->fid, committer->ctx->minKey, committer->ctx->maxKey, committer->ctx->expLevel);
  }
  return code;
}

H
Hongze Cheng 已提交
404
/*
 * Finish the current file set: close the writer, then the iterators and
 * mergers, then the STT readers. Stops at the first step that fails.
 *
 * Returns 0 on success or the error code of the failing step.
 */
static int32_t tsdbCommitFileSetEnd(SCommitter2 *committer) {
  int32_t lino = 0;
  int32_t code = 0;

  // 1. writer
  code = tsdbCommitCloseWriter(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // 2. iterators and mergers
  code = tsdbCommitCloseIter(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // 3. readers
  code = tsdbCommitCloseReader(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid);
  }
  return code;
}

H
Hongze Cheng 已提交
426
/*
 * Commit one file set: begin, write row data, write tomb data, end.
 * Stops at the first step that fails.
 *
 * Returns 0 on success or the error code of the failing step.
 */
static int32_t tsdbCommitFileSet(SCommitter2 *committer) {
  int32_t lino = 0;
  int32_t code = 0;

  // set up context, readers, iterators and writer
  code = tsdbCommitFileSetBegin(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // rows first ...
  code = tsdbCommitTSData(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // ... then tomb records
  code = tsdbCommitTombData(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // tear down what Begin opened
  code = tsdbCommitFileSetEnd(committer);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid);
  }
  return code;
}

H
Hongze Cheng 已提交
454
/*
 * Initialise a committer for tsdb.
 *
 * Zeroes the structure, takes a copy snapshot of the file sets, copies the
 * relevant settings from tsdb->keepCfg and info->info.config, and computes
 * the starting ctx->nextKey and ctx->maxDelKey.
 *
 * Returns 0 on success or the error code of tsdbFSCreateCopySnapshot().
 */
static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *committer) {
  int32_t code = 0;
  int32_t lino = 0;

  memset(committer, 0, sizeof(*committer));
  committer->tsdb = tsdb;

  code = tsdbFSCreateCopySnapshot(tsdb->pFS, &committer->fsetArr);
  TSDB_CHECK_CODE(code, lino, _exit);

  // settings
  committer->minutes = tsdb->keepCfg.days;
  committer->precision = tsdb->keepCfg.precision;
  committer->minRow = info->info.config.tsdbCfg.minRows;
  committer->maxRow = info->info.config.tsdbCfg.maxRows;
  committer->cmprAlg = info->info.config.tsdbCfg.compression;
  committer->sttTrigger = info->info.config.sttTrigger;
  committer->szPage = info->info.config.tsdbPageSize;
  committer->compactVersion = INT64_MAX;
  committer->ctx->cid = tsdbFSAllocEid(tsdb->pFS);
  committer->ctx->now = taosGetTimestampSec();

  // start from the smallest row key, lowered by any smaller delete start key
  committer->ctx->nextKey = tsdb->imem->minKey;
  if (tsdb->imem->nDel > 0) {
    SRBTreeIter  iter[1] = {tRBTreeIterCreate(tsdb->imem->tbDataTree, 1)};
    SRBTreeNode *node;

    while ((node = tRBTreeIterNext(iter)) != NULL) {
      STbData *tbData = TCONTAINER_OF(node, STbData, rbtn);

      for (SDelData *del = tbData->pHead; del != NULL; del = del->pNext) {
        committer->ctx->nextKey = TMIN(committer->ctx->nextKey, del->sKey);
      }
    }
  }

  // key ranges of the last and the first file set of the snapshot
  committer->ctx->maxDelKey = TSKEY_MIN;
  TSKEY firstMinKey = TSKEY_MAX;
  TSKEY firstMaxKey = TSKEY_MIN;  // receives an output of tsdbFidKeyRange(); not read afterwards
  if (TARRAY2_SIZE(committer->fsetArr) > 0) {
    STFileSet *lastFSet = TARRAY2_LAST(committer->fsetArr);
    tsdbFidKeyRange(lastFSet->fid, committer->minutes, committer->precision, &firstMinKey,
                    &committer->ctx->maxDelKey);

    STFileSet *firstFSet = TARRAY2_FIRST(committer->fsetArr);
    tsdbFidKeyRange(firstFSet->fid, committer->minutes, committer->precision, &firstMinKey, &firstMaxKey);
  }

  // do not start below both the memtable's min key and the first file set
  TSKEY lowerBound = TMIN(tsdb->imem->minKey, firstMinKey);
  if (committer->ctx->nextKey < lowerBound) {
    committer->ctx->nextKey = lowerBound;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
  }
  return code;
}

H
Hongze Cheng 已提交
513
/*
 * Finish a commit and release everything the committer owns.
 *
 * committer: committer opened by tsdbOpenCommitter(); all per-file-set
 *            resources (writer, mergers) must already be closed.
 * eno:       0 when the commit succeeded; the file operations collected in
 *            fopArray are then submitted with tsdbFSEditBegin(). A non-zero
 *            eno is not supported yet and trips ASSERT(0).
 *
 * The arrays and the file-set snapshot are released even when
 * tsdbFSEditBegin() fails; previously that failure jumped over the teardown
 * and leaked them.
 *
 * Returns 0 on success or the error code of tsdbFSEditBegin().
 */
static int32_t tsdbCloseCommitter(SCommitter2 *committer, int32_t eno) {
  int32_t code = 0;
  int32_t lino = 0;

  if (eno == 0) {
    code = tsdbFSEditBegin(committer->tsdb->pFS, committer->fopArray, TSDB_FEDIT_COMMIT);
    if (code) {
      // remember where it failed, but still fall through to the teardown
      lino = __LINE__;
    }
  } else {
    // TODO
    ASSERT(0);
  }

  ASSERT(committer->writer == NULL);
  ASSERT(committer->dataIterMerger == NULL);
  ASSERT(committer->tombIterMerger == NULL);
  TARRAY2_DESTROY(committer->dataIterArray, NULL);
  TARRAY2_DESTROY(committer->tombIterArray, NULL);
  TARRAY2_DESTROY(committer->sttReaderArray, NULL);
  TARRAY2_DESTROY(committer->fopArray, NULL);
  tsdbFSDestroyCopySnapshot(&committer->fsetArr);

  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s, eid:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, lino,
              tstrerror(code), committer->ctx->cid);
  } else {
    tsdbDebug("vgId:%d %s done, eid:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->cid);
  }
  return code;
}

H
Hongze Cheng 已提交
545 546 547 548 549 550
/*
 * Move the active memtable (tsdb->mem) into the immutable slot (tsdb->imem)
 * while holding tsdb->rwLock for writing. tsdb->imem must be NULL on entry.
 * Always returns 0.
 */
int32_t tsdbPreCommit(STsdb *tsdb) {
  int32_t code = 0;

  taosThreadRwlockWrlock(&tsdb->rwLock);

  ASSERT(tsdb->imem == NULL);
  tsdb->imem = tsdb->mem;
  tsdb->mem = NULL;

  taosThreadRwlockUnlock(&tsdb->rwLock);

  return code;
}

H
Hongze Cheng 已提交
554 555
/*
 * Write the immutable memtable (tsdb->imem) to files.
 *
 * An empty memtable (no rows and no deletes) is simply detached under the
 * write lock and unreferenced. Otherwise a committer is opened, one file set
 * is committed per iteration until ctx->nextKey reaches TSKEY_MAX, and the
 * committer is closed.
 *
 * Returns 0 when tsdb is NULL or on success, otherwise the first error code.
 */
int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
  if (tsdb == NULL) return 0;

  int32_t code = 0;
  int32_t lino = 0;

  // NOTE(review): tsdb->imem is dereferenced without a NULL check; presumably
  // tsdbPreCommit() always ran first with a non-NULL memtable -- confirm.
  SMemTable *imem = tsdb->imem;
  int64_t    nRow = imem->nRow;
  int64_t    nDel = imem->nDel;

  if (nRow != 0 || nDel != 0) {
    SCommitter2 committer[1];

    code = tsdbOpenCommitter(tsdb, info, committer);
    TSDB_CHECK_CODE(code, lino, _exit);

    // NOTE(review): an error below jumps to _exit without calling
    // tsdbCloseCommitter(), so the committer's resources are not released
    // on that path -- confirm whether that is handled elsewhere.
    while (committer->ctx->nextKey != TSKEY_MAX) {
      code = tsdbCommitFileSet(committer);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbCloseCommitter(committer, code);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    // nothing to write: drop the immutable memtable right away
    taosThreadRwlockWrlock(&tsdb->rwLock);
    tsdb->imem = NULL;
    taosThreadRwlockUnlock(&tsdb->rwLock);
    tsdbUnrefMemTable(imem, NULL, true);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
  } else {
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(tsdb->pVnode), __func__, nRow, nDel);
  }
  return code;
}

H
Hongze Cheng 已提交
593
/*
 * Make a prepared commit effective.
 *
 * Does nothing when there is no immutable memtable. Otherwise calls
 * tsdbFSEditCommit() under the write lock; on success tsdb->imem is cleared
 * inside the lock and the memtable is unreferenced after the lock is
 * released. On failure tsdb->imem is left in place.
 *
 * Returns 0 on success or the error code of tsdbFSEditCommit().
 */
int32_t tsdbCommitCommit(STsdb *tsdb) {
  int32_t code = 0;
  int32_t lino = 0;

  SMemTable *imem = tsdb->imem;
  if (imem == NULL) goto _exit;

  taosThreadRwlockWrlock(&tsdb->rwLock);
  code = tsdbFSEditCommit(tsdb->pFS);
  if (code == 0) {
    tsdb->imem = NULL;
  }
  taosThreadRwlockUnlock(&tsdb->rwLock);
  TSDB_CHECK_CODE(code, lino, _exit);

  tsdbUnrefMemTable(imem, NULL, true);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
  } else {
    tsdbInfo("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
  }
  return code;
}

/*
 * Abort a prepared commit by calling tsdbFSEditAbort(). Does nothing when
 * there is no immutable memtable.
 *
 * Returns 0 on success or the error code of tsdbFSEditAbort().
 */
int32_t tsdbCommitAbort(STsdb *pTsdb) {
  int32_t lino = 0;
  int32_t code = 0;

  if (pTsdb->imem == NULL) {
    goto _exit;
  }

  code = tsdbFSEditAbort(pTsdb->pFS);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}