tsdbCommit.c 27.4 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef struct {
H
Hongze Cheng 已提交
19
  STsdb *pTsdb;
H
Hongze Cheng 已提交
20
  /* commit data */
H
Hongze Cheng 已提交
21
  int64_t commitID;
H
Hongze Cheng 已提交
22 23
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
24 25
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
26
  // --------------
H
Hongze Cheng 已提交
27
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
28 29 30
  int32_t commitFid;
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
31
  // commit file data
H
Hongze Cheng 已提交
32
  SDataFReader *pReader;
H
Hongze Cheng 已提交
33 34 35 36
  SMapData      oBlockIdxMap;  // SMapData<SBlockIdx>, read from reader
  SMapData      oBlockMap;     // SMapData<SBlock>, read from reader
  SBlock        oBlock;
  SBlockData    oBlockData;
H
Hongze Cheng 已提交
37
  SDataFWriter *pWriter;
H
Hongze Cheng 已提交
38 39 40 41
  SMapData      nBlockIdxMap;  // SMapData<SBlockIdx>, build by committer
  SMapData      nBlockMap;     // SMapData<SBlock>
  SBlock        nBlock;
  SBlockData    nBlockData;
H
Hongze Cheng 已提交
42
  /* commit del */
H
Hongze Cheng 已提交
43
  SDelFReader *pDelFReader;
H
Hongze Cheng 已提交
44 45
  SMapData     oDelIdxMap;   // SMapData<SDelIdx>, old
  SMapData     oDelDataMap;  // SMapData<SDelData>, old
H
Hongze Cheng 已提交
46
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
47 48
  SMapData     nDelIdxMap;   // SMapData<SDelIdx>, new
  SMapData     nDelDataMap;  // SMapData<SDelData>, new
H
Hongze Cheng 已提交
49
} SCommitter;
H
refact  
Hongze Cheng 已提交
50

H
Hongze Cheng 已提交
51 52 53 54 55
// Commit phases, forward-declared because tsdbCommit() (defined first) calls them in this order:
// start -> data -> del -> cache -> end. tsdbEndCommit() receives the error code of the failed
// phase (0 on success).
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
refact  
Hongze Cheng 已提交
56

H
refact  
Hongze Cheng 已提交
57
/*
 * Create the mutable memtable for `pTsdb` and store it in pTsdb->mem.
 *
 * A NULL `pTsdb` is treated as "nothing to do" and returns 0, matching the
 * guard at the top of tsdbCommit().
 *
 * Returns 0 on success, otherwise the error code from tsdbMemTableCreate()
 * (which is also logged).
 */
int32_t tsdbBegin(STsdb *pTsdb) {
  int32_t code = 0;

  if (!pTsdb) return code;

  code = tsdbMemTableCreate(pTsdb, &pTsdb->mem);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb begin failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
more  
Hongze Cheng 已提交
70
/*
 * Commit the current memtable of `pTsdb`.
 *
 * Returns 0 when `pTsdb` is NULL or when the memtable holds neither rows nor
 * deletes (in which case the memtable is detached and destroyed). Otherwise
 * runs start -> data -> del -> cache -> end and returns the first non-zero
 * code; on any failure tsdbEndCommit() is invoked with that code and the
 * failure is logged.
 */
int32_t tsdbCommit(STsdb *pTsdb) {
  if (pTsdb == NULL) return 0;

  SCommitter committer;
  SMemTable *pMem = pTsdb->mem;

  // Nothing buffered: drop the empty memtable and report success.
  if (pMem->nRow == 0 && pMem->nDel == 0) {
    // TODO: lock?
    pTsdb->mem = NULL;
    tsdbMemTableDestroy(pMem);
    return 0;
  }

  // Each phase runs only while every earlier phase has succeeded.
  int32_t code = tsdbStartCommit(pTsdb, &committer);
  if (code == 0) code = tsdbCommitData(&committer);
  if (code == 0) code = tsdbCommitDel(&committer);
  if (code == 0) code = tsdbCommitCache(&committer);
  if (code == 0) code = tsdbEndCommit(&committer, 0);

  if (code != 0) {
    // NOTE(review): this also runs when tsdbStartCommit() or the first tsdbEndCommit() failed -- confirm
    // tsdbEndCommit() tolerates both situations.
    tsdbEndCommit(&committer, code);
    tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  }

  return code;
}

H
Hongze Cheng 已提交
112
/*
 * Prepare the delete-commit pass: clear both del-index maps, load the old
 * del index when an old del file exists, and open the writer for the new
 * del file.
 *
 * Returns 0 on success (logged at debug level) or the first failing code
 * (logged as an error).
 */
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
  STsdb    *pTsdb = pCommitter->pTsdb;
  SDelFile *pOldDelFile = NULL;  // TODO
  SDelFile *pNewDelFile = NULL;  // TODO
  int32_t   code = 0;

  tMapDataReset(&pCommitter->oDelIdxMap);
  tMapDataReset(&pCommitter->nDelIdxMap);

  // load old
  if (pOldDelFile != NULL) {
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pOldDelFile, pTsdb, NULL);
    if (code == 0) {
      code = tsdbReadDelIdx(pCommitter->pDelFReader, &pCommitter->oDelIdxMap, NULL);
    }
  }

  // prepare new
  if (code == 0) {
    code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pNewDelFile, pTsdb);
  }

  if (code != 0) {
    tsdbError("vgId:%d commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
    return code;
  }

  tsdbDebug("vgId:%d commit del start", TD_VID(pTsdb->pVnode));
  return code;
}

H
Hongze Cheng 已提交
144
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
H
Hongze Cheng 已提交
145 146 147 148 149
  int32_t   code = 0;
  SDelData *pDelData;
  tb_uid_t  suid;
  tb_uid_t  uid;
  SDelIdx   delIdx;  // TODO
H
Hongze Cheng 已提交
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167

  // check no del data, just return
  if (pTbData && pTbData->pHead == NULL) {
    pTbData = NULL;
  }
  if (pTbData == NULL && pDelIdx == NULL) goto _exit;

  // prepare
  if (pTbData) {
    delIdx.suid = pTbData->suid;
    delIdx.uid = pTbData->uid;
  } else {
    delIdx.suid = pDelIdx->suid;
    delIdx.uid = pDelIdx->uid;
  }
  delIdx.minKey = TSKEY_MAX;
  delIdx.maxKey = TSKEY_MIN;
  delIdx.minVersion = INT64_MAX;
H
Hongze Cheng 已提交
168
  delIdx.maxVersion = INT64_MIN;
H
Hongze Cheng 已提交
169 170 171 172 173 174 175 176 177 178 179 180

  // start
  tMapDataReset(&pCommitter->oDelDataMap);
  tMapDataReset(&pCommitter->nDelDataMap);

  if (pDelIdx) {
    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, &pCommitter->oDelDataMap, NULL);
    if (code) goto _err;
  }

  // disk
  for (int32_t iDelData = 0; iDelData < pCommitter->oDelDataMap.nItem; iDelData++) {
H
Hongze Cheng 已提交
181
    code = tMapDataGetItemByIdx(&pCommitter->oDelDataMap, iDelData, pDelData, tGetDelData);
H
Hongze Cheng 已提交
182 183
    if (code) goto _err;

H
Hongze Cheng 已提交
184
    code = tMapDataPutItem(&pCommitter->nDelDataMap, pDelData, tPutDelData);
H
Hongze Cheng 已提交
185 186
    if (code) goto _err;

H
Hongze Cheng 已提交
187 188 189 190
    if (delIdx.minKey > pDelData->sKey) delIdx.minKey = pDelData->sKey;
    if (delIdx.maxKey < pDelData->eKey) delIdx.maxKey = pDelData->eKey;
    if (delIdx.minVersion > pDelData->version) delIdx.minVersion = pDelData->version;
    if (delIdx.maxVersion < pDelData->version) delIdx.maxVersion = pDelData->version;
H
Hongze Cheng 已提交
191 192 193
  }

  // memory
H
Hongze Cheng 已提交
194 195 196
  pDelData = pTbData ? pTbData->pHead : NULL;
  for (; pDelData; pDelData = pDelData->pNext) {
    code = tMapDataPutItem(&pCommitter->nDelDataMap, pDelData, tPutDelData);
H
Hongze Cheng 已提交
197 198
    if (code) goto _err;

H
Hongze Cheng 已提交
199 200 201 202
    if (delIdx.minKey > pDelData->sKey) delIdx.minKey = pDelData->sKey;
    if (delIdx.maxKey < pDelData->eKey) delIdx.maxKey = pDelData->eKey;
    if (delIdx.minVersion > pDelData->version) delIdx.minVersion = pDelData->version;
    if (delIdx.maxVersion < pDelData->version) delIdx.maxVersion = pDelData->version;
H
Hongze Cheng 已提交
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
  }

  ASSERT(pCommitter->nDelDataMap.nItem > 0);

  // write
  code = tsdbWriteDelData(pCommitter->pDelFWriter, &pCommitter->nDelDataMap, NULL, &delIdx);
  if (code) goto _err;

  // put delIdx
  code = tMapDataPutItem(&pCommitter->nDelIdxMap, &delIdx, tPutDelIdx);
  if (code) goto _err;

_exit:
  return code;

_err:
  tsdbError("vgId:%d commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
223
/*
 * Walk the in-memory table list (pTsdb->imem->aTbData) and the old del index
 * (oDelIdxMap) side by side and commit the deletes of every table found in
 * either one. tTABLEIDCmprFn() decides which side is behind; equal ids are
 * committed together.
 *
 * Returns 0 on success or the first failing code (logged).
 */
static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    nTbData = taosArrayGetSize(pMemTable->aTbData);
  int32_t    nDelIdx = pCommitter->oDelIdxMap.nItem;
  int32_t    iTbData = 0;
  int32_t    iDelIdx = 0;
  STbData   *pTbData;
  SDelIdx    delIdx;  // storage for the current on-disk index entry
  SDelIdx   *pDelIdx = NULL;

  ASSERT(nTbData > 0);

  // Position both cursors on their first element.
  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
  if (iDelIdx < nDelIdx) {
    code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDelIdx, &delIdx, tGetDelIdx);
    if (code) goto _err;
    pDelIdx = &delIdx;
  }

  while (pTbData != NULL || pDelIdx != NULL) {
    bool useMem;
    bool useDisk;

    if (pTbData != NULL && pDelIdx != NULL) {
      int32_t c = tTABLEIDCmprFn(pTbData, pDelIdx);
      useMem = (c <= 0);   // memory side is behind or equal
      useDisk = (c >= 0);  // disk side is behind or equal
    } else {
      useMem = (pTbData != NULL);
      useDisk = !useMem;
    }

    code = tsdbCommitTableDel(pCommitter, useMem ? pTbData : NULL, useDisk ? pDelIdx : NULL);
    if (code) goto _err;

    // Advance whichever side(s) were consumed; memory first, then disk.
    if (useMem) {
      iTbData++;
      pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
    }
    if (useDisk) {
      iDelIdx++;
      if (iDelIdx < nDelIdx) {
        code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDelIdx, &delIdx, tGetDelIdx);
        if (code) goto _err;
        pDelIdx = &delIdx;
      } else {
        pDelIdx = NULL;
      }
    }
  }

  return code;

_err:
  tsdbError("vgId:%d commit del impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

/*
 * Finish the delete-commit pass: write the new del index, update the del file
 * header, close the writer (second argument 1), then close the old-file reader
 * if one was opened. Stops at the first failing step.
 *
 * Returns 0 on success or the failing step's code (logged).
 */
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t code = tsdbWriteDelIdx(pCommitter->pDelFWriter, &pCommitter->nDelIdxMap, NULL);

  if (code == 0) {
    code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter, NULL);
  }
  if (code == 0) {
    code = tsdbDelFWriterClose(pCommitter->pDelFWriter, 1);
  }
  if (code == 0 && pCommitter->pDelFReader) {
    code = tsdbDelFReaderClose(pCommitter->pDelFReader);
  }

  if (code != 0) {
    tsdbError("vgId:%d commit del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
339 340
// True when `pRow` is NULL or its timestamp is greater than `maxKey`, i.e. there is no further
// in-memory row to commit within the current key bound. Note that `pRow` is evaluated twice.
#define ROW_END(pRow, maxKey) (((pRow) == NULL) || ((pRow)->pTSRow->ts > (maxKey)))

H
Hongze Cheng 已提交
341 342
/*
 * Drain in-memory rows from `pIter` up to and including `eKey` into new data
 * blocks.
 *
 * pBlockIdx  - index entry of the table being written; its key/version range is widened by each written block
 * pIter      - row iterator; NULL means there is nothing to do
 * eKey       - last (ts, version) key to commit
 * toDataOnly - when false, a flushed block holding fewer than minRow rows is flagged `last`
 *
 * Rows are accumulated in pCommitter->nBlockData and flushed once the buffer
 * reaches 4/5 of maxRow, or when the iterator is exhausted / passes eKey.
 * Every flushed block is appended to nBlockMap.
 *
 * Returns 0 on success or the first failing code (logged).
 */
static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, TSDBKEY eKey,
                                    bool toDataOnly) {
  int32_t   code = 0;
  TSDBROW  *pRow;
  STSchema *pTSchema = NULL;  // TODO
  TSDBKEY   key;
  SBlock   *pBlock = &pCommitter->nBlock;

  if (pIter == NULL) goto _exit;

  tBlockReset(pBlock);
  tBlockDataReset(&pCommitter->nBlockData);
  while (true) {
    pRow = tsdbTbDataIterGet(pIter);

    if (pRow == NULL || tsdbKeyCmprFn(&(TSDBKEY){.ts = pRow->pTSRow->ts, .version = pRow->version}, &eKey) > 0) {
      // No more rows in range: stop if nothing is buffered, otherwise flush the remainder below.
      if (pCommitter->nBlockData.nRow == 0) break;
    } else {
      // update schema
      if (pTSchema == NULL || pTSchema->version != TSDBROW_SVERSION(pRow)) {
        // TODO
        // pTSchema = NULL;
      }

      // append row
      code = tBlockDataAppendRow(&pCommitter->nBlockData, pRow, pTSchema);
      if (code) goto _err;

      // update block info
      key = tsdbRowKey(pRow);
      if (tsdbKeyCmprFn(&key, &pBlock->info.maxKey) > 0) pBlock->info.maxKey = key;
      if (tsdbKeyCmprFn(&key, &pBlock->info.minKey) < 0) pBlock->info.minKey = key;
      if (key.version > pBlock->info.maxVersion) pBlock->info.maxVersion = key.version;
      if (key.version < pBlock->info.minVerion) pBlock->info.minVerion = key.version;

      // iter next
      tsdbTbDataIterNext(pIter);

      // keep buffering until the block is reasonably full
      if (pCommitter->nBlockData.nRow < pCommitter->maxRow * 4 / 5) continue;
    }

    // write block data; the threshold is a row count (minRow), not the timestamp minKey
    pBlock->last = (!toDataOnly && pCommitter->nBlockData.nRow < pCommitter->minRow) ? 1 : 0;

    code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, pBlock);
    if (code) goto _err;

    code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
    if (code) goto _err;

    // Fold the block's range into the table index. pBlock is reset right below, so the
    // destination must be pBlockIdx; min bounds only shrink and max bounds only grow.
    if (tsdbKeyCmprFn(&pBlock->info.minKey, &pBlockIdx->info.minKey) < 0) pBlockIdx->info.minKey = pBlock->info.minKey;
    if (tsdbKeyCmprFn(&pBlock->info.maxKey, &pBlockIdx->info.maxKey) > 0) pBlockIdx->info.maxKey = pBlock->info.maxKey;
    if (pBlock->info.minVerion < pBlockIdx->info.minVerion) pBlockIdx->info.minVerion = pBlock->info.minVerion;
    if (pBlock->info.maxVersion > pBlockIdx->info.maxVersion) pBlockIdx->info.maxVersion = pBlock->info.maxVersion;

    tBlockReset(pBlock);
    tBlockDataReset(&pCommitter->nBlockData);
  }

_exit:
  return code;

_err:
  tsdbError("vgId:%d commit memory data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
420 421 422 423 424 425 426 427 428 429
/*
 * Count how many upcoming in-memory rows compare equal to `pBlock` under
 * tBlockCmprFn(), i.e. overlap the block's key range.
 *
 * The count is taken on a private copy of `*pIter`, so the caller's iterator
 * position is left untouched. Counting stops at the first row that compares
 * greater than the block or when the iterator is exhausted. A row that
 * compares less than the block is not expected here and trips ASSERT.
 */
static int32_t tsdbGetOverlapRowNumber(STbDataIter *pIter, SBlock *pBlock) {
  int32_t     nRow = 0;
  TSDBROW    *pRow;
  TSDBKEY     key;
  int32_t     c = 0;
  STbDataIter iter = *pIter;

  iter.pRow = NULL;
  while (true) {
    // Walk the copy, not the caller's iterator.
    pRow = tsdbTbDataIterGet(&iter);

    if (pRow == NULL) break;
    key = tsdbRowKey(pRow);

    c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock);
    if (c == 0) {
      nRow++;
      // Advance; without this the same row is counted forever.
      tsdbTbDataIterNext(&iter);
    } else if (c > 0) {
      break;
    } else {
      ASSERT(0);
    }
  }

  return nRow;
}

/*
 * Merge the rows of on-disk block `pBlock` with in-memory rows from `pIter`
 * (bounded by pCommitter->maxKey) and write the result as new block(s).
 *
 * The block's data is loaded into oBlockData first. The row-by-row merge step
 * itself is still a TODO; `toDataOnly` is currently unused.
 *
 * NOTE(review): until the merge step is implemented nothing consumes either
 * input inside the loop, so the loop only terminates when both inputs are
 * already exhausted on entry -- confirm before relying on this path.
 *
 * Returns 0 on success or the first failing code (logged).
 */
static int32_t tsdbMergeCommitImpl(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock,
                                   int8_t toDataOnly) {
  int32_t  code = 0;
  SBlock   block = tBlockInit();
  TSDBROW *pRow;
  int32_t  iRow;
  int32_t  nRow;

  tBlockDataReset(&pCommitter->nBlockData);

  // load last and merge until {pCommitter->maxKey, INT64_MAX}
  code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, 0, NULL, NULL);
  if (code) goto _err;

  iRow = 0;
  nRow = pCommitter->oBlockData.nRow;
  pRow = tsdbTbDataIterGet(pIter);

  for (;;) {
    bool memDone = (pRow == NULL) || (pRow->pTSRow->ts > pCommitter->maxKey);
    bool diskDone = (iRow >= nRow);

    // Both inputs drained and nothing left buffered: finished.
    if (memDone && diskDone && !(pCommitter->nBlockData.nRow > 0)) break;

    // TODO

    // write block data
    block.last = (pCommitter->nBlockData.nRow < pCommitter->minRow) ? 1 : 0;
    code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, &block);
    if (code) goto _err;

    code = tMapDataPutItem(&pCommitter->nBlockMap, &block, tPutBlock);
    if (code) goto _err;
  }

  tBlockReset(&block);
  tBlockDataReset(&pCommitter->nBlockData);

  return code;

_err:
  tsdbError("vgId:%d merge commit impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
497

H
Hongze Cheng 已提交
498 499
/*
 * Commit one step of a table's data, combining in-memory rows (`pIter`) with
 * one old on-disk block (`pBlock`).
 *
 *  - pBlock == NULL : commit in-memory rows up to {maxKey, INT64_MAX}.
 *  - pBlock->last   : merge the block with in-memory rows via tsdbMergeCommitImpl().
 *  - otherwise      : first commit the in-memory rows that sort strictly before the
 *                     block, then either carry the block over unchanged (no in-memory
 *                     row left, or the next one sorts after the block) or merge when
 *                     the next row falls inside the block.
 *
 * `pIter` may be NULL (disk-only table or remaining disk blocks); `isLastBlock`
 * is currently unused.
 *
 * Returns 0 on success or the first failing code (logged).
 */
static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock,
                               int8_t isLastBlock) {
  int32_t  code = 0;
  TSDBROW *pRow;
  TSDBKEY  key;
  int32_t  c;

  if (pBlock == NULL) {  // (pIter && pBlock == NULL)
    key.ts = pCommitter->maxKey;
    key.version = INT64_MAX;
    code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 0);
    if (code) goto _err;
  } else if (pBlock->last) {
    // merge
    code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 0);
    if (code) goto _err;
  } else {  // pBlock && pBlock->last == 0 && (pIter == NULL || pIter)
    // memory: everything strictly before the block's first key
    if (pIter) {
      key.ts = pBlock->info.minKey.ts;
      key.version = pBlock->info.minKey.version - 1;
      code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 1);
      if (code) goto _err;
    }

    // merge or move block
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow == NULL) {
      // No in-memory row left (or no iterator at all): nothing can overlap, keep the block as is.
      code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
      if (code) goto _err;
    } else {
      key.ts = pRow->pTSRow->ts;
      key.version = pRow->version;

      c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock);
      if (c > 0) {
        // move block
        code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
        if (code) goto _err;
      } else if (c == 0) {
        int32_t nOverlap = tsdbGetOverlapRowNumber(pIter, pBlock);

        if (pBlock->nRow + nOverlap > pCommitter->maxRow || pBlock->nSubBlock == TSDB_MAX_SUBBLOCKS) {
          code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 1);
          if (code) goto _err;
        } else {
          // add as a subblock
        }
      } else {
        // Rows before the block were committed above, so the next row cannot sort before it.
        ASSERT(0);
      }
    }
  }

  return code;

_err:
  tsdbError("vgId:%d merge commit failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
554 555 556 557 558
/*
 * Commit the data of one table into the file set currently being written.
 *
 * pTbData   - the table's in-memory data, or NULL when the table exists on disk only
 * pBlockIdx - the table's block index in the old file set, or NULL when it has none
 *
 * When there is neither an in-memory row within maxKey nor old on-disk data,
 * nothing is written. Otherwise old blocks and in-memory rows are combined via
 * tsdbMergeCommit(), the resulting block map is written and a new SBlockIdx is
 * appended to nBlockIdxMap. On the way out pCommitter->nextKey is lowered to the
 * timestamp of the first in-memory row left beyond maxKey, if any.
 *
 * Returns 0 on success or the first failing code (logged).
 */
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
  int32_t      code = 0;
  STbDataIter  iter;
  STbDataIter *pIter = &iter;
  TSDBROW     *pRow;
  int64_t      suid;
  int64_t      uid;
  SBlockIdx    blockIdx;

  // create iter
  if (pTbData) {
    suid = pTbData->suid;
    uid = pTbData->uid;
    tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, pIter);
  } else {
    suid = pBlockIdx->suid;
    uid = pBlockIdx->uid;
    pIter = NULL;
  }

  // check
  pRow = tsdbTbDataIterGet(pIter);
  if (ROW_END(pRow, pCommitter->maxKey) && pBlockIdx == NULL) goto _exit;

  // start ================================
  tMapDataReset(&pCommitter->oBlockMap);
  tBlockReset(&pCommitter->oBlock);
  tBlockDataReset(&pCommitter->oBlockData);
  if (pBlockIdx) {
    code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL);
    if (code) goto _err;
  }

  blockIdx = tBlockIdxInit(suid, uid);
  tMapDataReset(&pCommitter->nBlockMap);
  tBlockReset(&pCommitter->nBlock);
  tBlockDataReset(&pCommitter->nBlockData);

  // impl ===============================
  int32_t iBlock = 0;
  int32_t nBlock = pCommitter->oBlockMap.nItem;

  // merge: both in-memory rows and old blocks remain
  pRow = tsdbTbDataIterGet(pIter);
  while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlock) {
    code = tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, &pCommitter->oBlock, tGetBlock);
    if (code) goto _err;

    code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, &pCommitter->oBlock, iBlock == (nBlock - 1));
    if (code) goto _err;

    pRow = tsdbTbDataIterGet(pIter);
    iBlock++;
  }

  // mem: only in-memory rows remain
  pRow = tsdbTbDataIterGet(pIter);
  while (!ROW_END(pRow, pCommitter->maxKey)) {
    code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, NULL, 0);
    if (code) goto _err;

    pRow = tsdbTbDataIterGet(pIter);
  }

  // disk: only old blocks remain
  while (iBlock < nBlock) {
    code = tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, &pCommitter->oBlock, tGetBlock);
    if (code) goto _err;

    code = tsdbMergeCommit(pCommitter, &blockIdx, NULL, &pCommitter->oBlock, 0);
    if (code) goto _err;

    iBlock++;
  }

  // end ===============================
  code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, &blockIdx);
  if (code) goto _err;

  code = tMapDataPutItem(&pCommitter->nBlockIdxMap, &blockIdx, tPutBlockIdx);
  if (code) goto _err;

_exit:
  // Remember the earliest row that was left for a later pass.
  pRow = tsdbTbDataIterGet(pIter);
  if (pRow) {
    ASSERT(pRow->pTSRow->ts > pCommitter->maxKey);
    if (pCommitter->nextKey > pRow->pTSRow->ts) {
      pCommitter->nextKey = pRow->pTSRow->ts;
    }
  }

  return code;

_err:
  tsdbError("vgId:%d commit Table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

/*
 * Prepare committing one file set (pCommitter->commitFid).
 *
 * Resets nextKey to TSKEY_MAX and clears all old/new block state. If a file
 * set with this fid already exists, a reader is opened on it and its block
 * index is loaded. A writer is then opened for the new file set: head and last
 * files get the current commitID, while data and sma descriptors are inherited
 * from the old set when there is one.
 *
 * Returns 0 on success or the first failing code (logged).
 */
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SDFileSet *pRSet = NULL;
  SDFileSet  wSet;  // function-scope storage: must stay alive until tsdbDataFWriterOpen() is called

  // memory
  pCommitter->nextKey = TSKEY_MAX;

  // old
  tMapDataReset(&pCommitter->oBlockIdxMap);
  tMapDataReset(&pCommitter->oBlockMap);
  tBlockReset(&pCommitter->oBlock);
  tBlockDataReset(&pCommitter->oBlockData);
  pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid);
  if (pRSet) {
    code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
    if (code) goto _err;

    code = tsdbReadBlockIdx(pCommitter->pReader, &pCommitter->oBlockIdxMap, NULL);
    if (code) goto _err;
  }

  // new
  tMapDataReset(&pCommitter->nBlockIdxMap);
  tMapDataReset(&pCommitter->nBlockMap);
  tBlockReset(&pCommitter->nBlock);
  tBlockDataReset(&pCommitter->nBlockData);
  if (pRSet) {
    wSet = (SDFileSet){.diskId = pRSet->diskId,
                       .fid = pCommitter->commitFid,
                       .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
                       .fData = pRSet->fData,
                       .fLast = {.commitID = pCommitter->commitID, .size = 0},
                       .fSma = pRSet->fSma};
  } else {
    SDiskID did = {.level = 0, .id = 0};  // TODO: alloc a new one
    wSet = (SDFileSet){.diskId = did,
                       .fid = pCommitter->commitFid,
                       .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
                       .fData = {.commitID = pCommitter->commitID, .size = 0},
                       .fLast = {.commitID = pCommitter->commitID, .size = 0},
                       .fSma = {.commitID = pCommitter->commitID, .size = 0}};
  }
  // NOTE(review): assumes tsdbDataFWriterOpen() copies the set rather than retaining the pointer -- confirm.
  code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, &wSet);
  if (code) goto _err;

_exit:
  return code;

_err:
  tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

/*
 * Walk the in-memory table list (pTsdb->imem->aTbData) and the old block index
 * (oBlockIdxMap) side by side and commit every table found in either one.
 * tTABLEIDCmprFn() decides which side is behind; equal ids are committed
 * together so memory and disk data of the same table get merged.
 *
 * Returns 0 on success or the first failing code (logged).
 */
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    iTbData = 0;
  int32_t    nTbData = taosArrayGetSize(pMemTable->aTbData);
  int32_t    iBlockIdx = 0;
  int32_t    nBlockIdx = pCommitter->oBlockIdxMap.nItem;
  STbData   *pTbData;
  SBlockIdx  blockIdx;  // storage for the current on-disk index entry
  SBlockIdx *pBlockIdx = NULL;

  ASSERT(nTbData > 0);

  // Position both cursors on their first element.
  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
  if (iBlockIdx < nBlockIdx) {
    code = tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, &blockIdx, tGetBlockIdx);
    if (code) goto _err;
    pBlockIdx = &blockIdx;
  }

  while (pTbData != NULL || pBlockIdx != NULL) {
    bool useMem;
    bool useDisk;

    if (pTbData != NULL && pBlockIdx != NULL) {
      int32_t c = tTABLEIDCmprFn(pTbData, pBlockIdx);
      useMem = (c <= 0);   // memory side is behind or equal
      useDisk = (c >= 0);  // disk side is behind or equal
    } else {
      useMem = (pTbData != NULL);
      useDisk = !useMem;
    }

    code = tsdbCommitTableData(pCommitter, useMem ? pTbData : NULL, useDisk ? pBlockIdx : NULL);
    if (code) goto _err;

    // Advance whichever side(s) were consumed; memory first, then disk.
    if (useMem) {
      iTbData++;
      pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
    }
    if (useDisk) {
      iBlockIdx++;
      if (iBlockIdx < nBlockIdx) {
        code = tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, &blockIdx, tGetBlockIdx);
        if (code) goto _err;
        pBlockIdx = &blockIdx;
      } else {
        pBlockIdx = NULL;
      }
    }
  }

  return code;

_err:
  tsdbError("vgId:%d commit file data impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

/*
 * Finish the commit of the current data file set.
 *
 * Steps, in order:
 *   1. write the newly built block index map (nBlockIdxMap) through the writer
 *   2. update the data file set header
 *   3. close the writer (second argument 1; the original comment calls this
 *      "close and sync")
 *   4. close the reader, if one is open
 *
 * pCommitter: commit context; pWriter is handed to every writer call, pReader
 *             is closed only when it is non-NULL.
 *
 * Returns 0 on success, otherwise the error code of the first failing step
 * (which is also logged).
 */
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code = 0;

  // write blockIdx
  code = tsdbWriteBlockIdx(pCommitter->pWriter, &pCommitter->nBlockIdxMap, NULL);
  if (code) goto _err;

  // update file header
  code = tsdbUpdateDFileSetHeader(pCommitter->pWriter, NULL);
  if (code) goto _err;

  // close and sync
  code = tsdbDataFWriterClose(pCommitter->pWriter, 1);
  if (code) goto _err;

  if (pCommitter->pReader) {
    code = tsdbDataFReaderClose(pCommitter->pReader);
    // Only a failed close is an error. Jumping to _err unconditionally would
    // log a bogus "failed since <success>" message on every successful commit.
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
823
/*
 * Commit one data file set by running the three phases in sequence:
 * start -> impl -> end. A phase runs only if every earlier phase returned 0.
 *
 * Returns 0 on success, otherwise the error code of the first failing phase
 * (which is also logged).
 */
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
  // commit file data start
  int32_t rc = tsdbCommitFileDataStart(pCommitter);

  // commit file data impl
  if (rc == 0) {
    rc = tsdbCommitFileDataImpl(pCommitter);
  }

  // commit file data end
  if (rc == 0) {
    rc = tsdbCommitFileDataEnd(pCommitter);
  }

  if (rc != 0) {
    tsdbError("vgId:%d commit file data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(rc));
  }
  return rc;
}

H
Hongze Cheng 已提交
845 846
// ----------------------------------------------------------------------------
/*
 * Prepare a commit: reset the committer, move the active memtable into the
 * immutable slot, copy the commit parameters from the tsdb/vnode
 * configuration, and begin a file-system transaction via tsdbFSBegin().
 *
 * pTsdb:      tsdb instance; asserted to have a memtable (mem) and no
 *             immutable memtable (imem) on entry.
 * pCommitter: commit context; fully overwritten (zeroed first).
 *
 * Returns 0 on success, otherwise the code returned by tsdbFSBegin()
 * (which is also logged).
 */
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
  int32_t rc;

  memset(pCommitter, 0, sizeof(*pCommitter));
  ASSERT(pTsdb->mem && pTsdb->imem == NULL);

  // Hand the active memtable over to the immutable slot.
  // NOTE: the original brackets this swap with commented-out lock()/unlock()
  // calls; no locking is performed here.
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;

  // Commit parameters taken from the vnode state and configuration.
  pCommitter->pTsdb = pTsdb;
  pCommitter->commitID = pTsdb->pVnode->state.commitID;
  pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;

  // Commit parameters taken from the keep configuration.
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;

  rc = tsdbFSBegin(pTsdb->fs);
  if (rc != 0) {
    tsdbError("vgId:%d tsdb start commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(rc));
  }
  return rc;
}

H
Hongze Cheng 已提交
874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907
/*
 * Initialise the per-commit data state held in the committer: reader/writer
 * handles are reset to NULL, the old (o*) and new (n*) maps and blocks are set
 * to their initial values, and both block-data buffers are initialised.
 *
 * If initialising the new block data fails, the already initialised old block
 * data is cleared again before returning.
 *
 * Returns 0 on success, otherwise the code returned by tBlockDataInit().
 */
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
  int32_t rc;

  // state read from the existing file set
  pCommitter->pReader = NULL;
  pCommitter->oBlockIdxMap = tMapDataInit();
  pCommitter->oBlockMap = tMapDataInit();
  pCommitter->oBlock = tBlockInit();

  // state built for the new file set
  pCommitter->pWriter = NULL;
  pCommitter->nBlockIdxMap = tMapDataInit();
  pCommitter->nBlockMap = tMapDataInit();
  pCommitter->nBlock = tBlockInit();

  rc = tBlockDataInit(&pCommitter->oBlockData);
  if (rc != 0) {
    return rc;
  }

  rc = tBlockDataInit(&pCommitter->nBlockData);
  if (rc != 0) {
    // roll back the half of the pair that did succeed
    tBlockDataClear(&pCommitter->oBlockData);
  }

  return rc;
}

/*
 * Release the per-commit data state set up by tsdbCommitDataStart(): the old
 * and new block-index maps, block maps, blocks and block-data buffers.
 * The reader/writer handles are not touched here.
 */
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
  // map data (old, then new)
  tMapDataClear(&pCommitter->oBlockIdxMap);
  tMapDataClear(&pCommitter->oBlockMap);
  tMapDataClear(&pCommitter->nBlockIdxMap);
  tMapDataClear(&pCommitter->nBlockMap);

  // single blocks
  tBlockClear(&pCommitter->oBlock);
  tBlockClear(&pCommitter->nBlock);

  // block data buffers
  tBlockDataClear(&pCommitter->oBlockData);
  tBlockDataClear(&pCommitter->nBlockData);
}

H
Hongze Cheng 已提交
908 909 910 911
/*
 * Commit the row data of the immutable memtable (pTsdb->imem).
 *
 * Does nothing but log when the memtable holds no rows. Otherwise it
 * initialises the commit data state, then commits one file set at a time:
 * each iteration derives the file id and its key range from nextKey and calls
 * tsdbCommitFileData(). The loop ends once nextKey reaches TSKEY_MAX; nextKey
 * is not advanced in this function, so that is left to tsdbCommitFileData().
 *
 * tsdbCommitDataEnd() runs exactly once after tsdbCommitDataStart() has been
 * called, on success and on failure alike.
 * NOTE(review): this includes the case where tsdbCommitDataStart() itself
 * failed and already cleared part of the state; confirm that
 * tBlockDataClear() tolerates that.
 *
 * Returns 0 on success, otherwise the first error code (which is also logged).
 */
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  int32_t    rc = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMem = pTsdb->imem;

  if (pMem->nRow != 0) {
    // start ====================
    rc = tsdbCommitDataStart(pCommitter);

    // impl ====================
    if (rc == 0) {
      for (pCommitter->nextKey = pMem->minKey; rc == 0 && pCommitter->nextKey < TSKEY_MAX;) {
        pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
        tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                        &pCommitter->maxKey);
        rc = tsdbCommitFileData(pCommitter);
      }
    }

    // end ====================
    tsdbCommitDataEnd(pCommitter);

    if (rc != 0) {
      tsdbError("vgId:%d commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(rc));
      return rc;
    }
  }

  tsdbDebug("vgId:%d commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMem->nRow);
  return rc;
}
H
Hongze Cheng 已提交
942

H
Hongze Cheng 已提交
943 944 945 946
/*
 * Commit the delete records of the immutable memtable (pTsdb->imem).
 *
 * Does nothing but log when the memtable holds no deletes. Otherwise it runs
 * the start -> impl -> end phases in order, stopping at the first phase that
 * returns a non-zero code.
 *
 * Returns 0 on success, otherwise the first error code (which is also logged).
 */
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  int32_t    rc = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMem = pTsdb->imem;

  if (pMem->nDel != 0) {
    // start
    rc = tsdbCommitDelStart(pCommitter);

    // impl
    if (rc == 0) {
      rc = tsdbCommitDelImpl(pCommitter);
    }

    // end
    if (rc == 0) {
      rc = tsdbCommitDelEnd(pCommitter);
    }

    if (rc != 0) {
      tsdbError("vgId:%d commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(rc));
      return rc;
    }
  }

  tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMem->nDel);
  return rc;
}

/*
 * Commit cache data. Not implemented yet: the committer is ignored and the
 * function always returns 0.
 */
static int32_t tsdbCommitCache(SCommitter *pCommitter) {
  // TODO
  return 0;
}
H
Hongze Cheng 已提交
984 985 986 987 988 989

/*
 * Finish a commit. Not implemented yet: both the committer and eno are
 * ignored and the function always returns 0.
 */
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
  // TODO
  return 0;
}