tsdbCommit.c 14.4 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "inc/tsdbCommit.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// extern dependencies
H
Hongze Cheng 已提交
19
typedef struct {
H
Hongze Cheng 已提交
20 21 22
  STsdb         *tsdb;
  TFileSetArray *fsetArr;

H
Hongze Cheng 已提交
23 24 25 26 27
  int32_t minutes;
  int8_t  precision;
  int32_t minRow;
  int32_t maxRow;
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
28 29
  int32_t sttTrigger;
  int32_t szPage;
H
Hongze Cheng 已提交
30 31 32
  int64_t compactVersion;

  struct {
H
Hongze Cheng 已提交
33
    int64_t    cid;
H
Hongze Cheng 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46
    int64_t    now;
    TSKEY      nextKey;
    int32_t    fid;
    int32_t    expLevel;
    TSKEY      minKey;
    TSKEY      maxKey;
    STFileSet *fset;
    TABLEID    tbid[1];
  } ctx[1];

  TFileOpArray   fopArray[1];
  TTsdbIterArray iterArray[1];
  SIterMerger   *iterMerger;
H
Hongze Cheng 已提交
47

H
Hongze Cheng 已提交
48
  // writer
H
Hongze Cheng 已提交
49
  SSttFileWriter  *sttWriter;
H
Hongze Cheng 已提交
50
  SDataFileWriter *dataWriter;
H
Hongze Cheng 已提交
51
} SCommitter2;
H
Hongze Cheng 已提交
52

H
Hongze Cheng 已提交
53
static int32_t tsdbCommitOpenNewSttWriter(SCommitter2 *committer) {
H
Hongze Cheng 已提交
54
  int32_t code = 0;
H
Hongze Cheng 已提交
55
  int32_t lino = 0;
H
Hongze Cheng 已提交
56

H
Hongze Cheng 已提交
57 58
  SDiskID did[1];
  if (tfsAllocDisk(committer->tsdb->pVnode->pTfs, committer->ctx->expLevel, did) < 0) {
H
Hongze Cheng 已提交
59 60 61 62
    code = TSDB_CODE_FS_NO_VALID_DISK;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

H
Hongze Cheng 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
  SSttFileWriterConfig config[1] = {{
      .tsdb = committer->tsdb,
      .maxRow = committer->maxRow,
      .szPage = committer->tsdb->pVnode->config.tsdbPageSize,
      .cmprAlg = committer->cmprAlg,
      .compactVersion = committer->compactVersion,
      .file =
          {
              .type = TSDB_FTYPE_STT,
              .did = did[0],
              .fid = committer->ctx->fid,
              .cid = committer->ctx->cid,
          },
  }};

  code = tsdbSttFileWriterOpen(config, &committer->sttWriter);
H
Hongze Cheng 已提交
79 80 81 82
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
83
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
84
  } else {
H
Hongze Cheng 已提交
85
    tsdbDebug("vgId:%d %s success", TD_VID(committer->tsdb->pVnode), __func__);
H
Hongze Cheng 已提交
86 87 88
  }
  return code;
}
H
Hongze Cheng 已提交
89 90

static int32_t tsdbCommitOpenExistSttWriter(SCommitter2 *committer, const STFile *f) {
H
Hongze Cheng 已提交
91 92
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
93 94 95 96 97 98 99 100 101 102 103

  SSttFileWriterConfig config[1] = {{
      .tsdb = committer->tsdb,
      .maxRow = committer->maxRow,
      .szPage = committer->szPage,
      .cmprAlg = committer->cmprAlg,
      .compactVersion = committer->compactVersion,
      .file = f[0],
  }};

  code = tsdbSttFileWriterOpen(config, &committer->sttWriter);
H
Hongze Cheng 已提交
104 105 106 107
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
108
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
109
  } else {
H
Hongze Cheng 已提交
110
    tsdbDebug("vgId:%d %s success", TD_VID(committer->tsdb->pVnode), __func__);
H
Hongze Cheng 已提交
111
  }
H
Hongze Cheng 已提交
112 113
  return code;
}
H
Hongze Cheng 已提交
114

H
Hongze Cheng 已提交
115
static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
H
Hongze Cheng 已提交
116 117 118 119
  int32_t code = 0;
  int32_t lino = 0;

  // stt writer
H
Hongze Cheng 已提交
120
  if (!committer->ctx->fset) {
H
Hongze Cheng 已提交
121 122
    code = tsdbCommitOpenNewSttWriter(committer);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
123 124
  }

H
Hongze Cheng 已提交
125 126 127 128
  const SSttLvl *lvl0 = tsdbTFileSetGetSttLvl(committer->ctx->fset, 0);
  if (lvl0 == NULL || TARRAY2_SIZE(lvl0->fobjArr) == 0) {
    code = tsdbCommitOpenNewSttWriter(committer);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
129 130
  }

H
Hongze Cheng 已提交
131
  STFileObj *fobj = TARRAY2_LAST(lvl0->fobjArr);
H
Hongze Cheng 已提交
132
  if (fobj->f->stt->nseg >= committer->sttTrigger) {
H
Hongze Cheng 已提交
133 134
    code = tsdbCommitOpenNewSttWriter(committer);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
135
  } else {
H
Hongze Cheng 已提交
136 137
    code = tsdbCommitOpenExistSttWriter(committer, fobj->f);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
138
  }
H
Hongze Cheng 已提交
139

H
Hongze Cheng 已提交
140 141 142 143 144 145 146 147 148 149
  // data writer
  if (0) {
    // TODO
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
  }
  return code;
H
Hongze Cheng 已提交
150 151
}

H
Hongze Cheng 已提交
152
static int32_t tsdbCommitWriteDelData(SCommitter2 *committer, int64_t suid, int64_t uid, int64_t version, int64_t sKey,
H
Hongze Cheng 已提交
153 154 155 156 157 158
                                      int64_t eKey) {
  int32_t code = 0;
  // TODO
  return code;
}

H
Hongze Cheng 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
static int32_t tsdbCommitTSData(SCommitter2 *committer) {
  int32_t   code = 0;
  int32_t   lino = 0;
  int64_t   nRow = 0;
  int32_t   vid = TD_VID(committer->tsdb->pVnode);
  SRowInfo *row;

  if (committer->tsdb->imem->nRow == 0) goto _exit;

  // open iter and iter merger
  STsdbIter      *iter;
  STsdbIterConfig config[1] = {{
      .type = TSDB_ITER_TYPE_MEMT,
      .memt = committer->tsdb->imem,
      .from = {{
          .ts = committer->ctx->minKey,
          .version = VERSION_MIN,
      }},
  }};

  code = tsdbIterOpen(config, &iter);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
181

H
Hongze Cheng 已提交
182 183
  code = TARRAY2_APPEND(committer->iterArray, iter);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
184

H
Hongze Cheng 已提交
185
  code = tsdbIterMergerOpen(committer->iterArray, &committer->iterMerger);
H
Hongze Cheng 已提交
186
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
187

H
Hongze Cheng 已提交
188 189 190 191 192 193
  // loop iter
  while ((row = tsdbIterMergerGet(committer->iterMerger)) != NULL) {
    if (row->uid != committer->ctx->tbid->uid) {
      committer->ctx->tbid->suid = row->suid;
      committer->ctx->tbid->uid = row->uid;

H
Hongze Cheng 已提交
194
      // Ignore table of obsolescence
H
Hongze Cheng 已提交
195 196 197 198 199
      SMetaInfo info[1];
      if (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, info, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(committer->iterMerger, committer->ctx->tbid);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
H
Hongze Cheng 已提交
200
      }
H
Hongze Cheng 已提交
201
    }
H
Hongze Cheng 已提交
202

H
Hongze Cheng 已提交
203 204 205 206 207 208
    TSKEY ts = TSDBROW_TS(&row->row);
    if (ts > committer->ctx->maxKey) {
      committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts);
      code = tsdbIterMergerSkipTableData(committer->iterMerger, committer->ctx->tbid);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
H
Hongze Cheng 已提交
209
      code = tsdbSttFileWriteTSData(committer->sttWriter, row);
H
Hongze Cheng 已提交
210
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
211

H
Hongze Cheng 已提交
212 213
      code = tsdbIterMergerNext(committer->iterMerger);
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
214 215 216 217 218
    }
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
219
    TSDB_ERROR_LOG(vid, lino, code);
H
Hongze Cheng 已提交
220
  } else {
H
Hongze Cheng 已提交
221
    tsdbDebug("vgId:%d %s done, fid:%d nRow:%" PRId64, vid, __func__, committer->ctx->fid, nRow);
H
Hongze Cheng 已提交
222 223 224 225
  }
  return code;
}

H
Hongze Cheng 已提交
226
static int32_t tsdbCommitDelData(SCommitter2 *committer) {
H
Hongze Cheng 已提交
227 228 229
  int32_t code = 0;
  int32_t lino;

H
Hongze Cheng 已提交
230 231
  return 0;

H
Hongze Cheng 已提交
232
#if 0
H
Hongze Cheng 已提交
233
  ASSERTS(0, "TODO: Not implemented yet");
H
Hongze Cheng 已提交
234

H
Hongze Cheng 已提交
235
  int64_t    nDel = 0;
H
Hongze Cheng 已提交
236
  SMemTable *pMem = committer->tsdb->imem;
H
Hongze Cheng 已提交
237

H
Hongze Cheng 已提交
238 239 240
  if (pMem->nDel == 0) {  // no del data
    goto _exit;
  }
H
Hongze Cheng 已提交
241

H
Hongze Cheng 已提交
242 243
  for (int32_t iTbData = 0; iTbData < taosArrayGetSize(committer->aTbDataP); iTbData++) {
    STbData *pTbData = (STbData *)taosArrayGetP(committer->aTbDataP, iTbData);
H
Hongze Cheng 已提交
244 245

    for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) {
H
Hongze Cheng 已提交
246 247 248
      if (pDelData->eKey < committer->ctx->minKey) continue;
      if (pDelData->sKey > committer->ctx->maxKey) {
        committer->ctx->nextKey = TMIN(committer->ctx->nextKey, pDelData->sKey);
H
Hongze Cheng 已提交
249 250
        continue;
      }
H
Hongze Cheng 已提交
251

H
Hongze Cheng 已提交
252
      code = tsdbCommitWriteDelData(committer, pTbData->suid, pTbData->uid, pDelData->version,
H
Hongze Cheng 已提交
253
                                    pDelData->sKey /* TODO */, pDelData->eKey /* TODO */);
H
Hongze Cheng 已提交
254 255 256 257 258 259
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
260
    tsdbError("vgId:%d failed at line %d since %s", TD_VID(committer->tsdb->pVnode), lino, tstrerror(code));
H
Hongze Cheng 已提交
261
  } else {
H
Hongze Cheng 已提交
262
    tsdbDebug("vgId:%d %s done, fid:%d nDel:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid,
H
Hongze Cheng 已提交
263 264 265
              pMem->nDel);
  }
  return code;
H
Hongze Cheng 已提交
266
#endif
H
Hongze Cheng 已提交
267 268
}

H
Hongze Cheng 已提交
269 270 271 272 273 274 275
static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *tsdb = committer->tsdb;
  int32_t vid = TD_VID(tsdb->pVnode);

  committer->ctx->fid = tsdbKeyFid(committer->ctx->nextKey, committer->minutes, committer->precision);
H
Hongze Cheng 已提交
276
  committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now);
H
Hongze Cheng 已提交
277 278
  tsdbFidKeyRange(committer->ctx->fid, committer->minutes, committer->precision, &committer->ctx->minKey,
                  &committer->ctx->maxKey);
H
Hongze Cheng 已提交
279 280 281 282 283
  STFileSet fset = {.fid = committer->ctx->fid};
  committer->ctx->fset = &fset;
  committer->ctx->fset = TARRAY2_SEARCH_EX(committer->fsetArr, &committer->ctx->fset, tsdbTFileSetCmprFn, TD_EQ);
  committer->ctx->tbid->suid = 0;
  committer->ctx->tbid->uid = 0;
H
Hongze Cheng 已提交
284

H
Hongze Cheng 已提交
285 286 287 288
  ASSERT(TARRAY2_SIZE(committer->iterArray) == 0);
  ASSERT(committer->iterMerger == NULL);
  ASSERT(committer->sttWriter == NULL);
  ASSERT(committer->dataWriter == NULL);
H
Hongze Cheng 已提交
289

H
Hongze Cheng 已提交
290 291
  code = tsdbCommitOpenWriter(committer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
292

H
Hongze Cheng 已提交
293 294 295
  // reset nextKey
  committer->ctx->nextKey = TSKEY_MAX;

H
Hongze Cheng 已提交
296 297 298 299 300 301 302
_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", vid, __func__,
              committer->ctx->fid, committer->ctx->minKey, committer->ctx->maxKey, committer->ctx->expLevel);
  }
H
Hongze Cheng 已提交
303
  return 0;
H
Hongze Cheng 已提交
304 305
}

H
Hongze Cheng 已提交
306
static int32_t tsdbCommitFileSetEnd(SCommitter2 *committer) {
H
Hongze Cheng 已提交
307
  int32_t code = 0;
H
Hongze Cheng 已提交
308
  int32_t lino = 0;
H
Hongze Cheng 已提交
309

H
Hongze Cheng 已提交
310 311
  STFileOp op[1];
  code = tsdbSttFileWriterClose(&committer->sttWriter, 0, op);
H
Hongze Cheng 已提交
312
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
313

H
Hongze Cheng 已提交
314 315
  if (op->optype != TSDB_FOP_NONE) {
    code = TARRAY2_APPEND_PTR(committer->fopArray, op);
H
Hongze Cheng 已提交
316 317
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
318

H
Hongze Cheng 已提交
319 320 321
  tsdbIterMergerClose(&committer->iterMerger);
  TARRAY2_CLEAR(committer->iterArray, tsdbIterClose);

H
Hongze Cheng 已提交
322 323
_exit:
  if (code) {
H
Hongze Cheng 已提交
324
    TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
325
  } else {
H
Hongze Cheng 已提交
326
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid);
H
Hongze Cheng 已提交
327 328 329 330
  }
  return code;
}

H
Hongze Cheng 已提交
331
static int32_t tsdbCommitFileSet(SCommitter2 *committer) {
H
Hongze Cheng 已提交
332 333
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
334
  int32_t vid = TD_VID(committer->tsdb->pVnode);
H
Hongze Cheng 已提交
335

H
Hongze Cheng 已提交
336
  // fset commit start
H
Hongze Cheng 已提交
337
  code = tsdbCommitFileSetBegin(committer);
H
Hongze Cheng 已提交
338
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
339

H
Hongze Cheng 已提交
340
  // commit fset
H
Hongze Cheng 已提交
341
  code = tsdbCommitTSData(committer);
H
Hongze Cheng 已提交
342 343
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
344
  code = tsdbCommitDelData(committer);
H
Hongze Cheng 已提交
345 346 347
  TSDB_CHECK_CODE(code, lino, _exit);

  // fset commit end
H
Hongze Cheng 已提交
348
  code = tsdbCommitFileSetEnd(committer);
H
Hongze Cheng 已提交
349
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
350 351 352

_exit:
  if (code) {
H
Hongze Cheng 已提交
353
    TSDB_ERROR_LOG(vid, lino, code);
H
Hongze Cheng 已提交
354
  } else {
H
Hongze Cheng 已提交
355
    tsdbDebug("vgId:%d %s done, fid:%d", vid, __func__, committer->ctx->fid);
H
Hongze Cheng 已提交
356 357 358 359
  }
  return code;
}

H
Hongze Cheng 已提交
360
static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *committer) {
H
Hongze Cheng 已提交
361
  int32_t code = 0;
H
Hongze Cheng 已提交
362 363
  int32_t lino = 0;
  int32_t vid = TD_VID(tsdb->pVnode);
H
Hongze Cheng 已提交
364

H
Hongze Cheng 已提交
365 366 367
  memset(committer, 0, sizeof(committer[0]));

  committer->tsdb = tsdb;
H
Hongze Cheng 已提交
368 369 370
  code = tsdbFSCreateCopySnapshot(tsdb->pFS, &committer->fsetArr);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
371 372 373 374 375 376
  committer->minutes = tsdb->keepCfg.days;
  committer->precision = tsdb->keepCfg.precision;
  committer->minRow = info->info.config.tsdbCfg.minRows;
  committer->maxRow = info->info.config.tsdbCfg.maxRows;
  committer->cmprAlg = info->info.config.tsdbCfg.compression;
  committer->sttTrigger = info->info.config.sttTrigger;
H
Hongze Cheng 已提交
377 378
  committer->szPage = info->info.config.tsdbPageSize;
  committer->compactVersion = INT64_MAX;
H
Hongze Cheng 已提交
379

H
Hongze Cheng 已提交
380
  committer->ctx->cid = tsdbFSAllocEid(tsdb->pFS);
H
Hongze Cheng 已提交
381 382
  committer->ctx->now = taosGetTimestampSec();
  committer->ctx->nextKey = tsdb->imem->minKey;  // TODO
H
Hongze Cheng 已提交
383

H
Hongze Cheng 已提交
384 385
  TARRAY2_INIT(committer->fopArray);

H
Hongze Cheng 已提交
386 387
_exit:
  if (code) {
H
Hongze Cheng 已提交
388
    TSDB_ERROR_LOG(vid, lino, code);
H
Hongze Cheng 已提交
389
  } else {
H
Hongze Cheng 已提交
390
    tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
H
Hongze Cheng 已提交
391 392 393 394
  }
  return code;
}

H
Hongze Cheng 已提交
395
static int32_t tsdbCloseCommitter(SCommitter2 *committer, int32_t eno) {
H
Hongze Cheng 已提交
396
  int32_t code = 0;
H
Hongze Cheng 已提交
397
  int32_t lino = 0;
H
Hongze Cheng 已提交
398
  int32_t vid = TD_VID(committer->tsdb->pVnode);
H
Hongze Cheng 已提交
399

H
Hongze Cheng 已提交
400
  if (eno == 0) {
H
Hongze Cheng 已提交
401
    code = tsdbFSEditBegin(committer->tsdb->pFS, committer->fopArray, TSDB_FEDIT_COMMIT);
H
Hongze Cheng 已提交
402
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
403
  } else {
H
Hongze Cheng 已提交
404 405
    // TODO
    ASSERT(0);
H
Hongze Cheng 已提交
406
  }
H
Hongze Cheng 已提交
407

H
Hongze Cheng 已提交
408 409 410 411 412 413
  ASSERT(committer->dataWriter == NULL);
  ASSERT(committer->sttWriter == NULL);
  ASSERT(committer->iterMerger == NULL);
  TARRAY2_FREE(committer->iterArray);
  TARRAY2_FREE(committer->fopArray);
  tsdbFSDestroyCopySnapshot(&committer->fsetArr);
H
Hongze Cheng 已提交
414

H
Hongze Cheng 已提交
415
_exit:
H
Hongze Cheng 已提交
416
  if (code) {
H
Hongze Cheng 已提交
417
    tsdbError("vgId:%d %s failed at line %d since %s, eid:%" PRId64, vid, __func__, lino, tstrerror(code),
H
Hongze Cheng 已提交
418
              committer->ctx->cid);
H
Hongze Cheng 已提交
419
  } else {
H
Hongze Cheng 已提交
420
    tsdbDebug("vgId:%d %s done, eid:%" PRId64, vid, __func__, committer->ctx->cid);
H
Hongze Cheng 已提交
421
  }
H
Hongze Cheng 已提交
422 423 424
  return code;
}

H
Hongze Cheng 已提交
425 426 427 428 429 430
int32_t tsdbPreCommit(STsdb *tsdb) {
  taosThreadRwlockWrlock(&tsdb->rwLock);
  ASSERT(tsdb->imem == NULL);
  tsdb->imem = tsdb->mem;
  tsdb->mem = NULL;
  taosThreadRwlockUnlock(&tsdb->rwLock);
H
Hongze Cheng 已提交
431 432 433
  return 0;
}

H
Hongze Cheng 已提交
434 435
int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
  if (!tsdb) return 0;
H
Hongze Cheng 已提交
436 437 438

  int32_t    code = 0;
  int32_t    lino = 0;
H
Hongze Cheng 已提交
439
  int32_t    vid = TD_VID(tsdb->pVnode);
H
Hongze Cheng 已提交
440 441 442
  SMemTable *imem = tsdb->imem;
  int64_t    nRow = imem->nRow;
  int64_t    nDel = imem->nDel;
H
Hongze Cheng 已提交
443 444 445 446 447

  if (!nRow && !nDel) {
    taosThreadRwlockWrlock(&tsdb->rwLock);
    tsdb->imem = NULL;
    taosThreadRwlockUnlock(&tsdb->rwLock);
H
Hongze Cheng 已提交
448
    tsdbUnrefMemTable(imem, NULL, true);
H
Hongze Cheng 已提交
449
  } else {
H
Hongze Cheng 已提交
450
    SCommitter2 committer[1];
H
Hongze Cheng 已提交
451

H
Hongze Cheng 已提交
452
    code = tsdbOpenCommitter(tsdb, info, committer);
H
Hongze Cheng 已提交
453
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
454

H
Hongze Cheng 已提交
455 456
    while (committer->ctx->nextKey != TSKEY_MAX) {
      code = tsdbCommitFileSet(committer);
H
Hongze Cheng 已提交
457
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
458
    }
H
Hongze Cheng 已提交
459

H
Hongze Cheng 已提交
460
    code = tsdbCloseCommitter(committer, code);
H
Hongze Cheng 已提交
461 462
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
463 464 465

_exit:
  if (code) {
H
Hongze Cheng 已提交
466
    TSDB_ERROR_LOG(vid, lino, code);
H
Hongze Cheng 已提交
467
  } else {
H
Hongze Cheng 已提交
468
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, vid, __func__, nRow, nDel);
H
Hongze Cheng 已提交
469 470 471 472
  }
  return code;
}

H
Hongze Cheng 已提交
473
int32_t tsdbCommitCommit(STsdb *tsdb) {
H
Hongze Cheng 已提交
474 475
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
476

H
Hongze Cheng 已提交
477
  if (tsdb->imem == NULL) goto _exit;
H
Hongze Cheng 已提交
478

H
Hongze Cheng 已提交
479 480 481
  SMemTable *pMemTable = tsdb->imem;
  taosThreadRwlockWrlock(&tsdb->rwLock);
  code = tsdbFSEditCommit(tsdb->pFS);
H
Hongze Cheng 已提交
482
  if (code) {
H
Hongze Cheng 已提交
483
    taosThreadRwlockUnlock(&tsdb->rwLock);
H
Hongze Cheng 已提交
484 485
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
486 487
  tsdb->imem = NULL;
  taosThreadRwlockUnlock(&tsdb->rwLock);
H
Hongze Cheng 已提交
488
  tsdbUnrefMemTable(pMemTable, NULL, true);
H
Hongze Cheng 已提交
489

H
Hongze Cheng 已提交
490
  // TODO: make this call async
H
Hongze Cheng 已提交
491
  code = tsdbMerge(tsdb);
H
Hongze Cheng 已提交
492
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
493

H
Hongze Cheng 已提交
494 495
_exit:
  if (code) {
H
Hongze Cheng 已提交
496
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
H
Hongze Cheng 已提交
497
  } else {
H
Hongze Cheng 已提交
498
    tsdbInfo("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
H
Hongze Cheng 已提交
499 500 501 502 503 504 505
  }
  return code;
}

int32_t tsdbCommitAbort(STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
506 507 508
  int32_t vid = TD_VID(pTsdb->pVnode);

  if (pTsdb->imem == NULL) goto _exit;
H
Hongze Cheng 已提交
509

H
Hongze Cheng 已提交
510
  code = tsdbFSEditAbort(pTsdb->pFS);
H
Hongze Cheng 已提交
511 512 513 514
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
515
    tsdbError("vgId:%d, %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
516
  } else {
H
Hongze Cheng 已提交
517
    tsdbInfo("vgId:%d %s done", vid, __func__);
H
Hongze Cheng 已提交
518 519 520
  }
  return code;
}