tsdbCommit.c 13.8 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "inc/tsdbCommit.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// extern dependencies
H
Hongze Cheng 已提交
19
typedef struct {
H
Hongze Cheng 已提交
20
  STsdb  *tsdb;
H
Hongze Cheng 已提交
21 22 23 24 25 26
  int32_t minutes;
  int8_t  precision;
  int32_t minRow;
  int32_t maxRow;
  int8_t  cmprAlg;
  int8_t  sttTrigger;
H
Hongze Cheng 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
  int64_t compactVersion;

  struct {
    int64_t    now;
    TSKEY      nextKey;
    int32_t    fid;
    int32_t    expLevel;
    TSKEY      minKey;
    TSKEY      maxKey;
    STFileSet *fset;
    TABLEID    tbid[1];
  } ctx[1];

  int64_t        eid;  // edit id
  TFileOpArray   fopArray[1];
  TTsdbIterArray iterArray[1];
  SIterMerger   *iterMerger;
H
Hongze Cheng 已提交
44

H
Hongze Cheng 已提交
45
  // writer
H
Hongze Cheng 已提交
46 47 48
  SDataFileWriter *dataWriter;
  SSttFileWriter  *sttWriter;
} SCommitter2;
H
Hongze Cheng 已提交
49

H
Hongze Cheng 已提交
50
static int32_t tsdbCommitOpenNewSttWriter(SCommitter2 *pCommitter) {
H
Hongze Cheng 已提交
51
  int32_t code = 0;
H
Hongze Cheng 已提交
52
  int32_t lino = 0;
H
Hongze Cheng 已提交
53
  STsdb  *pTsdb = pCommitter->tsdb;
H
Hongze Cheng 已提交
54 55 56
  SVnode *pVnode = pTsdb->pVnode;
  int32_t vid = TD_VID(pVnode);

H
Hongze Cheng 已提交
57 58
  SSttFileWriterConfig config[1];
  SDiskID              did[1];
H
Hongze Cheng 已提交
59

H
Hongze Cheng 已提交
60
  if (tfsAllocDisk(pVnode->pTfs, pCommitter->ctx->expLevel, did) < 0) {
H
Hongze Cheng 已提交
61 62 63 64
    code = TSDB_CODE_FS_NO_VALID_DISK;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

H
Hongze Cheng 已提交
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
  config->tsdb = pTsdb;
  config->maxRow = pCommitter->maxRow;
  config->szPage = pVnode->config.tsdbPageSize;
  config->cmprAlg = pCommitter->cmprAlg;
  config->skmTb = NULL;
  config->skmRow = NULL;
  config->aBuf = NULL;
  config->file.type = TSDB_FTYPE_STT;
  config->file.did = did[0];
  config->file.fid = pCommitter->ctx->fid;
  config->file.cid = pCommitter->eid;
  config->file.size = 0;
  config->file.stt->level = 0;
  config->file.stt->nseg = 0;

  code = tsdbSttFileWriterOpen(config, &pCommitter->sttWriter);
H
Hongze Cheng 已提交
81 82 83 84 85 86 87 88 89 90
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s success", vid, __func__);
  }
  return code;
}
H
Hongze Cheng 已提交
91
static int32_t tsdbCommitOpenExistSttWriter(SCommitter2 *pCommitter, const STFile *pFile) {
H
Hongze Cheng 已提交
92 93
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
94
  STsdb  *pTsdb = pCommitter->tsdb;
H
Hongze Cheng 已提交
95 96
  SVnode *pVnode = pTsdb->pVnode;
  int32_t vid = TD_VID(pVnode);
H
Hongze Cheng 已提交
97

H
Hongze Cheng 已提交
98
  SSttFileWriterConfig config = {
H
Hongze Cheng 已提交
99
      //
H
Hongze Cheng 已提交
100
      .tsdb = pTsdb,
H
Hongze Cheng 已提交
101
      .maxRow = pCommitter->maxRow,
H
Hongze Cheng 已提交
102
      .szPage = pVnode->config.tsdbPageSize,
H
Hongze Cheng 已提交
103
      .cmprAlg = pCommitter->cmprAlg,
H
Hongze Cheng 已提交
104 105
      .skmTb = NULL,
      .skmRow = NULL,
H
Hongze Cheng 已提交
106
      .aBuf = NULL,
H
Hongze Cheng 已提交
107
      .file = *pFile  //
H
Hongze Cheng 已提交
108
  };
H
Hongze Cheng 已提交
109

H
Hongze Cheng 已提交
110
  code = tsdbSttFileWriterOpen(&config, &pCommitter->sttWriter);
H
Hongze Cheng 已提交
111 112 113 114
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
115 116 117
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s success", vid, __func__);
H
Hongze Cheng 已提交
118
  }
H
Hongze Cheng 已提交
119 120
  return code;
}
H
Hongze Cheng 已提交
121 122 123
static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
  if (!committer->ctx->fset) {
    return tsdbCommitOpenNewSttWriter(committer);
H
Hongze Cheng 已提交
124 125
  }

H
Hongze Cheng 已提交
126
  const SSttLvl *lvl0 = tsdbTFileSetGetLvl(committer->ctx->fset, 0);
H
Hongze Cheng 已提交
127
  if (lvl0 == NULL) {
H
Hongze Cheng 已提交
128
    return tsdbCommitOpenNewSttWriter(committer);
H
Hongze Cheng 已提交
129 130
  }

H
Hongze Cheng 已提交
131 132
  ASSERT(TARRAY2_SIZE(&lvl0->farr) > 0);
  STFileObj *fobj = TARRAY2_LAST(&lvl0->farr);
H
Hongze Cheng 已提交
133 134
  if (fobj->f->stt->nseg >= committer->sttTrigger) {
    return tsdbCommitOpenNewSttWriter(committer);
H
Hongze Cheng 已提交
135
  } else {
H
Hongze Cheng 已提交
136
    return tsdbCommitOpenExistSttWriter(committer, fobj->f);
H
Hongze Cheng 已提交
137 138
  }
}
H
Hongze Cheng 已提交
139

H
Hongze Cheng 已提交
140 141
static int32_t tsdbCommitTSRow(SCommitter2 *committer, SRowInfo *row) {
  return tsdbSttFileWriteTSData(committer->sttWriter, row);
H
Hongze Cheng 已提交
142 143
}

H
Hongze Cheng 已提交
144
static int32_t tsdbCommitWriteDelData(SCommitter2 *pCommitter, int64_t suid, int64_t uid, int64_t version, int64_t sKey,
H
Hongze Cheng 已提交
145 146 147 148 149 150
                                      int64_t eKey) {
  int32_t code = 0;
  // TODO
  return code;
}

H
Hongze Cheng 已提交
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
static int32_t tsdbCommitTSData(SCommitter2 *committer) {
  int32_t   code = 0;
  int32_t   lino = 0;
  int64_t   nRow = 0;
  int32_t   vid = TD_VID(committer->tsdb->pVnode);
  SRowInfo *row;

  if (committer->tsdb->imem->nRow == 0) goto _exit;

  // open iter and iter merger
  STsdbIter      *iter;
  STsdbIterConfig config[1] = {{
      .type = TSDB_ITER_TYPE_MEMT,
      .memt = committer->tsdb->imem,
      .from = {{
          .ts = committer->ctx->minKey,
          .version = VERSION_MIN,
      }},
  }};

  code = tsdbIterOpen(config, &iter);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
173

H
Hongze Cheng 已提交
174 175
  code = TARRAY2_APPEND(committer->iterArray, iter);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
176

H
Hongze Cheng 已提交
177 178
  code = tsdbIterMergerInit(committer->iterArray, &committer->iterMerger);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
179

H
Hongze Cheng 已提交
180 181 182 183 184 185 186 187 188 189 190 191
  // loop iter
  while ((row = tsdbIterMergerGet(committer->iterMerger)) != NULL) {
    if (row->uid != committer->ctx->tbid->uid) {
      committer->ctx->tbid->suid = row->suid;
      committer->ctx->tbid->uid = row->uid;

      // Ignore deleted table
      SMetaInfo info[1];
      if (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, info, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(committer->iterMerger, committer->ctx->tbid);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
H
Hongze Cheng 已提交
192
      }
H
Hongze Cheng 已提交
193
    }
H
Hongze Cheng 已提交
194

H
Hongze Cheng 已提交
195 196 197 198 199 200 201 202
    TSKEY ts = TSDBROW_TS(&row->row);
    if (ts > committer->ctx->maxKey) {
      committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts);

      code = tsdbIterMergerSkipTableData(committer->iterMerger, committer->ctx->tbid);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      code = tsdbCommitTSRow(committer, row);
H
Hongze Cheng 已提交
203
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
204

H
Hongze Cheng 已提交
205 206
      code = tsdbIterMergerNext(committer->iterMerger);
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
207 208 209 210 211
    }
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
212
    TSDB_ERROR_LOG(vid, lino, code);
H
Hongze Cheng 已提交
213
  } else {
H
Hongze Cheng 已提交
214
    tsdbDebug("vgId:%d %s done, fid:%d nRow:%" PRId64, vid, __func__, committer->ctx->fid, nRow);
H
Hongze Cheng 已提交
215 216 217 218
  }
  return code;
}

H
Hongze Cheng 已提交
219
static int32_t tsdbCommitDelData(SCommitter2 *pCommitter) {
H
Hongze Cheng 已提交
220 221 222
  int32_t code = 0;
  int32_t lino;

H
Hongze Cheng 已提交
223 224
  return 0;

H
Hongze Cheng 已提交
225
#if 0
H
Hongze Cheng 已提交
226
  ASSERTS(0, "TODO: Not implemented yet");
H
Hongze Cheng 已提交
227

H
Hongze Cheng 已提交
228
  int64_t    nDel = 0;
H
Hongze Cheng 已提交
229
  SMemTable *pMem = pCommitter->tsdb->imem;
H
Hongze Cheng 已提交
230

H
Hongze Cheng 已提交
231 232 233
  if (pMem->nDel == 0) {  // no del data
    goto _exit;
  }
H
Hongze Cheng 已提交
234 235 236 237 238

  for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
    STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);

    for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) {
H
Hongze Cheng 已提交
239 240 241
      if (pDelData->eKey < pCommitter->ctx->minKey) continue;
      if (pDelData->sKey > pCommitter->ctx->maxKey) {
        pCommitter->ctx->nextKey = TMIN(pCommitter->ctx->nextKey, pDelData->sKey);
H
Hongze Cheng 已提交
242 243
        continue;
      }
H
Hongze Cheng 已提交
244

H
Hongze Cheng 已提交
245 246
      code = tsdbCommitWriteDelData(pCommitter, pTbData->suid, pTbData->uid, pDelData->version,
                                    pDelData->sKey /* TODO */, pDelData->eKey /* TODO */);
H
Hongze Cheng 已提交
247 248 249 250 251 252
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
253
    tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->tsdb->pVnode), lino, tstrerror(code));
H
Hongze Cheng 已提交
254
  } else {
H
Hongze Cheng 已提交
255
    tsdbDebug("vgId:%d %s done, fid:%d nDel:%" PRId64, TD_VID(pCommitter->tsdb->pVnode), __func__, pCommitter->ctx->fid,
H
Hongze Cheng 已提交
256 257 258
              pMem->nDel);
  }
  return code;
H
Hongze Cheng 已提交
259
#endif
H
Hongze Cheng 已提交
260 261
}

H
Hongze Cheng 已提交
262 263 264 265 266 267 268 269 270 271 272
static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *tsdb = committer->tsdb;
  int32_t vid = TD_VID(tsdb->pVnode);

  committer->ctx->fid = tsdbKeyFid(committer->ctx->nextKey, committer->minutes, committer->precision);
  tsdbFidKeyRange(committer->ctx->fid, committer->minutes, committer->precision, &committer->ctx->minKey,
                  &committer->ctx->maxKey);
  committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now);
  committer->ctx->nextKey = TSKEY_MAX;
H
Hongze Cheng 已提交
273

H
Hongze Cheng 已提交
274 275
  // TODO: use a thread safe function to get fset
  tsdbFSGetFSet(tsdb->pFS, committer->ctx->fid, &committer->ctx->fset);
H
Hongze Cheng 已提交
276

H
Hongze Cheng 已提交
277 278
  code = tsdbCommitOpenWriter(committer);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
279

H
Hongze Cheng 已提交
280 281 282 283 284 285 286
_exit:
  if (code) {
    TSDB_ERROR_LOG(vid, lino, code);
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", vid, __func__,
              committer->ctx->fid, committer->ctx->minKey, committer->ctx->maxKey, committer->ctx->expLevel);
  }
H
Hongze Cheng 已提交
287
  return 0;
H
Hongze Cheng 已提交
288 289
}

H
Hongze Cheng 已提交
290
static int32_t tsdbCommitFileSetEnd(SCommitter2 *pCommitter) {
H
Hongze Cheng 已提交
291
  int32_t code = 0;
H
Hongze Cheng 已提交
292
  int32_t lino = 0;
H
Hongze Cheng 已提交
293
  int32_t vid = TD_VID(pCommitter->tsdb->pVnode);
H
Hongze Cheng 已提交
294

H
Hongze Cheng 已提交
295
  if (pCommitter->sttWriter == NULL) return 0;
H
Hongze Cheng 已提交
296

H
Hongze Cheng 已提交
297
  STFileOp op;
H
Hongze Cheng 已提交
298
  code = tsdbSttFileWriterClose(&pCommitter->sttWriter, 0, &op);
H
Hongze Cheng 已提交
299
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
300

H
Hongze Cheng 已提交
301
  if (op.optype != TSDB_FOP_NONE) {
H
Hongze Cheng 已提交
302
    code = TARRAY2_APPEND(pCommitter->fopArray, op);
H
Hongze Cheng 已提交
303 304
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
305 306 307

_exit:
  if (code) {
H
Hongze Cheng 已提交
308
    tsdbError("vgId:%d failed at line %d since %s", vid, lino, tstrerror(code));
H
Hongze Cheng 已提交
309
  } else {
H
Hongze Cheng 已提交
310
    tsdbDebug("vgId:%d %s done, fid:%d", vid, __func__, pCommitter->ctx->fid);
H
Hongze Cheng 已提交
311 312 313 314
  }
  return code;
}

H
Hongze Cheng 已提交
315
static int32_t tsdbCommitFileSet(SCommitter2 *committer) {
H
Hongze Cheng 已提交
316 317
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
318
  int32_t vid = TD_VID(committer->tsdb->pVnode);
H
Hongze Cheng 已提交
319

H
Hongze Cheng 已提交
320
  // fset commit start
H
Hongze Cheng 已提交
321
  code = tsdbCommitFileSetBegin(committer);
H
Hongze Cheng 已提交
322
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
323

H
Hongze Cheng 已提交
324
  // commit fset
H
Hongze Cheng 已提交
325
  code = tsdbCommitTSData(committer);
H
Hongze Cheng 已提交
326 327
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
328
  code = tsdbCommitDelData(committer);
H
Hongze Cheng 已提交
329 330 331
  TSDB_CHECK_CODE(code, lino, _exit);

  // fset commit end
H
Hongze Cheng 已提交
332
  code = tsdbCommitFileSetEnd(committer);
H
Hongze Cheng 已提交
333
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
334 335 336

_exit:
  if (code) {
H
Hongze Cheng 已提交
337
    TSDB_ERROR_LOG(vid, lino, code);
H
Hongze Cheng 已提交
338
  } else {
H
Hongze Cheng 已提交
339
    tsdbDebug("vgId:%d %s done, fid:%d", vid, __func__, committer->ctx->fid);
H
Hongze Cheng 已提交
340 341 342 343
  }
  return code;
}

H
Hongze Cheng 已提交
344
static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *committer) {
H
Hongze Cheng 已提交
345
  int32_t code = 0;
H
Hongze Cheng 已提交
346 347
  int32_t lino = 0;
  int32_t vid = TD_VID(tsdb->pVnode);
H
Hongze Cheng 已提交
348

H
Hongze Cheng 已提交
349 350 351 352 353 354 355 356 357 358
  memset(committer, 0, sizeof(committer[0]));

  committer->tsdb = tsdb;
  committer->minutes = tsdb->keepCfg.days;
  committer->precision = tsdb->keepCfg.precision;
  committer->minRow = info->info.config.tsdbCfg.minRows;
  committer->maxRow = info->info.config.tsdbCfg.maxRows;
  committer->cmprAlg = info->info.config.tsdbCfg.compression;
  committer->sttTrigger = info->info.config.sttTrigger;
  committer->compactVersion = INT64_MAX;  // TODO: use a function
H
Hongze Cheng 已提交
359

H
Hongze Cheng 已提交
360 361 362 363 364
  TARRAY2_INIT(committer->fopArray);
  tsdbFSAllocEid(tsdb->pFS, &committer->eid);

  committer->ctx->now = taosGetTimestampSec();
  committer->ctx->nextKey = tsdb->imem->minKey;  // TODO
H
Hongze Cheng 已提交
365 366 367

_exit:
  if (code) {
H
Hongze Cheng 已提交
368
    TSDB_ERROR_LOG(vid, lino, code);
H
Hongze Cheng 已提交
369
  } else {
H
Hongze Cheng 已提交
370
    tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
H
Hongze Cheng 已提交
371 372 373 374
  }
  return code;
}

H
Hongze Cheng 已提交
375
static int32_t tsdbCloseCommitter(SCommitter2 *pCommiter, int32_t eno) {
H
Hongze Cheng 已提交
376
  int32_t code = 0;
H
Hongze Cheng 已提交
377
  int32_t lino = 0;
H
Hongze Cheng 已提交
378
  int32_t vid = TD_VID(pCommiter->tsdb->pVnode);
H
Hongze Cheng 已提交
379

H
Hongze Cheng 已提交
380
  if (eno == 0) {
H
Hongze Cheng 已提交
381
    code = tsdbFSEditBegin(pCommiter->tsdb->pFS, pCommiter->fopArray, TSDB_FEDIT_COMMIT);
H
Hongze Cheng 已提交
382
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
383
  } else {
H
Hongze Cheng 已提交
384 385
    // TODO
    ASSERT(0);
H
Hongze Cheng 已提交
386
  }
H
Hongze Cheng 已提交
387

H
Hongze Cheng 已提交
388 389
  ASSERT(pCommiter->sttWriter == NULL);
  TARRAY2_FREE(pCommiter->fopArray);
H
Hongze Cheng 已提交
390

H
Hongze Cheng 已提交
391
_exit:
H
Hongze Cheng 已提交
392
  if (code) {
H
Hongze Cheng 已提交
393 394
    tsdbError("vgId:%d %s failed at line %d since %s, eid:%" PRId64, vid, __func__, lino, tstrerror(code),
              pCommiter->eid);
H
Hongze Cheng 已提交
395
  } else {
H
Hongze Cheng 已提交
396
    tsdbDebug("vgId:%d %s done, eid:%" PRId64, vid, __func__, pCommiter->eid);
H
Hongze Cheng 已提交
397
  }
H
Hongze Cheng 已提交
398 399 400
  return code;
}

H
Hongze Cheng 已提交
401 402 403 404 405 406
int32_t tsdbPreCommit(STsdb *tsdb) {
  taosThreadRwlockWrlock(&tsdb->rwLock);
  ASSERT(tsdb->imem == NULL);
  tsdb->imem = tsdb->mem;
  tsdb->mem = NULL;
  taosThreadRwlockUnlock(&tsdb->rwLock);
H
Hongze Cheng 已提交
407 408 409
  return 0;
}

H
Hongze Cheng 已提交
410 411
int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
  if (!tsdb) return 0;
H
Hongze Cheng 已提交
412 413 414

  int32_t    code = 0;
  int32_t    lino = 0;
H
Hongze Cheng 已提交
415 416 417 418 419 420 421 422 423 424
  int32_t    vid = TD_VID(tsdb->pVnode);
  SMemTable *memt = tsdb->imem;
  int64_t    nRow = memt->nRow;
  int64_t    nDel = memt->nDel;

  if (!nRow && !nDel) {
    taosThreadRwlockWrlock(&tsdb->rwLock);
    tsdb->imem = NULL;
    taosThreadRwlockUnlock(&tsdb->rwLock);
    tsdbUnrefMemTable(memt, NULL, true);
H
Hongze Cheng 已提交
425
  } else {
H
Hongze Cheng 已提交
426
    SCommitter2 committer[1];
H
Hongze Cheng 已提交
427

H
Hongze Cheng 已提交
428
    code = tsdbOpenCommitter(tsdb, info, committer);
H
Hongze Cheng 已提交
429
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
430

H
Hongze Cheng 已提交
431 432
    while (committer->ctx->nextKey != TSKEY_MAX) {
      code = tsdbCommitFileSet(committer);
H
Hongze Cheng 已提交
433 434 435 436
      if (code) {
        lino = __LINE__;
        break;
      }
H
Hongze Cheng 已提交
437
    }
H
Hongze Cheng 已提交
438

H
Hongze Cheng 已提交
439
    code = tsdbCloseCommitter(committer, code);
H
Hongze Cheng 已提交
440 441
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
442 443 444

_exit:
  if (code) {
H
Hongze Cheng 已提交
445
    TSDB_ERROR_LOG(vid, lino, code);
H
Hongze Cheng 已提交
446
  } else {
H
Hongze Cheng 已提交
447
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, vid, __func__, nRow, nDel);
H
Hongze Cheng 已提交
448 449 450 451
  }
  return code;
}

H
Hongze Cheng 已提交
452
int32_t tsdbCommitCommit(STsdb *pTsdb) {
H
Hongze Cheng 已提交
453 454 455
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(pTsdb->pVnode);
H
Hongze Cheng 已提交
456

H
Hongze Cheng 已提交
457
  if (pTsdb->imem == NULL) goto _exit;
H
Hongze Cheng 已提交
458

H
Hongze Cheng 已提交
459 460
  SMemTable *pMemTable = pTsdb->imem;
  taosThreadRwlockWrlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
461
  code = tsdbFSEditCommit(pTsdb->pFS);
H
Hongze Cheng 已提交
462 463 464 465 466 467
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  pTsdb->imem = NULL;
  taosThreadRwlockUnlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
468
  tsdbUnrefMemTable(pMemTable, NULL, true);
H
Hongze Cheng 已提交
469

H
Hongze Cheng 已提交
470 471 472
  // TODO: make this call async
  code = tsdbMerge(pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
473

H
Hongze Cheng 已提交
474 475
_exit:
  if (code) {
H
Hongze Cheng 已提交
476
    tsdbError("vgId:%d, %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
477
  } else {
H
Hongze Cheng 已提交
478
    tsdbInfo("vgId:%d %s done", vid, __func__);
H
Hongze Cheng 已提交
479 480 481 482 483 484 485
  }
  return code;
}

int32_t tsdbCommitAbort(STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
486 487 488
  int32_t vid = TD_VID(pTsdb->pVnode);

  if (pTsdb->imem == NULL) goto _exit;
H
Hongze Cheng 已提交
489

H
Hongze Cheng 已提交
490
  code = tsdbFSEditAbort(pTsdb->pFS);
H
Hongze Cheng 已提交
491 492 493 494
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
495
    tsdbError("vgId:%d, %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
496
  } else {
H
Hongze Cheng 已提交
497
    tsdbInfo("vgId:%d %s done", vid, __func__);
H
Hongze Cheng 已提交
498 499 500
  }
  return code;
}