tsdbCommit.c 26.4 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef struct {
H
Hongze Cheng 已提交
19
  STsdb *pTsdb;
H
Hongze Cheng 已提交
20
  /* commit data */
H
Hongze Cheng 已提交
21 22
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
23 24
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
25
  // --------------
H
Hongze Cheng 已提交
26
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
27 28 29
  int32_t commitFid;
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
30
  // commit file data
H
Hongze Cheng 已提交
31
  SDataFReader *pReader;
H
Hongze Cheng 已提交
32 33 34 35
  SMapData      oBlockIdxMap;  // SMapData<SBlockIdx>, read from reader
  SMapData      oBlockMap;     // SMapData<SBlock>, read from reader
  SBlock        oBlock;
  SBlockData    oBlockData;
H
Hongze Cheng 已提交
36
  SDataFWriter *pWriter;
H
Hongze Cheng 已提交
37 38 39 40
  SMapData      nBlockIdxMap;  // SMapData<SBlockIdx>, build by committer
  SMapData      nBlockMap;     // SMapData<SBlock>
  SBlock        nBlock;
  SBlockData    nBlockData;
H
Hongze Cheng 已提交
41
  /* commit del */
H
Hongze Cheng 已提交
42
  SDelFReader *pDelFReader;
H
Hongze Cheng 已提交
43 44
  SMapData     oDelIdxMap;   // SMapData<SDelIdx>, old
  SMapData     oDelDataMap;  // SMapData<SDelData>, old
H
Hongze Cheng 已提交
45
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
46 47
  SMapData     nDelIdxMap;   // SMapData<SDelIdx>, new
  SMapData     nDelDataMap;  // SMapData<SDelData>, new
H
Hongze Cheng 已提交
48
} SCommitter;
H
refact  
Hongze Cheng 已提交
49

H
Hongze Cheng 已提交
50 51 52 53 54
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
refact  
Hongze Cheng 已提交
55

H
refact  
Hongze Cheng 已提交
56
int32_t tsdbBegin(STsdb *pTsdb) {
H
Hongze Cheng 已提交
57
  int32_t code = 0;
H
Hongze Cheng 已提交
58

H
Hongze Cheng 已提交
59
  code = tsdbMemTableCreate(pTsdb, &pTsdb->mem);
H
Hongze Cheng 已提交
60
  if (code) goto _err;
H
Hongze Cheng 已提交
61

H
Hongze Cheng 已提交
62 63 64
  return code;

_err:
H
Hongze Cheng 已提交
65
  tsdbError("vgId:%d tsdb begin failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
66
  return code;
H
Hongze Cheng 已提交
67 68
}

H
more  
Hongze Cheng 已提交
69
int32_t tsdbCommit(STsdb *pTsdb) {
70
  if (!pTsdb) return 0;
H
Hongze Cheng 已提交
71

H
more  
Hongze Cheng 已提交
72
  int32_t    code = 0;
H
Hongze Cheng 已提交
73 74 75 76
  SCommitter commith;
  SMemTable *pMemTable = pTsdb->mem;

  // check
H
Hongze Cheng 已提交
77 78
  if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
    // TODO: lock?
H
Hongze Cheng 已提交
79 80 81 82
    pTsdb->mem = NULL;
    tsdbMemTableDestroy(pMemTable);
    goto _exit;
  }
H
refact  
Hongze Cheng 已提交
83

H
more  
Hongze Cheng 已提交
84
  // start commit
H
more  
Hongze Cheng 已提交
85
  code = tsdbStartCommit(pTsdb, &commith);
H
Hongze Cheng 已提交
86
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
87

H
refact  
Hongze Cheng 已提交
88 89
  // commit impl
  code = tsdbCommitData(&commith);
H
Hongze Cheng 已提交
90
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
91 92

  code = tsdbCommitDel(&commith);
H
Hongze Cheng 已提交
93
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
94 95

  code = tsdbCommitCache(&commith);
H
Hongze Cheng 已提交
96
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
97 98

  // end commit
H
more  
Hongze Cheng 已提交
99
  code = tsdbEndCommit(&commith, 0);
H
Hongze Cheng 已提交
100
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
101

H
Hongze Cheng 已提交
102
_exit:
H
refact  
Hongze Cheng 已提交
103 104 105
  return code;

_err:
H
Hongze Cheng 已提交
106
  tsdbEndCommit(&commith, code);
C
Cary Xu 已提交
107
  tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
refact  
Hongze Cheng 已提交
108 109 110
  return code;
}

H
Hongze Cheng 已提交
111
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
112 113 114 115 116 117
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  SDelFile  *pDelFileR = NULL;  // TODO
  SDelFile  *pDelFileW = NULL;  // TODO

H
Hongze Cheng 已提交
118 119 120
  tMapDataReset(&pCommitter->oDelIdxMap);
  tMapDataReset(&pCommitter->nDelIdxMap);

H
Hongze Cheng 已提交
121
  // load old
H
Hongze Cheng 已提交
122
  if (pDelFileR) {
H
Hongze Cheng 已提交
123
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL);
H
Hongze Cheng 已提交
124
    if (code) goto _err;
H
Hongze Cheng 已提交
125

H
Hongze Cheng 已提交
126 127
    code = tsdbReadDelIdx(pCommitter->pDelFReader, &pCommitter->oDelIdxMap, NULL);
    if (code) goto _err;
H
Hongze Cheng 已提交
128 129
  }

H
Hongze Cheng 已提交
130
  // prepare new
H
Hongze Cheng 已提交
131
  code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pDelFileW, pTsdb);
H
Hongze Cheng 已提交
132
  if (code) goto _err;
H
Hongze Cheng 已提交
133 134 135 136 137 138 139

_exit:
  tsdbDebug("vgId:%d commit del start", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
140 141 142
  return code;
}

H
Hongze Cheng 已提交
143
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
H
Hongze Cheng 已提交
144 145 146 147 148
  int32_t   code = 0;
  SDelData *pDelData;
  tb_uid_t  suid;
  tb_uid_t  uid;
  SDelIdx   delIdx;  // TODO
H
Hongze Cheng 已提交
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166

  // check no del data, just return
  if (pTbData && pTbData->pHead == NULL) {
    pTbData = NULL;
  }
  if (pTbData == NULL && pDelIdx == NULL) goto _exit;

  // prepare
  if (pTbData) {
    delIdx.suid = pTbData->suid;
    delIdx.uid = pTbData->uid;
  } else {
    delIdx.suid = pDelIdx->suid;
    delIdx.uid = pDelIdx->uid;
  }
  delIdx.minKey = TSKEY_MAX;
  delIdx.maxKey = TSKEY_MIN;
  delIdx.minVersion = INT64_MAX;
H
Hongze Cheng 已提交
167
  delIdx.maxVersion = INT64_MIN;
H
Hongze Cheng 已提交
168 169 170 171 172 173 174 175 176 177 178 179

  // start
  tMapDataReset(&pCommitter->oDelDataMap);
  tMapDataReset(&pCommitter->nDelDataMap);

  if (pDelIdx) {
    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, &pCommitter->oDelDataMap, NULL);
    if (code) goto _err;
  }

  // disk
  for (int32_t iDelData = 0; iDelData < pCommitter->oDelDataMap.nItem; iDelData++) {
H
Hongze Cheng 已提交
180
    code = tMapDataGetItemByIdx(&pCommitter->oDelDataMap, iDelData, pDelData, tGetDelData);
H
Hongze Cheng 已提交
181 182
    if (code) goto _err;

H
Hongze Cheng 已提交
183
    code = tMapDataPutItem(&pCommitter->nDelDataMap, pDelData, tPutDelData);
H
Hongze Cheng 已提交
184 185
    if (code) goto _err;

H
Hongze Cheng 已提交
186 187 188 189
    if (delIdx.minKey > pDelData->sKey) delIdx.minKey = pDelData->sKey;
    if (delIdx.maxKey < pDelData->eKey) delIdx.maxKey = pDelData->eKey;
    if (delIdx.minVersion > pDelData->version) delIdx.minVersion = pDelData->version;
    if (delIdx.maxVersion < pDelData->version) delIdx.maxVersion = pDelData->version;
H
Hongze Cheng 已提交
190 191 192
  }

  // memory
H
Hongze Cheng 已提交
193 194 195
  pDelData = pTbData ? pTbData->pHead : NULL;
  for (; pDelData; pDelData = pDelData->pNext) {
    code = tMapDataPutItem(&pCommitter->nDelDataMap, pDelData, tPutDelData);
H
Hongze Cheng 已提交
196 197
    if (code) goto _err;

H
Hongze Cheng 已提交
198 199 200 201
    if (delIdx.minKey > pDelData->sKey) delIdx.minKey = pDelData->sKey;
    if (delIdx.maxKey < pDelData->eKey) delIdx.maxKey = pDelData->eKey;
    if (delIdx.minVersion > pDelData->version) delIdx.minVersion = pDelData->version;
    if (delIdx.maxVersion < pDelData->version) delIdx.maxVersion = pDelData->version;
H
Hongze Cheng 已提交
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
  }

  ASSERT(pCommitter->nDelDataMap.nItem > 0);

  // write
  code = tsdbWriteDelData(pCommitter->pDelFWriter, &pCommitter->nDelDataMap, NULL, &delIdx);
  if (code) goto _err;

  // put delIdx
  code = tMapDataPutItem(&pCommitter->nDelIdxMap, &delIdx, tPutDelIdx);
  if (code) goto _err;

_exit:
  return code;

_err:
  tsdbError("vgId:%d commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
222
static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
223 224 225 226 227 228 229 230 231 232 233 234 235
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    iDelIdx = 0;
  int32_t    nDelIdx = pCommitter->oDelIdxMap.nItem;
  int32_t    iTbData = 0;
  int32_t    nTbData = taosArrayGetSize(pMemTable->aTbData);
  STbData   *pTbData;
  SDelIdx   *pDelIdx;
  SDelIdx    delIdx;
  int32_t    c;

  ASSERT(nTbData > 0);
H
Hongze Cheng 已提交
236

H
Hongze Cheng 已提交
237 238 239 240 241 242 243 244 245 246 247 248 249 250
  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
  if (iDelIdx < nDelIdx) {
    code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDelIdx, &delIdx, tGetDelIdx);
    if (code) goto _err;
    pDelIdx = &delIdx;
  } else {
    pDelIdx = NULL;
  }

  while (true) {
    if (pTbData == NULL && pDelIdx == NULL) break;

    if (pTbData && pDelIdx) {
      c = tTABLEIDCmprFn(pTbData, pDelIdx);
H
Hongze Cheng 已提交
251
      if (c == 0) {
H
Hongze Cheng 已提交
252
        goto _commit_mem_and_disk_del;
H
Hongze Cheng 已提交
253
      } else if (c < 0) {
H
Hongze Cheng 已提交
254
        goto _commit_mem_del;
H
Hongze Cheng 已提交
255
      } else {
H
Hongze Cheng 已提交
256
        goto _commit_disk_del;
H
Hongze Cheng 已提交
257
      }
H
Hongze Cheng 已提交
258
    } else {
H
Hongze Cheng 已提交
259 260
      if (pTbData) goto _commit_mem_del;
      if (pDelIdx) goto _commit_disk_del;
H
Hongze Cheng 已提交
261 262
    }

H
Hongze Cheng 已提交
263 264 265 266 267 268 269
  _commit_mem_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, NULL);
    if (code) goto _err;
    iTbData++;
    if (iTbData < nTbData) {
      pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
    } else {
H
Hongze Cheng 已提交
270 271
      pTbData = NULL;
    }
H
Hongze Cheng 已提交
272
    continue;
H
Hongze Cheng 已提交
273

H
Hongze Cheng 已提交
274 275 276 277 278 279 280 281 282 283 284 285
  _commit_disk_del:
    code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx);
    if (code) goto _err;
    iDelIdx++;
    if (iDelIdx < nDelIdx) {
      code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDelIdx, &delIdx, tGetDelIdx);
      if (code) goto _err;
      pDelIdx = &delIdx;
    } else {
      pDelIdx = NULL;
    }
    continue;
H
Hongze Cheng 已提交
286

H
Hongze Cheng 已提交
287 288
  _commit_mem_and_disk_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx);
H
Hongze Cheng 已提交
289
    if (code) goto _err;
H
Hongze Cheng 已提交
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
    iTbData++;
    iDelIdx++;
    if (iTbData < nTbData) {
      pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
    } else {
      pTbData = NULL;
    }
    if (iDelIdx < nDelIdx) {
      code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDelIdx, &delIdx, tGetDelIdx);
      if (code) goto _err;
      pDelIdx = &delIdx;
    } else {
      pDelIdx = NULL;
    }
    continue;
H
Hongze Cheng 已提交
305 306
  }

H
Hongze Cheng 已提交
307
  return code;
H
Hongze Cheng 已提交
308 309 310 311

_err:
  tsdbError("vgId:%d commit del impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
H
Hongze Cheng 已提交
312 313 314 315
}

static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
316

H
Hongze Cheng 已提交
317 318
  code = tsdbWriteDelIdx(pCommitter->pDelFWriter, &pCommitter->nDelIdxMap, NULL);
  if (code) goto _err;
H
Hongze Cheng 已提交
319

H
Hongze Cheng 已提交
320
  code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter, NULL);
H
Hongze Cheng 已提交
321
  if (code) goto _err;
H
Hongze Cheng 已提交
322 323

  code = tsdbDelFWriterClose(pCommitter->pDelFWriter, 1);
H
Hongze Cheng 已提交
324
  if (code) goto _err;
H
Hongze Cheng 已提交
325 326 327 328 329 330

  if (pCommitter->pDelFReader) {
    code = tsdbDelFReaderClose(pCommitter->pDelFReader);
    if (code) goto _err;
  }

H
Hongze Cheng 已提交
331 332 333
  return code;

_err:
H
Hongze Cheng 已提交
334
  tsdbError("vgId:%d commit del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
335 336 337
  return code;
}

H
Hongze Cheng 已提交
338 339
#define ROW_END(pRow, maxKey) (((pRow) == NULL) || ((pRow)->pTSRow->ts > (maxKey)))

H
Hongze Cheng 已提交
340 341
static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, TSDBKEY eKey,
                                    bool toDataOnly) {
H
Hongze Cheng 已提交
342 343 344 345 346 347 348
  int32_t   code = 0;
  TSDBROW  *pRow;
  STSchema *pTSchema = NULL;  // TODO
  TSDBKEY   key;
  SBlock   *pBlock = &pCommitter->nBlock;

  if (pIter == NULL) goto _exit;
H
Hongze Cheng 已提交
349

H
Hongze Cheng 已提交
350 351
  tBlockReset(pBlock);
  tBlockDataReset(&pCommitter->nBlockData);
H
Hongze Cheng 已提交
352 353 354 355
  while (true) {
    pRow = tsdbTbDataIterGet(pIter);

    if (pRow == NULL || tsdbKeyCmprFn(&(TSDBKEY){.ts = pRow->pTSRow->ts, .version = pRow->version}, &eKey) > 0) {
H
Hongze Cheng 已提交
356
      if (pCommitter->nBlockData.nRow == 0) {
H
Hongze Cheng 已提交
357 358 359
        break;
      } else {
        goto _write_block_data;
H
Hongze Cheng 已提交
360
      }
H
Hongze Cheng 已提交
361
    }
H
Hongze Cheng 已提交
362

H
Hongze Cheng 已提交
363 364 365 366 367 368 369 370
    // update schema
    if (pTSchema == NULL || pTSchema->version != TSDBROW_SVERSION(pRow)) {
      // TODO
      // pTSchema = NULL;
    }

    // append row
    code = tBlockDataAppendRow(&pCommitter->nBlockData, pRow, pTSchema);
H
Hongze Cheng 已提交
371
    if (code) goto _err;
H
Hongze Cheng 已提交
372

H
Hongze Cheng 已提交
373 374 375 376 377 378 379 380 381 382 383
    // update info
    key = tsdbRowKey(pRow);
    if (tsdbKeyCmprFn(&key, &pBlock->info.maxKey) > 0) pBlock->info.maxKey = key;
    if (tsdbKeyCmprFn(&key, &pBlock->info.minKey) < 0) pBlock->info.minKey = key;
    if (key.version > pBlock->info.maxVersion) pBlock->info.maxVersion = key.version;
    if (key.version < pBlock->info.minVerion) pBlock->info.minVerion = key.version;

    // iter next
    tsdbTbDataIterNext(pIter);

    // check write
H
Hongze Cheng 已提交
384
    if (pCommitter->nBlockData.nRow < pCommitter->maxRow * 4 / 5) {
H
Hongze Cheng 已提交
385 386 387 388
      continue;
    }

  _write_block_data:
H
Hongze Cheng 已提交
389
    if (!toDataOnly && pCommitter->nBlockData.nRow < pCommitter->minKey) {
H
Hongze Cheng 已提交
390
      pCommitter->nBlock.last = 1;
H
Hongze Cheng 已提交
391
    } else {
H
Hongze Cheng 已提交
392
      pCommitter->nBlock.last = 0;
H
Hongze Cheng 已提交
393 394
    }

H
Hongze Cheng 已提交
395
    code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, pBlock);
H
Hongze Cheng 已提交
396 397
    if (code) goto _err;

H
Hongze Cheng 已提交
398
    code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
H
Hongze Cheng 已提交
399 400
    if (code) goto _err;

H
Hongze Cheng 已提交
401 402 403 404 405 406 407
    // update info
    if (tsdbKeyCmprFn(&pBlock->info.minKey, &pBlockIdx->info.minKey) < 0) pBlock->info.minKey = pBlockIdx->info.minKey;
    if (tsdbKeyCmprFn(&pBlock->info.maxKey, &pBlockIdx->info.maxKey) < 0) pBlock->info.maxKey = pBlockIdx->info.maxKey;
    if (pBlock->info.minVerion < pBlockIdx->info.minVerion) pBlockIdx->info.minVerion = pBlock->info.minVerion;
    if (pBlock->info.maxVersion < pBlockIdx->info.maxVersion) pBlockIdx->info.maxVersion = pBlock->info.maxVersion;

    tBlockReset(pBlock);
H
Hongze Cheng 已提交
408
    tBlockDataReset(&pCommitter->nBlockData);
H
Hongze Cheng 已提交
409 410
  }

H
Hongze Cheng 已提交
411
_exit:
H
Hongze Cheng 已提交
412 413 414 415 416 417 418
  return code;

_err:
  tsdbError("vgId:%d commit memory data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
419 420 421 422 423 424 425 426 427 428
static int32_t tsdbGetOverlapRowNumber(STbDataIter *pIter, SBlock *pBlock) {
  int32_t     nRow = 0;
  TSDBROW    *pRow;
  TSDBKEY     key;
  int32_t     c = 0;
  STbDataIter iter = *pIter;

  iter.pRow = NULL;
  while (true) {
    pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
429

H
Hongze Cheng 已提交
430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452
    if (pRow == NULL) break;
    key = tsdbRowKey(pRow);

    c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock);
    if (c == 0) {
      nRow++;
    } else if (c > 0) {
      break;
    } else {
      ASSERT(0);
    }
  }

  return nRow;
}

static int32_t tsdbMergeCommitImpl(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock,
                                   int8_t toDataOnly) {
  int32_t  code = 0;
  int32_t  iRow = 0;
  int32_t  nRow = 0;
  int32_t  c;
  TSDBROW *pRow;
H
Hongze Cheng 已提交
453
  SBlock   block = tBlockInit();
H
Hongze Cheng 已提交
454 455 456
  TSDBKEY  key1;
  TSDBKEY  key2;

H
Hongze Cheng 已提交
457
  tBlockDataReset(&pCommitter->nBlockData);
H
Hongze Cheng 已提交
458 459

  // load last and merge until {pCommitter->maxKey, INT64_MAX}
H
Hongze Cheng 已提交
460
  code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, 0, NULL, NULL);
H
Hongze Cheng 已提交
461 462 463
  if (code) goto _err;

  iRow = 0;
H
Hongze Cheng 已提交
464
  nRow = pCommitter->oBlockData.nRow;
H
Hongze Cheng 已提交
465 466 467 468
  pRow = tsdbTbDataIterGet(pIter);

  while (true) {
    if ((pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) && (iRow >= nRow)) {
H
Hongze Cheng 已提交
469
      if (pCommitter->nBlockData.nRow > 0) {
H
Hongze Cheng 已提交
470
        goto _write_block_data;
H
Hongze Cheng 已提交
471
      } else {
H
Hongze Cheng 已提交
472
        break;
H
Hongze Cheng 已提交
473
      }
H
Hongze Cheng 已提交
474
    }
H
Hongze Cheng 已提交
475 476 477 478

    // TODO

  _write_block_data:
H
Hongze Cheng 已提交
479 480
    block.last = pCommitter->nBlockData.nRow < pCommitter->minRow ? 1 : 0;
    code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, &block);
H
Hongze Cheng 已提交
481 482
    if (code) goto _err;

H
Hongze Cheng 已提交
483
    code = tMapDataPutItem(&pCommitter->nBlockMap, &block, tPutBlock);
H
Hongze Cheng 已提交
484
    if (code) goto _err;
H
Hongze Cheng 已提交
485
  }
H
Hongze Cheng 已提交
486

H
Hongze Cheng 已提交
487
  tBlockReset(&block);
H
Hongze Cheng 已提交
488
  tBlockDataReset(&pCommitter->nBlockData);
H
Hongze Cheng 已提交
489 490 491 492 493

  return code;

_err:
  tsdbError("vgId:%d merge commit impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
494 495
  return code;
}
H
Hongze Cheng 已提交
496

H
Hongze Cheng 已提交
497 498
static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock,
                               int8_t isLastBlock) {
H
Hongze Cheng 已提交
499 500 501 502
  int32_t  code = 0;
  TSDBROW *pRow;
  TSDBKEY  key;
  int32_t  c;
H
Hongze Cheng 已提交
503

H
Hongze Cheng 已提交
504
  if (pBlock == NULL) {  // (pIter && pBlock == NULL)
H
Hongze Cheng 已提交
505 506 507 508
    key.ts = pCommitter->maxKey;
    key.version = INT64_MAX;
    code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 0);
    if (code) goto _err;
H
Hongze Cheng 已提交
509
  } else if (pBlock->last) {
H
Hongze Cheng 已提交
510
    // merge
H
Hongze Cheng 已提交
511
    code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 0);
H
Hongze Cheng 已提交
512
    if (code) goto _err;
H
Hongze Cheng 已提交
513
  } else {  // pBlock && pBlock->last == 0 && (pIter == NULL || pIter)
H
Hongze Cheng 已提交
514
    // memory
H
Hongze Cheng 已提交
515 516 517 518 519 520
    if (pIter) {
      key.ts = pBlock->info.minKey.ts;
      key.version = pBlock->info.minKey.version - 1;
      code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 1);
      if (code) goto _err;
    }
H
Hongze Cheng 已提交
521 522 523 524 525

    // merge or move block
    pRow = tsdbTbDataIterGet(pIter);
    key.ts = pRow->pTSRow->ts;
    key.version = pRow->version;
H
Hongze Cheng 已提交
526

H
Hongze Cheng 已提交
527 528 529
    c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock);
    if (c > 0) {
      // move block
H
Hongze Cheng 已提交
530
      code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
H
Hongze Cheng 已提交
531 532
      if (code) goto _err;
    } else if (c == 0) {
H
Hongze Cheng 已提交
533 534 535 536 537 538 539 540
      int32_t nOverlap = tsdbGetOverlapRowNumber(pIter, pBlock);

      if (pBlock->nRow + nOverlap > pCommitter->maxRow || pBlock->nSubBlock == TSDB_MAX_SUBBLOCKS) {
        code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 1);
        if (code) goto _err;
      } else {
        // add as a subblock
      }
H
Hongze Cheng 已提交
541 542 543
    } else {
      ASSERT(0);
    }
H
Hongze Cheng 已提交
544 545 546 547 548 549
  }

  return code;

_err:
  tsdbError("vgId:%d merge commit failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
550
  return code;
H
Hongze Cheng 已提交
551 552
}

H
Hongze Cheng 已提交
553 554 555 556 557
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
  int32_t      code = 0;
  STbDataIter  iter;
  STbDataIter *pIter = &iter;
  TSDBROW     *pRow;
H
Hongze Cheng 已提交
558 559 560
  int64_t      suid;
  int64_t      uid;
  SBlockIdx    blockIdx;
H
Hongze Cheng 已提交
561 562 563

  // create iter
  if (pTbData) {
H
Hongze Cheng 已提交
564 565
    suid = pTbData->suid;
    uid = pTbData->uid;
H
Hongze Cheng 已提交
566 567
    tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, pIter);
  } else {
H
Hongze Cheng 已提交
568 569
    suid = pBlockIdx->suid;
    uid = pBlockIdx->uid;
H
Hongze Cheng 已提交
570 571 572 573 574 575
    pIter = NULL;
  }

  // check
  pRow = tsdbTbDataIterGet(pIter);
  if (ROW_END(pRow, pCommitter->maxKey) && pBlockIdx == NULL) goto _exit;
H
Hongze Cheng 已提交
576 577

  // start ================================
H
Hongze Cheng 已提交
578
  tMapDataReset(&pCommitter->oBlockMap);
H
Hongze Cheng 已提交
579 580
  tBlockReset(&pCommitter->oBlock);
  tBlockDataReset(&pCommitter->oBlockData);
H
Hongze Cheng 已提交
581
  if (pBlockIdx) {
H
Hongze Cheng 已提交
582
    code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL);
H
Hongze Cheng 已提交
583 584
    if (code) goto _err;
  }
H
Hongze Cheng 已提交
585

H
Hongze Cheng 已提交
586 587 588 589 590
  blockIdx = tBlockIdxInit(suid, uid);
  tMapDataReset(&pCommitter->nBlockMap);
  tBlockReset(&pCommitter->nBlock);
  tBlockDataReset(&pCommitter->nBlockData);

H
Hongze Cheng 已提交
591
  // impl ===============================
H
Hongze Cheng 已提交
592
  int32_t iBlock = 0;
H
Hongze Cheng 已提交
593
  int32_t nBlock = pCommitter->oBlockMap.nItem;
H
Hongze Cheng 已提交
594

H
Hongze Cheng 已提交
595 596
  // merge
  pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
597 598 599
  while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlock) {
    tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, &pCommitter->oBlock, tGetBlock);
    code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, &pCommitter->oBlock, iBlock == (nBlock - 1));
H
Hongze Cheng 已提交
600 601 602 603
    if (code) goto _err;

    pRow = tsdbTbDataIterGet(pIter);
    iBlock++;
H
Hongze Cheng 已提交
604
  }
H
Hongze Cheng 已提交
605

H
Hongze Cheng 已提交
606 607 608
  // mem
  pRow = tsdbTbDataIterGet(pIter);
  while (!ROW_END(pRow, pCommitter->maxKey)) {
H
Hongze Cheng 已提交
609
    code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, NULL, 0);
H
Hongze Cheng 已提交
610
    if (code) goto _err;
H
Hongze Cheng 已提交
611

H
Hongze Cheng 已提交
612 613
    pRow = tsdbTbDataIterGet(pIter);
  }
H
Hongze Cheng 已提交
614

H
Hongze Cheng 已提交
615
  // disk
H
Hongze Cheng 已提交
616 617
  while (iBlock < nBlock) {
    tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, &pCommitter->oBlock, tGetBlock);
H
Hongze Cheng 已提交
618

H
Hongze Cheng 已提交
619
    code = tsdbMergeCommit(pCommitter, &blockIdx, NULL, &pCommitter->oBlock, 0);
H
Hongze Cheng 已提交
620 621 622
    if (code) goto _err;

    iBlock++;
H
Hongze Cheng 已提交
623 624
  }

H
Hongze Cheng 已提交
625
  // end ===============================
H
Hongze Cheng 已提交
626
  code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, &blockIdx);
H
Hongze Cheng 已提交
627 628
  if (code) goto _err;

H
Hongze Cheng 已提交
629
  code = tMapDataPutItem(&pCommitter->nBlockIdxMap, &blockIdx, tPutBlockIdx);
H
Hongze Cheng 已提交
630
  if (code) goto _err;
H
Hongze Cheng 已提交
631

H
Hongze Cheng 已提交
632
_exit:
H
Hongze Cheng 已提交
633 634 635 636 637 638 639 640
  pRow = tsdbTbDataIterGet(pIter);
  if (pRow) {
    ASSERT(pRow->pTSRow->ts > pCommitter->maxKey);
    if (pCommitter->nextKey > pRow->pTSRow->ts) {
      pCommitter->nextKey = pRow->pTSRow->ts;
    }
  }

H
Hongze Cheng 已提交
641 642 643
  return code;

_err:
H
Hongze Cheng 已提交
644
  tsdbError("vgId:%d commit Table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
645 646 647 648 649 650
  return code;
}

static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
H
Hongze Cheng 已提交
651 652
  SDFileSet *pRSet = NULL;  // TODO
  SDFileSet *pWSet = NULL;  // TODO
H
Hongze Cheng 已提交
653

H
Hongze Cheng 已提交
654
  // memory
H
Hongze Cheng 已提交
655
  pCommitter->nextKey = TSKEY_MAX;
H
Hongze Cheng 已提交
656

H
Hongze Cheng 已提交
657 658 659 660 661
  // old
  tMapDataReset(&pCommitter->oBlockIdxMap);
  tMapDataReset(&pCommitter->oBlockMap);
  tBlockReset(&pCommitter->oBlock);
  tBlockDataReset(&pCommitter->oBlockData);
H
Hongze Cheng 已提交
662
  if (pRSet) {
H
Hongze Cheng 已提交
663
    code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
H
Hongze Cheng 已提交
664 665
    if (code) goto _err;

H
Hongze Cheng 已提交
666
    code = tsdbReadBlockIdx(pCommitter->pReader, &pCommitter->oBlockIdxMap, NULL);
H
Hongze Cheng 已提交
667
    if (code) goto _err;
H
Hongze Cheng 已提交
668 669
  }

H
Hongze Cheng 已提交
670 671 672 673 674
  // new
  tMapDataReset(&pCommitter->nBlockIdxMap);
  tMapDataReset(&pCommitter->nBlockMap);
  tBlockReset(&pCommitter->nBlock);
  tBlockDataReset(&pCommitter->nBlockData);
H
Hongze Cheng 已提交
675
  code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, pWSet);
H
Hongze Cheng 已提交
676 677 678 679 680 681
  if (code) goto _err;

_exit:
  return code;

_err:
H
Hongze Cheng 已提交
682
  tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
683 684 685 686 687
  return code;
}

static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
  int32_t    code = 0;
H
Hongze Cheng 已提交
688
  int32_t    c;
H
Hongze Cheng 已提交
689 690 691 692
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    iTbData = 0;
  int32_t    nTbData = taosArrayGetSize(pMemTable->aTbData);
H
Hongze Cheng 已提交
693
  int32_t    iBlockIdx = 0;
H
Hongze Cheng 已提交
694
  int32_t    nBlockIdx = pCommitter->oBlockIdxMap.nItem;
H
Hongze Cheng 已提交
695
  STbData   *pTbData;
H
Hongze Cheng 已提交
696
  SBlockIdx  blockIdx;
H
Hongze Cheng 已提交
697
  SBlockIdx *pBlockIdx = &blockIdx;
H
Hongze Cheng 已提交
698

H
Hongze Cheng 已提交
699
  ASSERT(nTbData > 0);
H
Hongze Cheng 已提交
700

H
Hongze Cheng 已提交
701 702
  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
  if (iBlockIdx < nBlockIdx) {
H
Hongze Cheng 已提交
703
    tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx);
H
Hongze Cheng 已提交
704 705
  } else {
    pBlockIdx = NULL;
H
Hongze Cheng 已提交
706 707 708 709
  }

  while (true) {
    if (pTbData == NULL && pBlockIdx == NULL) break;
H
Hongze Cheng 已提交
710 711

    if (pTbData && pBlockIdx) {
H
Hongze Cheng 已提交
712 713
      c = tTABLEIDCmprFn(pTbData, pBlockIdx);

H
Hongze Cheng 已提交
714
      if (c == 0) {
H
Hongze Cheng 已提交
715
        goto _commit_mem_and_disk_data;
H
Hongze Cheng 已提交
716
      } else if (c < 0) {
H
Hongze Cheng 已提交
717
        goto _commit_mem_data;
H
Hongze Cheng 已提交
718
      } else {
H
Hongze Cheng 已提交
719
        goto _commit_disk_data;
H
Hongze Cheng 已提交
720
      }
H
Hongze Cheng 已提交
721 722
    } else if (pTbData) {
      goto _commit_mem_data;
H
Hongze Cheng 已提交
723
    } else {
H
Hongze Cheng 已提交
724
      goto _commit_disk_data;
H
Hongze Cheng 已提交
725 726
    }

H
Hongze Cheng 已提交
727 728 729
  _commit_mem_data:
    code = tsdbCommitTableData(pCommitter, pTbData, NULL);
    if (code) goto _err;
H
Hongze Cheng 已提交
730

H
Hongze Cheng 已提交
731 732 733 734
    iTbData++;
    if (iTbData < nTbData) {
      pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
    } else {
H
Hongze Cheng 已提交
735
      pTbData = NULL;
H
Hongze Cheng 已提交
736
    }
H
Hongze Cheng 已提交
737
    continue;
H
Hongze Cheng 已提交
738

H
Hongze Cheng 已提交
739 740 741
  _commit_disk_data:
    code = tsdbCommitTableData(pCommitter, NULL, pBlockIdx);
    if (code) goto _err;
H
Hongze Cheng 已提交
742

H
Hongze Cheng 已提交
743 744
    iBlockIdx++;
    if (iBlockIdx < nBlockIdx) {
H
Hongze Cheng 已提交
745
      tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx);
H
Hongze Cheng 已提交
746 747 748 749
    } else {
      pBlockIdx = NULL;
    }
    continue;
H
Hongze Cheng 已提交
750

H
Hongze Cheng 已提交
751 752
  _commit_mem_and_disk_data:
    code = tsdbCommitTableData(pCommitter, pTbData, pBlockIdx);
H
Hongze Cheng 已提交
753
    if (code) goto _err;
H
Hongze Cheng 已提交
754

H
Hongze Cheng 已提交
755 756 757 758 759 760 761 762
    iTbData++;
    iBlockIdx++;
    if (iTbData < nTbData) {
      pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
    } else {
      pTbData = NULL;
    }
    if (iBlockIdx < nBlockIdx) {
H
Hongze Cheng 已提交
763
      tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx);
H
Hongze Cheng 已提交
764 765 766 767
    } else {
      pBlockIdx = NULL;
    }
    continue;
H
Hongze Cheng 已提交
768 769 770 771 772
  }

  return code;

_err:
H
Hongze Cheng 已提交
773
  tsdbError("vgId:%d commit file data impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
774 775 776 777 778
  return code;
}

static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
779

H
Hongze Cheng 已提交
780
  // write blockIdx
H
Hongze Cheng 已提交
781
  code = tsdbWriteBlockIdx(pCommitter->pWriter, &pCommitter->nBlockIdxMap, NULL);
H
Hongze Cheng 已提交
782 783
  if (code) goto _err;

H
Hongze Cheng 已提交
784
  // update file header
H
Hongze Cheng 已提交
785 786 787
  code = tsdbUpdateDFileSetHeader(pCommitter->pWriter, NULL);
  if (code) goto _err;

H
Hongze Cheng 已提交
788 789
  // close and sync
  code = tsdbDataFWriterClose(pCommitter->pWriter, 1);
H
Hongze Cheng 已提交
790 791 792
  if (code) goto _err;

  if (pCommitter->pReader) {
H
Hongze Cheng 已提交
793
    code = tsdbDataFReaderClose(pCommitter->pReader);
H
Hongze Cheng 已提交
794 795 796 797 798 799 800
    goto _err;
  }

_exit:
  return code;

_err:
H
Hongze Cheng 已提交
801
  tsdbError("vgId:%d commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
802 803 804
  return code;
}

H
Hongze Cheng 已提交
805
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
806 807
  int32_t code = 0;

H
Hongze Cheng 已提交
808 809
  // commit file data start
  code = tsdbCommitFileDataStart(pCommitter);
H
Hongze Cheng 已提交
810
  if (code) goto _err;
H
Hongze Cheng 已提交
811

H
Hongze Cheng 已提交
812 813
  // commit file data impl
  code = tsdbCommitFileDataImpl(pCommitter);
H
Hongze Cheng 已提交
814
  if (code) goto _err;
H
Hongze Cheng 已提交
815

H
Hongze Cheng 已提交
816 817
  // commit file data end
  code = tsdbCommitFileDataEnd(pCommitter);
H
Hongze Cheng 已提交
818
  if (code) goto _err;
H
Hongze Cheng 已提交
819 820 821 822

  return code;

_err:
H
Hongze Cheng 已提交
823
  tsdbError("vgId:%d commit file data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
824 825 826
  return code;
}

H
Hongze Cheng 已提交
827 828
// ----------------------------------------------------------------------------
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
H
Hongze Cheng 已提交
829
  int32_t code = 0;
H
Hongze Cheng 已提交
830

H
Hongze Cheng 已提交
831 832
  memset(pCommitter, 0, sizeof(*pCommitter));
  ASSERT(pTsdb->mem && pTsdb->imem == NULL);
H
Hongze Cheng 已提交
833

H
Hongze Cheng 已提交
834 835 836 837
  // lock();
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
  // unlock();
H
Hongze Cheng 已提交
838

H
Hongze Cheng 已提交
839
  pCommitter->pTsdb = pTsdb;
H
Hongze Cheng 已提交
840 841 842 843
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;
  pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
H
Hongze Cheng 已提交
844

H
Hongze Cheng 已提交
845 846 847 848 849 850 851
  code = tsdbFSBegin(pTsdb->fs);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb start commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
852 853 854
  return code;
}

H
Hongze Cheng 已提交
855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
  int32_t code = 0;

  pCommitter->pReader = NULL;
  pCommitter->oBlockIdxMap = tMapDataInit();
  pCommitter->oBlockMap = tMapDataInit();
  pCommitter->oBlock = tBlockInit();
  pCommitter->pWriter = NULL;
  pCommitter->nBlockIdxMap = tMapDataInit();
  pCommitter->nBlockMap = tMapDataInit();
  pCommitter->nBlock = tBlockInit();
  code = tBlockDataInit(&pCommitter->oBlockData);
  if (code) goto _exit;
  code = tBlockDataInit(&pCommitter->nBlockData);
  if (code) {
    tBlockDataClear(&pCommitter->oBlockData);
    goto _exit;
  }

_exit:
  return code;
}

static void tsdbCommitDataEnd(SCommitter *pCommitter) {
  tMapDataClear(&pCommitter->oBlockIdxMap);
  tMapDataClear(&pCommitter->oBlockMap);
  tBlockClear(&pCommitter->oBlock);
  tBlockDataClear(&pCommitter->oBlockData);
  tMapDataClear(&pCommitter->nBlockIdxMap);
  tMapDataClear(&pCommitter->nBlockMap);
  tBlockClear(&pCommitter->nBlock);
  tBlockDataClear(&pCommitter->nBlockData);
}

H
Hongze Cheng 已提交
889 890 891 892
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
893

H
Hongze Cheng 已提交
894
  // check
H
Hongze Cheng 已提交
895
  if (pMemTable->nRow == 0) goto _exit;
H
Hongze Cheng 已提交
896

H
Hongze Cheng 已提交
897 898 899 900 901 902
  // start ====================
  code = tsdbCommitDataStart(pCommitter);
  if (code) return code;

  // impl ====================
  pCommitter->nextKey = pMemTable->minKey;
H
Hongze Cheng 已提交
903 904 905 906 907
  while (pCommitter->nextKey < TSKEY_MAX) {
    pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
    tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                    &pCommitter->maxKey);
    code = tsdbCommitFileData(pCommitter);
H
Hongze Cheng 已提交
908
    if (code) goto _err;
H
Hongze Cheng 已提交
909
  }
H
Hongze Cheng 已提交
910

H
Hongze Cheng 已提交
911 912 913
  // end ====================
  tsdbCommitDataEnd(pCommitter);

H
Hongze Cheng 已提交
914 915 916
_exit:
  tsdbDebug("vgId:%d commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow);
  return code;
H
Hongze Cheng 已提交
917

H
Hongze Cheng 已提交
918
_err:
H
Hongze Cheng 已提交
919
  tsdbCommitDataEnd(pCommitter);
H
Hongze Cheng 已提交
920 921 922
  tsdbError("vgId:%d commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
923

H
Hongze Cheng 已提交
924 925 926 927
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
928

H
Hongze Cheng 已提交
929 930
  if (pMemTable->nDel == 0) {
    goto _exit;
H
Hongze Cheng 已提交
931
  }
H
Hongze Cheng 已提交
932

H
Hongze Cheng 已提交
933 934 935 936 937
  // start
  code = tsdbCommitDelStart(pCommitter);
  if (code) {
    goto _err;
  }
H
Hongze Cheng 已提交
938

H
Hongze Cheng 已提交
939 940 941 942 943
  // impl
  code = tsdbCommitDelImpl(pCommitter);
  if (code) {
    goto _err;
  }
H
Hongze Cheng 已提交
944

H
Hongze Cheng 已提交
945 946 947 948 949
  // end
  code = tsdbCommitDelEnd(pCommitter);
  if (code) {
    goto _err;
  }
H
Hongze Cheng 已提交
950

H
Hongze Cheng 已提交
951
_exit:
H
Hongze Cheng 已提交
952
  tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
H
Hongze Cheng 已提交
953 954 955
  return code;

_err:
H
Hongze Cheng 已提交
956
  tsdbError("vgId:%d commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
957
  return code;
H
Hongze Cheng 已提交
958 959 960 961 962 963 964
}

static int32_t tsdbCommitCache(SCommitter *pCommitter) {
  int32_t code = 0;
  // TODO
  return code;
}
H
Hongze Cheng 已提交
965 966 967 968 969 970

static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
  int32_t code = 0;
  // TODO
  return code;
}