tsdbCommit.c 13.5 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "inc/tsdbCommit.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// extern dependencies
H
Hongze Cheng 已提交
19 20
typedef struct {
  STsdb *pTsdb;
H
Hongze Cheng 已提交
21

H
Hongze Cheng 已提交
22 23 24 25 26 27 28
  // config
  int32_t minutes;
  int8_t  precision;
  int32_t minRow;
  int32_t maxRow;
  int8_t  cmprAlg;
  int8_t  sttTrigger;
H
Hongze Cheng 已提交
29

H
Hongze Cheng 已提交
30 31 32
  SArray      *aTbDataP;  // SArray<STbData *>
  TFileOpArray opArray;
  int64_t      eid;  // edit id
H
Hongze Cheng 已提交
33

H
Hongze Cheng 已提交
34
  // context
H
Hongze Cheng 已提交
35 36 37 38 39
  TSKEY            nextKey;
  int32_t          fid;
  int32_t          expLevel;
  TSKEY            minKey;
  TSKEY            maxKey;
H
Hongze Cheng 已提交
40
  const STFileSet *fset;
H
Hongze Cheng 已提交
41

H
Hongze Cheng 已提交
42
  // writer
H
Hongze Cheng 已提交
43
  SSttFileWriter *pWriter;
H
Hongze Cheng 已提交
44
} SCommitter;
H
Hongze Cheng 已提交
45

H
Hongze Cheng 已提交
46
// Open pCommitter->pWriter on a brand-new level-0 stt file.
//
// A disk is allocated at pCommitter->expLevel, then a writer configuration
// describing an empty stt file (size 0, level 0, nseg 0) for the current
// fid/eid is handed to tsdbSttFWriterOpen().
//
// Returns 0 on success, TSDB_CODE_FS_NO_VALID_DISK when no disk can be
// allocated, or the error code reported by tsdbSttFWriterOpen().
static int32_t open_writer_with_new_stt(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;
  SVnode *pVnode = pTsdb->pVnode;
  int32_t vid = TD_VID(pVnode);

  // Zero-initialize so that every member not explicitly assigned below has a
  // defined value instead of stack garbage.
  SSttFileWriterConfig config = {0};
  SDiskID              did = {0};

  if (tfsAllocDisk(pVnode->pTfs, pCommitter->expLevel, &did) < 0) {
    code = TSDB_CODE_FS_NO_VALID_DISK;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  config.pTsdb = pTsdb;
  config.maxRow = pCommitter->maxRow;
  config.szPage = pVnode->config.tsdbPageSize;
  config.cmprAlg = pCommitter->cmprAlg;
  config.pSkmTb = NULL;
  config.pSkmRow = NULL;
  config.aBuf = NULL;
  config.file.type = TSDB_FTYPE_STT;
  config.file.did = did;
  config.file.fid = pCommitter->fid;
  config.file.cid = pCommitter->eid;
  config.file.size = 0;
  config.file.stt.level = 0;
  config.file.stt.nseg = 0;

  code = tsdbSttFWriterOpen(&config, &pCommitter->pWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s success", vid, __func__);
  }
  return code;
}
// Open pCommitter->pWriter on top of an already existing stt file.
//
// The writer configuration is the same as for a new file except that the
// file descriptor is copied from *pFile.
//
// Returns 0 on success or the error code reported by tsdbSttFWriterOpen().
static int32_t open_writer_with_exist_stt(SCommitter *pCommitter, const STFile *pFile) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;
  SVnode *pVnode = pTsdb->pVnode;
  int32_t vid = TD_VID(pVnode);

  // Start from an all-zero configuration, then fill in the known members.
  SSttFileWriterConfig config = {0};
  config.pTsdb = pTsdb;
  config.maxRow = pCommitter->maxRow;
  config.szPage = pVnode->config.tsdbPageSize;
  config.cmprAlg = pCommitter->cmprAlg;
  config.pSkmTb = NULL;
  config.pSkmRow = NULL;
  config.aBuf = NULL;
  config.file = *pFile;

  code = tsdbSttFWriterOpen(&config, &pCommitter->pWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s success", vid, __func__);
  }
  return code;
}
H
Hongze Cheng 已提交
117 118 119 120 121
// Pick the stt file the committer should write to and open a writer on it.
//
// A new stt file is used when there is no file set for the current fid, when
// the file set has no level 0, or when level 0 has no candidate file. The
// candidate lookup is still a placeholder (node is always NULL), so at the
// moment every call ends up creating a new stt file.
static int32_t open_committer_writer(SCommitter *pCommitter) {
  if (pCommitter->fset != NULL) {
    const SSttLvl *lvl0 = tsdbTFileSetGetLvl(pCommitter->fset, 0);

    if (lvl0 != NULL) {
      SRBTreeNode *node = NULL;  // tRBTreeMax(&lvl0->sttTree);

      if (node != NULL) {
        // STFileObj *fobj = TCONTAINER_OF(node, STFileObj, rbtn);
        // if (fobj->f.stt.nseg >= pCommitter->sttTrigger) {
        //   return open_writer_with_new_stt(pCommitter);
        // } else {
        //   return open_writer_with_exist_stt(pCommitter, &fobj->f);
        // }
        return 0;
      }
    }
  }

  return open_writer_with_new_stt(pCommitter);
}
H
Hongze Cheng 已提交
140

H
Hongze Cheng 已提交
141
// Write one time-series row of table tbid through the committer's stt writer.
//
// The writer is opened lazily on the first row of a file set.
//
// Returns 0 on success, otherwise the error code from opening the writer or
// from tsdbSttFWriteTSData(). The error must reach the caller: it stops
// iterating on failure, and returning 0 here would make it keep going with a
// writer that may never have been opened.
static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDBROW *pRow) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(pCommitter->pTsdb->pVnode);

  if (pCommitter->pWriter == NULL) {
    code = open_committer_writer(pCommitter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbSttFWriteTSData(pCommitter->pWriter, tbid, pRow);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d failed at line %d since %s", vid, lino, tstrerror(code));
  } else {
    tsdbTrace("vgId:%d %s done, fid:%d suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64, vid, __func__,
              pCommitter->fid, tbid->suid, tbid->uid, TSDBROW_KEY(pRow).ts, TSDBROW_KEY(pRow).version);
  }
  return code;
}

H
Hongze Cheng 已提交
164 165 166 167 168 169 170
// Placeholder for writing one delete record (table suid/uid, version, key
// range [sKey, eKey]). Nothing is written yet; the call always reports success.
static int32_t tsdbCommitWriteDelData(SCommitter *pCommitter, int64_t suid, int64_t uid, int64_t version, int64_t sKey,
                                      int64_t eKey) {
  // TODO
  int32_t code = 0;
  return code;
}

H
Hongze Cheng 已提交
171
// Commit the rows of the immutable memtable that belong to the current file
// set, i.e. rows whose timestamp lies in [minKey, maxKey].
//
// Each table is scanned starting at minKey. The first row beyond maxKey ends
// the scan of that table and lowers pCommitter->nextKey so that a later pass
// picks it up.
//
// Returns 0 on success or the first error reported while writing a row.
static int32_t commit_timeseries_data(SCommitter *pCommitter) {
  int32_t    code = 0;
  int32_t    lino = 0;
  int64_t    nRow = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  int32_t    vid = TD_VID(pTsdb->pVnode);
  SMemTable *pMem = pTsdb->imem;

  if (pMem->nRow != 0) {
    TSDBKEY from = {.ts = pCommitter->minKey, .version = VERSION_MIN};

    for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
      STbData    *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
      STbDataIter iter;

      tsdbTbDataIterOpen(pTbData, &from, 0, &iter);

      TSDBROW *pRow = tsdbTbDataIterGet(&iter);
      while (pRow != NULL) {
        TSDBKEY rowKey = TSDBROW_KEY(pRow);

        if (rowKey.ts > pCommitter->maxKey) {
          // Row belongs to a later file set: remember the earliest such key.
          if (rowKey.ts < pCommitter->nextKey) {
            pCommitter->nextKey = rowKey.ts;
          }
          break;
        }

        code = tsdbCommitWriteTSData(pCommitter, (TABLEID *)pTbData, pRow);
        TSDB_CHECK_CODE(code, lino, _exit);

        nRow++;

        tsdbTbDataIterNext(&iter);
        pRow = tsdbTbDataIterGet(&iter);
      }
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d nRow:%" PRId64, vid, __func__, pCommitter->fid, nRow);
  }
  return code;
}

H
Hongze Cheng 已提交
212
// Commit the delete records of the immutable memtable that overlap the
// current file set.
//
// NOTE: delete-data commit is not implemented yet. The function returns 0
// immediately and everything after the early return is a draft that is never
// executed. The draft is kept so the intended algorithm stays visible:
// records ending before minKey are skipped, records starting after maxKey
// lower pCommitter->nextKey, and the rest go to tsdbCommitWriteDelData().
static int32_t commit_delete_data(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;  // initialized so the error log never prints an indeterminate value

  return 0;

  ASSERTS(0, "TODO: Not implemented yet");

  int64_t    nDel = 0;  // number of delete records written for this file set
  SMemTable *pMem = pCommitter->pTsdb->imem;

  if (pMem->nDel == 0) {  // no del data
    goto _exit;
  }

  for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
    STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);

    for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) {
      if (pDelData->eKey < pCommitter->minKey) continue;
      if (pDelData->sKey > pCommitter->maxKey) {
        pCommitter->nextKey = TMIN(pCommitter->nextKey, pDelData->sKey);
        continue;
      }

      code = tsdbCommitWriteDelData(pCommitter, pTbData->suid, pTbData->uid, pDelData->version,
                                    pDelData->sKey /* TODO */, pDelData->eKey /* TODO */);
      TSDB_CHECK_CODE(code, lino, _exit);

      nDel++;
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
  } else {
    // Report the records handled in this pass, mirroring the nRow counter of
    // the time-series path.
    tsdbDebug("vgId:%d %s done, fid:%d nDel:%" PRId64, TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid,
              nDel);
  }
  return code;
}

H
Hongze Cheng 已提交
253 254 255 256
// Prepare the committer context for the file set that contains nextKey.
//
// Derives fid, the [minKey, maxKey] range and the expected storage level,
// resets nextKey to TSKEY_MAX (the data passes lower it again when they meet
// keys beyond maxKey) and looks up the existing file set for fid.
//
// Always returns 0.
static int32_t commit_fset_start(SCommitter *pCommitter) {
  STsdb  *pTsdb = pCommitter->pTsdb;
  int32_t vid = TD_VID(pTsdb->pVnode);

  // which file set, and which key range does it cover?
  pCommitter->fid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
  tsdbFidKeyRange(pCommitter->fid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                  &pCommitter->maxKey);

  // where should new files go?
  pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pTsdb->keepCfg, taosGetTimestampSec());

  // nothing pending beyond this file set until a data pass says otherwise
  pCommitter->nextKey = TSKEY_MAX;

  // existing file set for this fid
  tsdbFSGetFSet(pTsdb->pFS, pCommitter->fid, &pCommitter->fset);

  tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", vid, __func__, pCommitter->fid,
            pCommitter->minKey, pCommitter->maxKey, pCommitter->expLevel);
  return 0;
}

H
Hongze Cheng 已提交
270
// Finish the current file set: close the stt writer, if one was opened, and
// record the resulting file operation in pCommitter->opArray.
//
// Returns 0 when no writer was opened or everything succeeded, otherwise the
// error from closing the writer or from appending to the operation array.
static int32_t commit_fset_end(SCommitter *pCommitter) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  vid = TD_VID(pCommitter->pTsdb->pVnode);
  STFileOp op;

  // Nothing was written for this file set.
  if (pCommitter->pWriter == NULL) {
    return 0;
  }

  code = tsdbSttFWriterClose(&pCommitter->pWriter, 0, &op);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = TARRAY2_APPEND(&pCommitter->opArray, op);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d failed at line %d since %s", vid, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d", vid, __func__, pCommitter->fid);
  }
  return code;
}

H
Hongze Cheng 已提交
293
// Commit one file set: set up the context, write the time-series data, write
// the delete data, then close the file set. The first failing step aborts
// the sequence and its error code is returned.
static int32_t commit_fset(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(pCommitter->pTsdb->pVnode);

  // step 1: derive fid / key range / target file set
  code = commit_fset_start(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // step 2: rows
  code = commit_timeseries_data(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // step 3: deletes
  code = commit_delete_data(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // step 4: close the writer and record the file operation
  code = commit_fset_end(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", vid, __func__);
  }
  return code;
}

H
Hongze Cheng 已提交
322
// Initialize *pCommitter for committing pTsdb->imem.
//
// The structure is zeroed, the configuration is copied from pTsdb->keepCfg
// and pInfo, the table-data array of the immutable memtable is fetched, the
// file-operation array is initialized, an edit id is allocated and nextKey is
// seeded with the memtable's minimum key.
//
// Returns 0 on success or TSDB_CODE_OUT_OF_MEMORY when the table-data array
// cannot be obtained.
static int32_t open_committer(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  // set config
  memset(pCommitter, 0, sizeof(SCommitter));
  pCommitter->pTsdb = pTsdb;
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;
  pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows;
  pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows;
  pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression;
  pCommitter->sttTrigger = 1;  // TODO

  pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
  if (pCommitter->aTbDataP == NULL) {
    // Nothing to release here: the array pointer is NULL.
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
  }
  TARRAY2_INIT(&pCommitter->opArray);
  tsdbFSAllocEid(pTsdb->pFS, &pCommitter->eid);

  // start loop
  pCommitter->nextKey = pTsdb->imem->minKey;  // TODO

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}

H
Hongze Cheng 已提交
356
// Finish a commit pass and release the committer's resources.
//
// pCommiter: committer opened by open_committer().
// eno:       result of the commit loop; 0 means every file set succeeded.
//
// When eno is 0 the collected file operations are handed to
// tsdbFSEditBegin(). Rollback for a failed commit loop is not implemented
// yet (TODO); in that case eno is passed back so the failure is not reported
// as success in builds where ASSERT does not abort.
//
// The table-data array and the operation array are released on every path,
// including when tsdbFSEditBegin() fails.
//
// Returns 0 on success, the tsdbFSEditBegin() error, or eno when it is
// non-zero.
static int32_t close_committer(SCommitter *pCommiter, int32_t eno) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(pCommiter->pTsdb->pVnode);

  if (eno == 0) {
    code = tsdbFSEditBegin(pCommiter->pTsdb->pFS, &pCommiter->opArray, TSDB_FEDIT_COMMIT);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    // TODO
    ASSERT(0);
    code = eno;
    lino = __LINE__;
    goto _exit;
  }

  ASSERT(pCommiter->pWriter == NULL);

_exit:
  // Cleanup runs on both the success and the failure path.
  taosArrayDestroy(pCommiter->aTbDataP);
  pCommiter->aTbDataP = NULL;
  TARRAY2_CLEAR_FREE(&pCommiter->opArray, NULL);

  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s, eid:%" PRId64, vid, __func__, lino, tstrerror(code),
              pCommiter->eid);
  } else {
    tsdbDebug("vgId:%d %s done, eid:%" PRId64, vid, __func__, pCommiter->eid);
  }
  return code;
}

// Move the active memtable into the immutable slot ahead of a commit.
// Under the write lock: pTsdb->mem becomes pTsdb->imem and pTsdb->mem is
// reset to NULL. Asserts that no immutable memtable is already pending.
// Always returns 0.
int32_t tsdbPreCommit(STsdb *pTsdb) {
  taosThreadRwlockWrlock(&pTsdb->rwLock);
  ASSERT(pTsdb->imem == NULL);  // a previous commit must have cleared imem
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
  taosThreadRwlockUnlock(&pTsdb->rwLock);
  return 0;
}

// Write the content of the immutable memtable (pTsdb->imem) to files.
//
// pTsdb: tsdb instance; a NULL pointer is accepted and treated as a no-op.
// pInfo: commit information providing the tsdb configuration.
//
// An empty memtable (no rows, no deletes) is detached under the write lock
// and unreferenced right away. Otherwise a committer is opened, file sets
// are committed one by one until no key is left, and the committer is
// closed.
//
// Returns 0 on success, otherwise the first error encountered.
int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo) {
  if (!pTsdb) return 0;

  int32_t    code = 0;
  int32_t    lino = 0;
  SMemTable *pMem = pTsdb->imem;

  // Snapshot the counters now: on the empty path the memtable is unreferenced
  // below and must not be dereferenced afterwards for the final log line.
  int64_t nRow = pMem->nRow;
  int64_t nDel = pMem->nDel;

  if (nRow == 0 && nDel == 0) {
    taosThreadRwlockWrlock(&pTsdb->rwLock);
    pTsdb->imem = NULL;
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    tsdbUnrefMemTable(pMem, NULL, true);
  } else {
    SCommitter committer;

    code = open_committer(pTsdb, pInfo, &committer);
    TSDB_CHECK_CODE(code, lino, _exit);

    while (committer.nextKey != TSKEY_MAX) {
      code = commit_fset(&committer);
      if (code) {
        lino = __LINE__;
        break;
      }
    }

    // Always close the committer, but keep the first error: a failure in the
    // commit loop must not be replaced by the close result.
    int32_t closeCode = close_committer(&committer, code);
    if (code == 0 && closeCode != 0) {
      code = closeCode;
      lino = __LINE__;
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, nRow, nDel);
  }
  return code;
}
H
Hongze Cheng 已提交
432
// Make a prepared commit effective.
//
// With an immutable memtable pending, the file-system edit is committed
// under the write lock; on success pTsdb->imem is cleared inside the same
// critical section and the memtable is unreferenced afterwards. Without a
// pending memtable the call is a successful no-op.
//
// Returns 0 on success or the error reported by tsdbFSEditCommit().
int32_t tsdbCommitCommit(STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(pTsdb->pVnode);

  if (pTsdb->imem == NULL) goto _exit;

  SMemTable *pMemTable = pTsdb->imem;

  taosThreadRwlockWrlock(&pTsdb->rwLock);
  code = tsdbFSEditCommit(pTsdb->pFS);
  if (code == 0) {
    pTsdb->imem = NULL;
  }
  taosThreadRwlockUnlock(&pTsdb->rwLock);
  TSDB_CHECK_CODE(code, lino, _exit);

  tsdbUnrefMemTable(pMemTable, NULL, true);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done", vid, __func__);
  }
  return code;
}

// Abort a prepared commit.
//
// With an immutable memtable pending, the file-system edit is aborted via
// tsdbFSEditAbort(); otherwise the call is a successful no-op.
//
// Returns 0 on success or the error reported by tsdbFSEditAbort().
int32_t tsdbCommitAbort(STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(pTsdb->pVnode);

  if (pTsdb->imem != NULL) {
    code = tsdbFSEditAbort(pTsdb->pFS);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done", vid, __func__);
  }
  return code;
}