You need to sign in or sign up before continuing.
tsdbCommit.c 13.2 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "inc/tsdbCommit.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// extern dependencies
H
Hongze Cheng 已提交
19 20
typedef struct {
  STsdb *pTsdb;
H
Hongze Cheng 已提交
21

H
Hongze Cheng 已提交
22 23 24 25 26 27 28
  // config
  int32_t minutes;
  int8_t  precision;
  int32_t minRow;
  int32_t maxRow;
  int8_t  cmprAlg;
  int8_t  sttTrigger;
H
Hongze Cheng 已提交
29

H
Hongze Cheng 已提交
30 31 32
  SArray      *aTbDataP;  // SArray<STbData *>
  TFileOpArray opArray;
  int64_t      eid;  // edit id
H
Hongze Cheng 已提交
33

H
Hongze Cheng 已提交
34
  // context
H
Hongze Cheng 已提交
35 36 37 38 39 40
  TSKEY      nextKey;
  int32_t    fid;
  int32_t    expLevel;
  TSKEY      minKey;
  TSKEY      maxKey;
  STFileSet *fset;
H
Hongze Cheng 已提交
41

H
Hongze Cheng 已提交
42
  // writer
H
Hongze Cheng 已提交
43
  SSttFileWriter *pWriter;
H
Hongze Cheng 已提交
44
} SCommitter;
H
Hongze Cheng 已提交
45

H
Hongze Cheng 已提交
46
static int32_t open_writer_with_new_stt(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
47
  int32_t code = 0;
H
Hongze Cheng 已提交
48 49
  int32_t lino = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;
H
Hongze Cheng 已提交
50 51 52 53 54 55 56 57 58 59 60
  SVnode *pVnode = pTsdb->pVnode;
  int32_t vid = TD_VID(pVnode);

  SSttFileWriterConfig config;
  SDiskID              did;

  if (tfsAllocDisk(pVnode->pTfs, pCommitter->expLevel, &did) < 0) {
    code = TSDB_CODE_FS_NO_VALID_DISK;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

H
Hongze Cheng 已提交
61
  config.tsdb = pTsdb;
H
Hongze Cheng 已提交
62 63 64
  config.maxRow = pCommitter->maxRow;
  config.szPage = pVnode->config.tsdbPageSize;
  config.cmprAlg = pCommitter->cmprAlg;
H
Hongze Cheng 已提交
65 66
  config.skmTb = NULL;
  config.skmRow = NULL;
H
Hongze Cheng 已提交
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
  config.aBuf = NULL;
  config.file.type = TSDB_FTYPE_STT;
  config.file.did = did;
  config.file.fid = pCommitter->fid;
  config.file.cid = pCommitter->eid;
  config.file.size = 0;
  config.file.stt.level = 0;
  config.file.stt.nseg = 0;

  code = tsdbSttFWriterOpen(&config, &pCommitter->pWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s success", vid, __func__);
  }
  return code;
}
static int32_t open_writer_with_exist_stt(SCommitter *pCommitter, const STFile *pFile) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;
  SVnode *pVnode = pTsdb->pVnode;
  int32_t vid = TD_VID(pVnode);
H
Hongze Cheng 已提交
93

H
Hongze Cheng 已提交
94
  SSttFileWriterConfig config = {
H
Hongze Cheng 已提交
95
      //
H
Hongze Cheng 已提交
96
      .tsdb = pTsdb,
H
Hongze Cheng 已提交
97
      .maxRow = pCommitter->maxRow,
H
Hongze Cheng 已提交
98
      .szPage = pVnode->config.tsdbPageSize,
H
Hongze Cheng 已提交
99
      .cmprAlg = pCommitter->cmprAlg,
H
Hongze Cheng 已提交
100 101
      .skmTb = NULL,
      .skmRow = NULL,
H
Hongze Cheng 已提交
102
      .aBuf = NULL,
H
Hongze Cheng 已提交
103
      .file = *pFile  //
H
Hongze Cheng 已提交
104
  };
H
Hongze Cheng 已提交
105

H
Hongze Cheng 已提交
106
  code = tsdbSttFWriterOpen(&config, &pCommitter->pWriter);
H
Hongze Cheng 已提交
107 108 109 110
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
111 112 113
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s success", vid, __func__);
H
Hongze Cheng 已提交
114
  }
H
Hongze Cheng 已提交
115 116
  return code;
}
H
Hongze Cheng 已提交
117 118 119 120 121
static int32_t open_committer_writer(SCommitter *pCommitter) {
  if (!pCommitter->fset) {
    return open_writer_with_new_stt(pCommitter);
  }

H
Hongze Cheng 已提交
122
  const SSttLvl *lvl0 = tsdbTFileSetGetLvl(pCommitter->fset, 0);
H
Hongze Cheng 已提交
123 124 125 126
  if (lvl0 == NULL) {
    return open_writer_with_new_stt(pCommitter);
  }

H
Hongze Cheng 已提交
127 128 129
  ASSERT(TARRAY2_SIZE(&lvl0->farr) > 0);
  STFileObj *fobj = TARRAY2_LAST(&lvl0->farr);
  if (fobj->f.stt.nseg >= pCommitter->sttTrigger) {
H
Hongze Cheng 已提交
130 131
    return open_writer_with_new_stt(pCommitter);
  } else {
H
Hongze Cheng 已提交
132
    return open_writer_with_exist_stt(pCommitter, &fobj->f);
H
Hongze Cheng 已提交
133 134
  }
}
H
Hongze Cheng 已提交
135

H
Hongze Cheng 已提交
136
static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, SRowInfo *pRowInfo) {
H
Hongze Cheng 已提交
137
  int32_t code = 0;
H
Hongze Cheng 已提交
138 139
  int32_t lino = 0;
  int32_t vid = TD_VID(pCommitter->pTsdb->pVnode);
H
Hongze Cheng 已提交
140 141

  if (pCommitter->pWriter == NULL) {
H
Hongze Cheng 已提交
142
    code = open_committer_writer(pCommitter);
H
Hongze Cheng 已提交
143 144 145
    TSDB_CHECK_CODE(code, lino, _exit);
  }

H
Hongze Cheng 已提交
146
  code = tsdbSttFWriteTSData(pCommitter->pWriter, pRowInfo);
H
Hongze Cheng 已提交
147 148 149 150
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
151
    tsdbError("vgId:%d failed at line %d since %s", vid, lino, tstrerror(code));
H
Hongze Cheng 已提交
152
  }
H
Hongze Cheng 已提交
153
  return 0;
H
Hongze Cheng 已提交
154 155
}

H
Hongze Cheng 已提交
156 157 158 159 160 161 162
static int32_t tsdbCommitWriteDelData(SCommitter *pCommitter, int64_t suid, int64_t uid, int64_t version, int64_t sKey,
                                      int64_t eKey) {
  int32_t code = 0;
  // TODO
  return code;
}

H
Hongze Cheng 已提交
163
static int32_t commit_timeseries_data(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
164 165
  int32_t    code = 0;
  int32_t    lino = 0;
H
Hongze Cheng 已提交
166
  int64_t    nRow = 0;
H
Hongze Cheng 已提交
167 168 169
  STsdb     *pTsdb = pCommitter->pTsdb;
  int32_t    vid = TD_VID(pTsdb->pVnode);
  SMemTable *pMem = pTsdb->imem;
H
Hongze Cheng 已提交
170

H
Hongze Cheng 已提交
171
  if (pMem->nRow == 0) goto _exit;
H
Hongze Cheng 已提交
172

H
Hongze Cheng 已提交
173
  TSDBKEY from = {.ts = pCommitter->minKey, .version = VERSION_MIN};
H
Hongze Cheng 已提交
174 175
  for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
    STbDataIter iter;
H
Hongze Cheng 已提交
176
    STbData    *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
H
Hongze Cheng 已提交
177
    SRowInfo    rowInfo = {.suid = pTbData->suid, .uid = pTbData->uid};
H
Hongze Cheng 已提交
178

H
Hongze Cheng 已提交
179 180 181 182 183 184
    tsdbTbDataIterOpen(pTbData, &from, 0, &iter);

    for (TSDBROW *pRow; (pRow = tsdbTbDataIterGet(&iter)) != NULL; tsdbTbDataIterNext(&iter)) {
      TSDBKEY rowKey = TSDBROW_KEY(pRow);

      if (rowKey.ts > pCommitter->maxKey) {
H
Hongze Cheng 已提交
185
        pCommitter->nextKey = TMIN(pCommitter->nextKey, rowKey.ts);
H
Hongze Cheng 已提交
186 187 188
        break;
      }

H
Hongze Cheng 已提交
189 190
      rowInfo.row = *pRow;
      code = tsdbCommitWriteTSData(pCommitter, &rowInfo);
H
Hongze Cheng 已提交
191
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
192 193

      nRow++;
H
Hongze Cheng 已提交
194 195 196 197 198
    }
  }

_exit:
  if (code) {
H
Hongze Cheng 已提交
199
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
200
  } else {
H
Hongze Cheng 已提交
201
    tsdbDebug("vgId:%d %s done, fid:%d nRow:%" PRId64, vid, __func__, pCommitter->fid, nRow);
H
Hongze Cheng 已提交
202 203 204 205
  }
  return code;
}

H
Hongze Cheng 已提交
206
static int32_t commit_delete_data(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
207 208 209
  int32_t code = 0;
  int32_t lino;

H
Hongze Cheng 已提交
210 211
  return 0;

H
Hongze Cheng 已提交
212
  ASSERTS(0, "TODO: Not implemented yet");
H
Hongze Cheng 已提交
213

H
Hongze Cheng 已提交
214
  int64_t    nDel = 0;
H
Hongze Cheng 已提交
215 216
  SMemTable *pMem = pCommitter->pTsdb->imem;

H
Hongze Cheng 已提交
217 218 219
  if (pMem->nDel == 0) {  // no del data
    goto _exit;
  }
H
Hongze Cheng 已提交
220 221 222 223 224

  for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
    STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);

    for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) {
H
Hongze Cheng 已提交
225 226 227 228 229
      if (pDelData->eKey < pCommitter->minKey) continue;
      if (pDelData->sKey > pCommitter->maxKey) {
        pCommitter->nextKey = TMIN(pCommitter->nextKey, pDelData->sKey);
        continue;
      }
H
Hongze Cheng 已提交
230

H
Hongze Cheng 已提交
231 232
      code = tsdbCommitWriteDelData(pCommitter, pTbData->suid, pTbData->uid, pDelData->version,
                                    pDelData->sKey /* TODO */, pDelData->eKey /* TODO */);
H
Hongze Cheng 已提交
233 234 235 236 237 238 239 240 241 242 243 244 245 246
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d nDel:%" PRId64, TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid,
              pMem->nDel);
  }
  return code;
}

H
Hongze Cheng 已提交
247 248 249 250
static int32_t commit_fset_start(SCommitter *pCommitter) {
  STsdb  *pTsdb = pCommitter->pTsdb;
  int32_t vid = TD_VID(pTsdb->pVnode);

H
Hongze Cheng 已提交
251 252 253
  pCommitter->fid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
  tsdbFidKeyRange(pCommitter->fid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                  &pCommitter->maxKey);
H
Hongze Cheng 已提交
254
  pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pTsdb->keepCfg, taosGetTimestampSec());
H
Hongze Cheng 已提交
255
  pCommitter->nextKey = TSKEY_MAX;
H
Hongze Cheng 已提交
256

H
Hongze Cheng 已提交
257
  tsdbFSGetFSet(pTsdb->pFS, pCommitter->fid, &pCommitter->fset);
H
Hongze Cheng 已提交
258

H
Hongze Cheng 已提交
259 260
  tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", vid, __func__, pCommitter->fid,
            pCommitter->minKey, pCommitter->maxKey, pCommitter->expLevel);
H
Hongze Cheng 已提交
261
  return 0;
H
Hongze Cheng 已提交
262 263
}

H
Hongze Cheng 已提交
264
static int32_t commit_fset_end(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
265
  int32_t code = 0;
H
Hongze Cheng 已提交
266 267
  int32_t lino = 0;
  int32_t vid = TD_VID(pCommitter->pTsdb->pVnode);
H
Hongze Cheng 已提交
268

H
Hongze Cheng 已提交
269 270
  if (pCommitter->pWriter == NULL) return 0;

H
Hongze Cheng 已提交
271 272 273
  STFileOp op;
  code = tsdbSttFWriterClose(&pCommitter->pWriter, 0, &op);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
274

H
Hongze Cheng 已提交
275 276
  code = TARRAY2_APPEND(&pCommitter->opArray, op);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
277 278 279

_exit:
  if (code) {
H
Hongze Cheng 已提交
280
    tsdbError("vgId:%d failed at line %d since %s", vid, lino, tstrerror(code));
H
Hongze Cheng 已提交
281
  } else {
H
Hongze Cheng 已提交
282
    tsdbDebug("vgId:%d %s done, fid:%d", vid, __func__, pCommitter->fid);
H
Hongze Cheng 已提交
283 284 285 286
  }
  return code;
}

H
Hongze Cheng 已提交
287
static int32_t commit_fset(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
288 289
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
290
  int32_t vid = TD_VID(pCommitter->pTsdb->pVnode);
H
Hongze Cheng 已提交
291

H
Hongze Cheng 已提交
292
  // fset commit start
H
Hongze Cheng 已提交
293
  code = commit_fset_start(pCommitter);
H
Hongze Cheng 已提交
294
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
295

H
Hongze Cheng 已提交
296
  // commit fset
H
Hongze Cheng 已提交
297
  code = commit_timeseries_data(pCommitter);
H
Hongze Cheng 已提交
298 299
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
300
  code = commit_delete_data(pCommitter);
H
Hongze Cheng 已提交
301 302 303
  TSDB_CHECK_CODE(code, lino, _exit);

  // fset commit end
H
Hongze Cheng 已提交
304
  code = commit_fset_end(pCommitter);
H
Hongze Cheng 已提交
305
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
306 307 308

_exit:
  if (code) {
H
Hongze Cheng 已提交
309 310 311
    tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", vid, __func__);
H
Hongze Cheng 已提交
312 313 314 315
  }
  return code;
}

H
Hongze Cheng 已提交
316
static int32_t open_committer(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCommitter) {
H
Hongze Cheng 已提交
317
  int32_t code = 0;
H
Hongze Cheng 已提交
318
  int32_t lino;
H
Hongze Cheng 已提交
319

H
Hongze Cheng 已提交
320
  // set config
H
Hongze Cheng 已提交
321 322
  memset(pCommitter, 0, sizeof(SCommitter));
  pCommitter->pTsdb = pTsdb;
H
Hongze Cheng 已提交
323 324 325 326 327
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;
  pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows;
  pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows;
  pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression;
H
Hongze Cheng 已提交
328
  pCommitter->sttTrigger = pInfo->info.config.sttTrigger;
H
Hongze Cheng 已提交
329 330

  pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
H
Hongze Cheng 已提交
331
  if (pCommitter->aTbDataP == NULL) {
H
Hongze Cheng 已提交
332 333
    taosArrayDestroy(pCommitter->aTbDataP);
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
H
Hongze Cheng 已提交
334
  }
H
Hongze Cheng 已提交
335
  TARRAY2_INIT(&pCommitter->opArray);
H
Hongze Cheng 已提交
336
  tsdbFSAllocEid(pTsdb->pFS, &pCommitter->eid);
H
Hongze Cheng 已提交
337

H
Hongze Cheng 已提交
338 339
  // start loop
  pCommitter->nextKey = pTsdb->imem->minKey;  // TODO
H
Hongze Cheng 已提交
340 341 342

_exit:
  if (code) {
H
Hongze Cheng 已提交
343
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
344
  } else {
H
Hongze Cheng 已提交
345
    tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
H
Hongze Cheng 已提交
346 347 348 349
  }
  return code;
}

H
Hongze Cheng 已提交
350
static int32_t close_committer(SCommitter *pCommiter, int32_t eno) {
H
Hongze Cheng 已提交
351
  int32_t code = 0;
H
Hongze Cheng 已提交
352 353
  int32_t lino = 0;
  int32_t vid = TD_VID(pCommiter->pTsdb->pVnode);
H
Hongze Cheng 已提交
354

H
Hongze Cheng 已提交
355
  if (eno == 0) {
H
Hongze Cheng 已提交
356
    code = tsdbFSEditBegin(pCommiter->pTsdb->pFS, &pCommiter->opArray, TSDB_FEDIT_COMMIT);
H
Hongze Cheng 已提交
357
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
358
  } else {
H
Hongze Cheng 已提交
359 360
    // TODO
    ASSERT(0);
H
Hongze Cheng 已提交
361
  }
H
Hongze Cheng 已提交
362

H
Hongze Cheng 已提交
363 364
  ASSERT(pCommiter->pWriter == NULL);
  taosArrayDestroy(pCommiter->aTbDataP);
H
Hongze Cheng 已提交
365
  TARRAY2_CLEAR_FREE(&pCommiter->opArray, NULL);
H
Hongze Cheng 已提交
366

H
Hongze Cheng 已提交
367
_exit:
H
Hongze Cheng 已提交
368
  if (code) {
H
Hongze Cheng 已提交
369 370
    tsdbError("vgId:%d %s failed at line %d since %s, eid:%" PRId64, vid, __func__, lino, tstrerror(code),
              pCommiter->eid);
H
Hongze Cheng 已提交
371
  } else {
H
Hongze Cheng 已提交
372
    tsdbDebug("vgId:%d %s done, eid:%" PRId64, vid, __func__, pCommiter->eid);
H
Hongze Cheng 已提交
373
  }
H
Hongze Cheng 已提交
374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390
  return code;
}

int32_t tsdbPreCommit(STsdb *pTsdb) {
  taosThreadRwlockWrlock(&pTsdb->rwLock);
  ASSERT(pTsdb->imem == NULL);
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
  taosThreadRwlockUnlock(&pTsdb->rwLock);
  return 0;
}

int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo) {
  if (!pTsdb) return 0;

  int32_t    code = 0;
  int32_t    lino = 0;
H
Hongze Cheng 已提交
391
  SMemTable *pMem = pTsdb->imem;
H
Hongze Cheng 已提交
392

H
Hongze Cheng 已提交
393
  if (pMem->nRow == 0 && pMem->nDel == 0) {
H
Hongze Cheng 已提交
394 395 396
    taosThreadRwlockWrlock(&pTsdb->rwLock);
    pTsdb->imem = NULL;
    taosThreadRwlockUnlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
397 398 399
    tsdbUnrefMemTable(pMem, NULL, true);
  } else {
    SCommitter committer;
H
Hongze Cheng 已提交
400

H
Hongze Cheng 已提交
401
    code = open_committer(pTsdb, pInfo, &committer);
H
Hongze Cheng 已提交
402
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
403

H
Hongze Cheng 已提交
404
    while (committer.nextKey != TSKEY_MAX) {
H
Hongze Cheng 已提交
405 406 407 408 409
      code = commit_fset(&committer);
      if (code) {
        lino = __LINE__;
        break;
      }
H
Hongze Cheng 已提交
410
    }
H
Hongze Cheng 已提交
411

H
Hongze Cheng 已提交
412
    code = close_committer(&committer, code);
H
Hongze Cheng 已提交
413 414
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
415 416 417

_exit:
  if (code) {
H
Hongze Cheng 已提交
418
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
419
  } else {
H
Hongze Cheng 已提交
420
    tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, pMem->nRow,
H
Hongze Cheng 已提交
421
             pMem->nDel);
H
Hongze Cheng 已提交
422 423 424 425
  }
  return code;
}

H
Hongze Cheng 已提交
426
int32_t tsdbCommitCommit(STsdb *pTsdb) {
H
Hongze Cheng 已提交
427 428 429
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(pTsdb->pVnode);
H
Hongze Cheng 已提交
430

H
Hongze Cheng 已提交
431
  if (pTsdb->imem == NULL) goto _exit;
H
Hongze Cheng 已提交
432

H
Hongze Cheng 已提交
433 434
  SMemTable *pMemTable = pTsdb->imem;
  taosThreadRwlockWrlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
435
  code = tsdbFSEditCommit(pTsdb->pFS);
H
Hongze Cheng 已提交
436 437 438 439 440 441
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  pTsdb->imem = NULL;
  taosThreadRwlockUnlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
442
  tsdbUnrefMemTable(pMemTable, NULL, true);
H
Hongze Cheng 已提交
443

H
Hongze Cheng 已提交
444 445 446
  // TODO: make this call async
  code = tsdbMerge(pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
447

H
Hongze Cheng 已提交
448 449
_exit:
  if (code) {
H
Hongze Cheng 已提交
450
    tsdbError("vgId:%d, %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
451
  } else {
H
Hongze Cheng 已提交
452
    tsdbInfo("vgId:%d %s done", vid, __func__);
H
Hongze Cheng 已提交
453 454 455 456 457 458 459
  }
  return code;
}

int32_t tsdbCommitAbort(STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
460 461 462
  int32_t vid = TD_VID(pTsdb->pVnode);

  if (pTsdb->imem == NULL) goto _exit;
H
Hongze Cheng 已提交
463

H
Hongze Cheng 已提交
464
  code = tsdbFSEditAbort(pTsdb->pFS);
H
Hongze Cheng 已提交
465 466 467 468
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
H
Hongze Cheng 已提交
469
    tsdbError("vgId:%d, %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
470
  } else {
H
Hongze Cheng 已提交
471
    tsdbInfo("vgId:%d %s done", vid, __func__);
H
Hongze Cheng 已提交
472 473 474
  }
  return code;
}