tsdbCommit.c 17.1 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef struct {
H
Hongze Cheng 已提交
19 20 21
  STsdb   *pTsdb;
  uint8_t *pBuf1;
  uint8_t *pBuf2;
H
Hongze Cheng 已提交
22
  uint8_t *pBuf3;
H
Hongze Cheng 已提交
23 24
  uint8_t *pBuf4;
  uint8_t *pBuf5;
H
Hongze Cheng 已提交
25
  /* commit data */
H
Hongze Cheng 已提交
26 27
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
28 29
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
30
  // commit file data
H
Hongze Cheng 已提交
31 32 33 34 35 36 37 38
  TSKEY         nextKey;
  int32_t       commitFid;
  TSKEY         minKey;
  TSKEY         maxKey;
  SDataFReader *pReader;
  SMapData      oBlockIdx;  // SMapData<SBlockIdx>, read from reader
  SDataFWriter *pWriter;
  SMapData      nBlockIdx;  // SMapData<SBlockIdx>, build by committer
H
Hongze Cheng 已提交
39
  // commit table data
H
Hongze Cheng 已提交
40 41
  STbDataIter   iter;
  STbDataIter  *pIter;
H
Hongze Cheng 已提交
42 43 44
  SBlockIdx    *pBlockIdx;
  SMapData      oBlock;
  SMapData      nBlock;
H
Hongze Cheng 已提交
45 46
  SColDataBlock oColDataBlock;
  SColDataBlock nColDataBlock;
H
Hongze Cheng 已提交
47
  /* commit del */
H
Hongze Cheng 已提交
48
  SDelFReader *pDelFReader;
H
Hongze Cheng 已提交
49 50
  SMapData     oDelIdxMap;   // SMapData<SDelIdx>, old
  SMapData     oDelDataMap;  // SMapData<SDelData>, old
H
Hongze Cheng 已提交
51
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
52 53
  SMapData     nDelIdxMap;   // SMapData<SDelIdx>, new
  SMapData     nDelDataMap;  // SMapData<SDelData>, new
H
Hongze Cheng 已提交
54
} SCommitter;
H
refact  
Hongze Cheng 已提交
55

H
Hongze Cheng 已提交
56 57 58 59 60
// Commit stages, called in this order by tsdbCommit(); each returns 0 on
// success or a non-zero error code.
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
refact  
Hongze Cheng 已提交
61

H
refact  
Hongze Cheng 已提交
62
/*
 * Create the memtable for pTsdb and store it in pTsdb->mem.
 *
 * Returns whatever tsdbMemTableCreate() returns (0 on success).
 */
int32_t tsdbBegin(STsdb *pTsdb) {
  // Success and failure take the same path: hand back the creation status.
  return tsdbMemTableCreate(pTsdb, &pTsdb->mem);
}

H
more  
Hongze Cheng 已提交
76
/*
 * Commit the current memtable of pTsdb.
 *
 * A NULL pTsdb is a no-op. A memtable with neither rows nor deletes is
 * detached and destroyed without running any commit stage. Otherwise the
 * stages run in order: start, data, del, cache, end. On the first failing
 * stage tsdbEndCommit() is invoked with the error code, the failure is
 * logged, and the code is returned.
 *
 * Returns 0 on success, otherwise the error code of the failing stage.
 */
int32_t tsdbCommit(STsdb *pTsdb) {
  if (pTsdb == NULL) return 0;

  SMemTable *pMem = pTsdb->mem;
  SCommitter committer;
  int32_t    code = 0;

  // Nothing to persist: drop the empty memtable.  TODO
  if (pMem->nRow == 0 && pMem->nDel == 0) {
    pTsdb->mem = NULL;
    tsdbMemTableDestroy(pMem);
    return code;
  }

  if ((code = tsdbStartCommit(pTsdb, &committer)) != 0) goto _err;

  // commit impl
  if ((code = tsdbCommitData(&committer)) != 0) goto _err;
  if ((code = tsdbCommitDel(&committer)) != 0) goto _err;
  if ((code = tsdbCommitCache(&committer)) != 0) goto _err;

  if ((code = tsdbEndCommit(&committer, 0)) != 0) goto _err;

  return code;

_err:
  tsdbEndCommit(&committer, code);
  tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
127 128 129
/*
 * Prepare a commit: zero the committer, move pTsdb->mem to pTsdb->imem and
 * bind the committer to pTsdb. Asserts that a memtable exists and that no
 * immutable memtable is pending. Always returns 0.
 */
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
  memset(pCommitter, 0, sizeof(*pCommitter));

  ASSERT(pTsdb->mem && pTsdb->imem == NULL);

  // TODO: the swap below is not yet guarded by a lock.
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;

  pCommitter->pTsdb = pTsdb;
  return 0;
}

/*
 * Open the delete-file reader/writer for this commit.
 *
 * Resets the old and new SDelIdx maps, loads the old index when an old
 * delete file is available (currently never: the file handles are TODO
 * placeholders and stay NULL), then opens the writer for the new file.
 *
 * Returns 0 on success; on failure logs and returns the error code.
 */
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
  STsdb    *pTsdb = pCommitter->pTsdb;
  SDelFile *pOldDelFile = NULL;  // TODO
  SDelFile *pNewDelFile = NULL;  // TODO
  int32_t   code;

  tMapDataReset(&pCommitter->oDelIdxMap);
  tMapDataReset(&pCommitter->nDelIdxMap);

  // load old
  if (pOldDelFile != NULL) {
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pOldDelFile, pTsdb, NULL);
    if (code == 0) {
      code = tsdbReadDelIdx(pCommitter->pDelFReader, &pCommitter->oDelIdxMap, NULL);
    }
    if (code != 0) goto _err;
  }

  // prepare new
  code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pNewDelFile, pTsdb);
  if (code != 0) goto _err;

  tsdbDebug("vgId:%d commit del start", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
  int32_t  code = 0;
  SDelData delData;
  SDelOp  *pDelOp;
  tb_uid_t suid;
  tb_uid_t uid;
  SDelIdx  delIdx;  // TODO

  // check no del data, just return
  if (pTbData && pTbData->pHead == NULL) {
    pTbData = NULL;
  }
  if (pTbData == NULL && pDelIdx == NULL) goto _exit;

  // prepare
  if (pTbData) {
    delIdx.suid = pTbData->suid;
    delIdx.uid = pTbData->uid;
  } else {
    delIdx.suid = pDelIdx->suid;
    delIdx.uid = pDelIdx->uid;
  }
  delIdx.minKey = TSKEY_MAX;
  delIdx.maxKey = TSKEY_MIN;
  delIdx.minVersion = INT64_MAX;
  delIdx.maxVersion = -1;

  // start
  tMapDataReset(&pCommitter->oDelDataMap);
  tMapDataReset(&pCommitter->nDelDataMap);

  if (pDelIdx) {
    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, &pCommitter->oDelDataMap, NULL);
    if (code) goto _err;
  }

  // disk
  for (int32_t iDelData = 0; iDelData < pCommitter->oDelDataMap.nItem; iDelData++) {
    code = tMapDataGetItemByIdx(&pCommitter->oDelDataMap, iDelData, &delData, tGetDelData);
    if (code) goto _err;

    code = tMapDataPutItem(&pCommitter->nDelDataMap, &delData, tPutDelData);
    if (code) goto _err;

    if (delIdx.minKey > delData.sKey) delIdx.minKey = delData.sKey;
    if (delIdx.maxKey < delData.eKey) delIdx.maxKey = delData.eKey;
    if (delIdx.minVersion > delData.version) delIdx.minVersion = delData.version;
    if (delIdx.maxVersion < delData.version) delIdx.maxVersion = delData.version;
  }

  // memory
  pDelOp = pTbData ? pTbData->pHead : NULL;
  for (; pDelOp; pDelOp = pDelOp->pNext) {
    delData.version = pDelOp->version;
    delData.sKey = pDelOp->sKey;
    delData.eKey = pDelOp->eKey;

    code = tMapDataPutItem(&pCommitter->nDelDataMap, &delData, tPutDelData);
    if (code) goto _err;

    if (delIdx.minKey > delData.sKey) delIdx.minKey = delData.sKey;
    if (delIdx.maxKey < delData.eKey) delIdx.maxKey = delData.eKey;
    if (delIdx.minVersion > delData.version) delIdx.minVersion = delData.version;
    if (delIdx.maxVersion < delData.version) delIdx.maxVersion = delData.version;
  }

  ASSERT(pCommitter->nDelDataMap.nItem > 0);

  // write
  code = tsdbWriteDelData(pCommitter->pDelFWriter, &pCommitter->nDelDataMap, NULL, &delIdx);
  if (code) goto _err;

  // put delIdx
  code = tMapDataPutItem(&pCommitter->nDelIdxMap, &delIdx, tPutDelIdx);
  if (code) goto _err;

_exit:
  return code;

_err:
  tsdbError("vgId:%d commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
258
/*
 * Merge the in-memory tables (pTsdb->imem->aTbData) with the old on-disk
 * delete index (oDelIdxMap), both walked in order, and commit the delete
 * records table by table via tsdbCommitTableDel().
 *
 * For each step the smaller table id (per tTABLEIDCmprFn) is committed; when
 * both sides refer to the same table they are committed together.
 *
 * Returns 0 on success; on failure logs and returns the error code.
 */
static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
  int32_t       code = 0;
  STsdb        *pTsdb = pCommitter->pTsdb;
  SMemTable    *pMemTable = pTsdb->imem;
  const int32_t nDisk = pCommitter->oDelIdxMap.nItem;
  const int32_t nMem = taosArrayGetSize(pMemTable->aTbData);
  int32_t       iDisk = 0;
  int32_t       iMem = 0;
  STbData      *pTbData;
  SDelIdx       diskIdx;         // storage for the current on-disk entry
  SDelIdx      *pDelIdx = NULL;  // NULL once the on-disk side is exhausted

  ASSERT(nMem > 0);

  // Prime both cursors.
  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iMem);
  if (iDisk < nDisk) {
    code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDisk, &diskIdx, tGetDelIdx);
    if (code) goto _err;
    pDelIdx = &diskIdx;
  }

  while (pTbData != NULL || pDelIdx != NULL) {
    bool takeMem = (pTbData != NULL);
    bool takeDisk = (pDelIdx != NULL);

    // Both sides present: the smaller id goes first, equal ids go together.
    if (takeMem && takeDisk) {
      int32_t c = tTABLEIDCmprFn(pTbData, pDelIdx);
      takeMem = (c <= 0);
      takeDisk = (c >= 0);
    }

    code = tsdbCommitTableDel(pCommitter, takeMem ? pTbData : NULL, takeDisk ? pDelIdx : NULL);
    if (code) goto _err;

    if (takeMem) {
      iMem++;
      pTbData = (iMem < nMem) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iMem) : NULL;
    }

    if (takeDisk) {
      iDisk++;
      if (iDisk < nDisk) {
        code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDisk, &diskIdx, tGetDelIdx);
        if (code) goto _err;
        pDelIdx = &diskIdx;
      } else {
        pDelIdx = NULL;
      }
    }
  }

  return code;

_err:
  tsdbError("vgId:%d commit del impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

/*
 * Finish the delete commit: write the new SDelIdx map, update the del-file
 * header, close the writer and, if one was opened, close the reader. The
 * sequence stops at the first failing step.
 *
 * Returns 0 on success; on failure logs and returns the error code.
 */
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t code = tsdbWriteDelIdx(pCommitter->pDelFWriter, &pCommitter->nDelIdxMap, NULL);

  if (code == 0) {
    code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter, NULL);
  }
  if (code == 0) {
    code = tsdbDelFWriterClose(pCommitter->pDelFWriter, 1);
  }
  if (code == 0 && pCommitter->pDelFReader != NULL) {
    code = tsdbDelFReaderClose(pCommitter->pDelFReader);
  }

  if (code != 0) {
    tsdbError("vgId:%d commit del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}

/*
 * Final commit stage. Not implemented yet: both arguments are ignored and
 * the function always returns 0.
 */
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
  // TODO
  return 0;
}

// Sub-stages of tsdbCommitFileData(), called in this order.
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter);
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter);
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter);

/*
 * Commit the data of one file set by running the start, impl and end
 * sub-stages in order, stopping at the first failure.
 *
 * Returns 0 on success, otherwise the error code of the failing sub-stage.
 */
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
  int32_t code = tsdbCommitFileDataStart(pCommitter);

  if (code == 0) {
    code = tsdbCommitFileDataImpl(pCommitter);
  }
  if (code == 0) {
    code = tsdbCommitFileDataEnd(pCommitter);
  }

  return code;
}

/*
 * Open the data-file reader/writer for the file set being committed.
 *
 * Resets the old and new SBlockIdx maps, loads the old block index when an
 * old file set is available (currently never: the file-set handles are TODO
 * placeholders and stay NULL), then opens the writer for the new file set.
 *
 * Returns 0 on success; on failure logs and returns the error code.
 */
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  STsdb     *pTsdb = pCommitter->pTsdb;
  SDFileSet *pOldSet = NULL;  // TODO
  SDFileSet *pNewSet = NULL;  // TODO
  int32_t    code;

  // memory
  tMapDataReset(&pCommitter->oBlockIdx);
  tMapDataReset(&pCommitter->nBlockIdx);

  // load old
  if (pOldSet != NULL) {
    code = tsdbDFileSetReaderOpen(&pCommitter->pReader, pTsdb, pOldSet);
    if (code == 0) {
      code = tsdbReadBlockIdx(pCommitter->pReader, &pCommitter->oBlockIdx);
    }
    if (code != 0) goto _err;
  }

  // create new
  code = tsdbDFileSetWriterOpen(&pCommitter->pWriter, pTsdb, pNewSet);
  if (code != 0) goto _err;

  return code;

_err:
  tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

// Per-table commit, called by tsdbCommitFileDataImpl() and defined further down.
static int32_t tsdbCommitTableData(SCommitter *pCommitter);

/*
 * Merge the in-memory tables (pTsdb->imem->aTbData) with the old on-disk
 * block index (oBlockIdx), both walked in order, and commit every table that
 * has something to write via tsdbCommitTableData().
 *
 * For each step the smaller table id (per tTABLEIDCmprFn) is selected; equal
 * ids are handled together. An in-memory table for which
 * tsdbTbDataIterOpen() reports no data at or after minKey is dropped from
 * the step. The selection is passed to tsdbCommitTableData() through
 * pCommitter->pTbData / pCommitter->pBlockIdx.
 *
 * Returns 0 on success, otherwise the first error code encountered.
 */
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    iTbData = 0;
  int32_t    nTbData = taosArrayGetSize(pMemTable->aTbData);
  int32_t    iBlockIdx = 0;
  int32_t    nBlockIdx = pCommitter->oBlockIdx.nItem;
  STbData   *pTbData;
  SBlockIdx *pBlockIdx;
  SBlockIdx  blockIdx;
  int32_t    c;

  while (iTbData < nTbData || iBlockIdx < nBlockIdx) {
    pTbData = NULL;
    pBlockIdx = NULL;
    if (iTbData < nTbData) {
      pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
    }
    if (iBlockIdx < nBlockIdx) {
      // Check the decode status like every other tMapDataGetItemByIdx() call
      // in this file instead of silently using a possibly unfilled blockIdx.
      code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, &blockIdx, NULL /* TODO */);
      if (code) goto _err;
      pBlockIdx = &blockIdx;
    }

    if (pTbData && pBlockIdx) {
      c = tTABLEIDCmprFn(pTbData, pBlockIdx);

      if (c == 0) {
        iTbData++;
        iBlockIdx++;
      } else if (c < 0) {
        iTbData++;
        pBlockIdx = NULL;
      } else {
        iBlockIdx++;
        pTbData = NULL;
      }
    } else if (pTbData) {
      // Only the memory side is left: advance the memory cursor. (The
      // original advanced the opposite cursor here, so the loop never ended.)
      iTbData++;
    } else {
      // Only the disk side is left: advance the disk cursor.
      iBlockIdx++;
    }

    if (pTbData &&
        !tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, &pCommitter->iter)) {
      pTbData = NULL;
    }

    if (pTbData == NULL && pBlockIdx == NULL) continue;

    pCommitter->pTbData = pTbData;
    pCommitter->pBlockIdx = pBlockIdx;

    code = tsdbCommitTableData(pCommitter);
    if (code) goto _err;
  }

  return code;

_err:
  return code;
}

/*
 * Finish the commit of one file set: write the new block index, update the
 * file-set header, close the writer and, if one was opened, close the
 * reader.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code = 0;

  // NOTE(review): nBlockIdx is passed by value here, whereas the del-file
  // counterpart tsdbWriteDelIdx() receives a pointer - confirm against the
  // tsdbWriteBlockIdx() prototype.
  code = tsdbWriteBlockIdx(pCommitter->pWriter, pCommitter->nBlockIdx, NULL);
  if (code) goto _err;

  code = tsdbUpdateDFileSetHeader(pCommitter->pWriter, NULL);
  if (code) goto _err;

  code = tsdbDFileSetWriterClose(pCommitter->pWriter, 1);
  if (code) goto _err;

  if (pCommitter->pReader) {
    code = tsdbDFileSetReaderClose(pCommitter->pReader);
    // Only a failed close is an error; the original jumped to _err
    // unconditionally.
    if (code) goto _err;
  }

_exit:
  return code;

_err:
  return code;
}

// Sub-stages of tsdbCommitTableData(), called in this order.
static int32_t tsdbCommitTableDataStart(SCommitter *pCommitter);
static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter);
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter);

/*
 * Commit the data of one table by running the start, impl and end
 * sub-stages in order, stopping at the first failure.
 *
 * Returns 0 on success, otherwise the error code of the failing sub-stage.
 */
static int32_t tsdbCommitTableData(SCommitter *pCommitter) {
  int32_t code = tsdbCommitTableDataStart(pCommitter);

  if (code == 0) {
    code = tsdbCommitTableDataImpl(pCommitter);
  }
  if (code == 0) {
    code = tsdbCommitTableDataEnd(pCommitter);
  }

  return code;
}

/*
 * Prepare the per-table block maps: reset the old map, load the old blocks
 * through the reader when the table has an on-disk block index, then reset
 * the new map. If loading fails the new map is left untouched.
 *
 * Returns 0 on success, otherwise the error code from tsdbReadBlock().
 */
static int32_t tsdbCommitTableDataStart(SCommitter *pCommitter) {
  // old
  tMapDataReset(&pCommitter->oBlock);
  if (pCommitter->pBlockIdx != NULL) {
    int32_t code = tsdbReadBlock(pCommitter->pReader, &pCommitter->oBlock, NULL);
    if (code != 0) return code;
  }

  // new
  tMapDataReset(&pCommitter->nBlock);
  return 0;
}

/*
 * Merge the in-memory rows of the current table with its on-disk blocks.
 *
 * This is still a scaffold: the three merge branches in the loop are empty.
 * An iterator over pCommitter->pTbData is created (starting at minKey) when
 * the table has in-memory data and is destroyed on every exit path.
 *
 * Returns 0 on success; on failure logs and returns the error code.
 */
static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter) {
  int32_t      code = 0;
  STsdb       *pTsdb = pCommitter->pTsdb;
  STbDataIter *pIter = NULL;
  int32_t      iBlock = 0;
  // NOTE(review): this reads the NEW block map, which
  // tsdbCommitTableDataStart() has just reset; the old blocks are loaded
  // into oBlock - confirm which map was intended.
  int32_t      nBlock = pCommitter->nBlock.nItem;
  SBlock      *pBlock = NULL;
  SBlock       block;
  // Start from NULL: the original left this pointer uninitialized and then
  // both passed it to tsdbTbDataIterGet() and tested it in the loop
  // condition, i.e. read an indeterminate value.
  TSDBROW     *pRow = NULL;
  TSDBROW      row;
  int32_t      iRow = 0;
  STSchema    *pTSchema = NULL;

  if (pCommitter->pTbData) {
    code = tsdbTbDataIterCreate(pCommitter->pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, &pIter);
    if (code) goto _err;
  }

  if (iBlock < nBlock) {
    // NOTE(review): `block` is never filled from the block map before use.
    pBlock = &block;
  }

  // NOTE(review): pIter is NULL when the table has no in-memory data, and
  // pRow is passed by value so this call cannot update it - presumably the
  // row should be fetched into `row`; confirm against the
  // tsdbTbDataIterGet() prototype.
  tsdbTbDataIterGet(pIter, pRow);

  // loop to merge memory data and disk data
  // NOTE(review): nothing in the body advances pBlock or pRow, so the loop
  // cannot terminate once entered; the condition itself is preserved as-is.
  for (; pBlock == NULL || (pRow && pRow->pTSRow->ts <= pCommitter->maxKey);) {
    if (pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) {
      // only has block data, then move to new index file
    } else if (0) {
      // only commit memory data
    } else {
      // merge memory and block data
    }
  }

  tsdbTbDataIterDestroy(pIter);
  return code;

_err:
  tsdbError("vgId:%d commit table data impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  tsdbTbDataIterDestroy(pIter);
  return code;
}

/*
 * Final per-table stage. Not implemented yet: the committer is ignored and
 * the function always returns 0.
 */
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter) {
  // TODO
  return 0;
}

H
Hongze Cheng 已提交
635 636 637 638
/*
 * Commit the row data of the immutable memtable, one file set at a time.
 *
 * Does nothing but log when the memtable holds no rows. Otherwise, starting
 * at the memtable's minimum timestamp, each iteration derives the file id
 * and its key range from nextKey and commits that file set.
 *
 * NOTE(review): no code visible in this file advances pCommitter->nextKey,
 * so the loop relies on a callee doing so - confirm.
 *
 * Returns 0 on success; on failure logs and returns the error code.
 */
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    code = 0;

  if (pMemTable->nRow != 0) {
    for (pCommitter->nextKey = pMemTable->minKey.ts; pCommitter->nextKey < TSKEY_MAX;) {
      pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
      tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                      &pCommitter->maxKey);

      code = tsdbCommitFileData(pCommitter);
      if (code != 0) {
        tsdbError("vgId:%d commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
        return code;
      }
    }
  }

  tsdbDebug("vgId:%d commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow);
  return code;
}
H
Hongze Cheng 已提交
663

H
Hongze Cheng 已提交
664 665 666 667
/*
 * Commit the delete records of the immutable memtable.
 *
 * Does nothing but log when the memtable holds no deletes. Otherwise runs
 * the start, impl and end sub-stages in order, stopping at the first
 * failure.
 *
 * Returns 0 on success; on failure logs and returns the error code.
 */
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    code = 0;

  if (pMemTable->nDel != 0) {
    code = tsdbCommitDelStart(pCommitter);
    if (code == 0) {
      code = tsdbCommitDelImpl(pCommitter);
    }
    if (code == 0) {
      code = tsdbCommitDelEnd(pCommitter);
    }

    if (code != 0) {
      tsdbError("vgId:%d commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
      return code;
    }
  }

  tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
  return code;
}

/*
 * Cache commit stage. Not implemented yet: the committer is ignored and the
 * function always returns 0.
 */
static int32_t tsdbCommitCache(SCommitter *pCommitter) {
  // TODO
  return 0;
}