tsdbCommit.c 22.8 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef struct {
H
Hongze Cheng 已提交
19
  STsdb *pTsdb;
H
Hongze Cheng 已提交
20
  /* commit data */
H
Hongze Cheng 已提交
21 22
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
23 24
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
25
  // --------------
H
Hongze Cheng 已提交
26
  TSKEY   nextKey;  // need to be reset by each table commit
H
Hongze Cheng 已提交
27 28 29
  int32_t commitFid;
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
30
  // commit file data
H
Hongze Cheng 已提交
31 32
  SDataFReader *pReader;
  SMapData      oBlockIdx;  // SMapData<SBlockIdx>, read from reader
H
Hongze Cheng 已提交
33
  SMapData      oBlock;     // SMapData<SBlock>, read from reader
H
Hongze Cheng 已提交
34
  SBlockData    bDataO;
H
Hongze Cheng 已提交
35 36
  SDataFWriter *pWriter;
  SMapData      nBlockIdx;  // SMapData<SBlockIdx>, build by committer
H
Hongze Cheng 已提交
37
  SMapData      nBlock;     // SMapData<SBlock>
H
Hongze Cheng 已提交
38
  SBlockData    bDataN;
H
Hongze Cheng 已提交
39
  /* commit del */
H
Hongze Cheng 已提交
40
  SDelFReader *pDelFReader;
H
Hongze Cheng 已提交
41 42
  SMapData     oDelIdxMap;   // SMapData<SDelIdx>, old
  SMapData     oDelDataMap;  // SMapData<SDelData>, old
H
Hongze Cheng 已提交
43
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
44 45
  SMapData     nDelIdxMap;   // SMapData<SDelIdx>, new
  SMapData     nDelDataMap;  // SMapData<SDelData>, new
H
Hongze Cheng 已提交
46
} SCommitter;
H
refact  
Hongze Cheng 已提交
47

H
Hongze Cheng 已提交
48 49 50 51 52
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
refact  
Hongze Cheng 已提交
53

H
refact  
Hongze Cheng 已提交
54
int32_t tsdbBegin(STsdb *pTsdb) {
H
Hongze Cheng 已提交
55
  int32_t code = 0;
H
Hongze Cheng 已提交
56

H
Hongze Cheng 已提交
57 58 59
  code = tsdbMemTableCreate(pTsdb, &pTsdb->mem);
  if (code) {
    goto _err;
H
Hongze Cheng 已提交
60 61
  }

H
Hongze Cheng 已提交
62 63 64 65
  return code;

_err:
  return code;
H
Hongze Cheng 已提交
66 67
}

H
more  
Hongze Cheng 已提交
68
int32_t tsdbCommit(STsdb *pTsdb) {
69
  if (!pTsdb) return 0;
H
Hongze Cheng 已提交
70

H
more  
Hongze Cheng 已提交
71
  int32_t    code = 0;
H
Hongze Cheng 已提交
72 73 74 75 76 77 78 79 80
  SCommitter commith;
  SMemTable *pMemTable = pTsdb->mem;

  // check
  if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {  // TODO
    pTsdb->mem = NULL;
    tsdbMemTableDestroy(pMemTable);
    goto _exit;
  }
H
refact  
Hongze Cheng 已提交
81

H
more  
Hongze Cheng 已提交
82
  // start commit
H
more  
Hongze Cheng 已提交
83 84 85
  code = tsdbStartCommit(pTsdb, &commith);
  if (code) {
    goto _err;
H
refact  
Hongze Cheng 已提交
86 87
  }

H
refact  
Hongze Cheng 已提交
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
  // commit impl
  code = tsdbCommitData(&commith);
  if (code) {
    goto _err;
  }

  code = tsdbCommitDel(&commith);
  if (code) {
    goto _err;
  }

  code = tsdbCommitCache(&commith);
  if (code) {
    goto _err;
  }

  // end commit
H
more  
Hongze Cheng 已提交
105 106 107 108
  code = tsdbEndCommit(&commith, 0);
  if (code) {
    goto _err;
  }
H
refact  
Hongze Cheng 已提交
109

H
Hongze Cheng 已提交
110
_exit:
H
refact  
Hongze Cheng 已提交
111 112 113
  return code;

_err:
H
Hongze Cheng 已提交
114
  tsdbEndCommit(&commith, code);
C
Cary Xu 已提交
115
  tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
refact  
Hongze Cheng 已提交
116 117 118
  return code;
}

H
Hongze Cheng 已提交
119
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
120 121 122 123 124 125
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  SDelFile  *pDelFileR = NULL;  // TODO
  SDelFile  *pDelFileW = NULL;  // TODO

H
Hongze Cheng 已提交
126 127 128
  tMapDataReset(&pCommitter->oDelIdxMap);
  tMapDataReset(&pCommitter->nDelIdxMap);

H
Hongze Cheng 已提交
129
  // load old
H
Hongze Cheng 已提交
130
  if (pDelFileR) {
H
Hongze Cheng 已提交
131
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL);
H
Hongze Cheng 已提交
132
    if (code) goto _err;
H
Hongze Cheng 已提交
133

H
Hongze Cheng 已提交
134 135
    code = tsdbReadDelIdx(pCommitter->pDelFReader, &pCommitter->oDelIdxMap, NULL);
    if (code) goto _err;
H
Hongze Cheng 已提交
136 137
  }

H
Hongze Cheng 已提交
138
  // prepare new
H
Hongze Cheng 已提交
139
  code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pDelFileW, pTsdb);
H
Hongze Cheng 已提交
140
  if (code) goto _err;
H
Hongze Cheng 已提交
141 142 143 144 145 146 147

_exit:
  tsdbDebug("vgId:%d commit del start", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
148 149 150
  return code;
}

H
Hongze Cheng 已提交
151
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
H
Hongze Cheng 已提交
152 153 154 155 156
  int32_t   code = 0;
  SDelData *pDelData;
  tb_uid_t  suid;
  tb_uid_t  uid;
  SDelIdx   delIdx;  // TODO
H
Hongze Cheng 已提交
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174

  // check no del data, just return
  if (pTbData && pTbData->pHead == NULL) {
    pTbData = NULL;
  }
  if (pTbData == NULL && pDelIdx == NULL) goto _exit;

  // prepare
  if (pTbData) {
    delIdx.suid = pTbData->suid;
    delIdx.uid = pTbData->uid;
  } else {
    delIdx.suid = pDelIdx->suid;
    delIdx.uid = pDelIdx->uid;
  }
  delIdx.minKey = TSKEY_MAX;
  delIdx.maxKey = TSKEY_MIN;
  delIdx.minVersion = INT64_MAX;
H
Hongze Cheng 已提交
175
  delIdx.maxVersion = INT64_MIN;
H
Hongze Cheng 已提交
176 177 178 179 180 181 182 183 184 185 186 187

  // start
  tMapDataReset(&pCommitter->oDelDataMap);
  tMapDataReset(&pCommitter->nDelDataMap);

  if (pDelIdx) {
    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, &pCommitter->oDelDataMap, NULL);
    if (code) goto _err;
  }

  // disk
  for (int32_t iDelData = 0; iDelData < pCommitter->oDelDataMap.nItem; iDelData++) {
H
Hongze Cheng 已提交
188
    code = tMapDataGetItemByIdx(&pCommitter->oDelDataMap, iDelData, pDelData, tGetDelData);
H
Hongze Cheng 已提交
189 190
    if (code) goto _err;

H
Hongze Cheng 已提交
191
    code = tMapDataPutItem(&pCommitter->nDelDataMap, pDelData, tPutDelData);
H
Hongze Cheng 已提交
192 193
    if (code) goto _err;

H
Hongze Cheng 已提交
194 195 196 197
    if (delIdx.minKey > pDelData->sKey) delIdx.minKey = pDelData->sKey;
    if (delIdx.maxKey < pDelData->eKey) delIdx.maxKey = pDelData->eKey;
    if (delIdx.minVersion > pDelData->version) delIdx.minVersion = pDelData->version;
    if (delIdx.maxVersion < pDelData->version) delIdx.maxVersion = pDelData->version;
H
Hongze Cheng 已提交
198 199 200
  }

  // memory
H
Hongze Cheng 已提交
201 202 203
  pDelData = pTbData ? pTbData->pHead : NULL;
  for (; pDelData; pDelData = pDelData->pNext) {
    code = tMapDataPutItem(&pCommitter->nDelDataMap, pDelData, tPutDelData);
H
Hongze Cheng 已提交
204 205
    if (code) goto _err;

H
Hongze Cheng 已提交
206 207 208 209
    if (delIdx.minKey > pDelData->sKey) delIdx.minKey = pDelData->sKey;
    if (delIdx.maxKey < pDelData->eKey) delIdx.maxKey = pDelData->eKey;
    if (delIdx.minVersion > pDelData->version) delIdx.minVersion = pDelData->version;
    if (delIdx.maxVersion < pDelData->version) delIdx.maxVersion = pDelData->version;
H
Hongze Cheng 已提交
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
  }

  ASSERT(pCommitter->nDelDataMap.nItem > 0);

  // write
  code = tsdbWriteDelData(pCommitter->pDelFWriter, &pCommitter->nDelDataMap, NULL, &delIdx);
  if (code) goto _err;

  // put delIdx
  code = tMapDataPutItem(&pCommitter->nDelIdxMap, &delIdx, tPutDelIdx);
  if (code) goto _err;

_exit:
  return code;

_err:
  tsdbError("vgId:%d commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
230
static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    iDelIdx = 0;
  int32_t    nDelIdx = pCommitter->oDelIdxMap.nItem;
  int32_t    iTbData = 0;
  int32_t    nTbData = taosArrayGetSize(pMemTable->aTbData);
  STbData   *pTbData;
  SDelIdx   *pDelIdx;
  SDelIdx    delIdx;
  int32_t    c;

  ASSERT(nTbData > 0);
H
Hongze Cheng 已提交
244

H
Hongze Cheng 已提交
245 246 247 248 249 250 251 252 253 254 255 256 257 258
  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
  if (iDelIdx < nDelIdx) {
    code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDelIdx, &delIdx, tGetDelIdx);
    if (code) goto _err;
    pDelIdx = &delIdx;
  } else {
    pDelIdx = NULL;
  }

  while (true) {
    if (pTbData == NULL && pDelIdx == NULL) break;

    if (pTbData && pDelIdx) {
      c = tTABLEIDCmprFn(pTbData, pDelIdx);
H
Hongze Cheng 已提交
259
      if (c == 0) {
H
Hongze Cheng 已提交
260
        goto _commit_mem_and_disk_del;
H
Hongze Cheng 已提交
261
      } else if (c < 0) {
H
Hongze Cheng 已提交
262
        goto _commit_mem_del;
H
Hongze Cheng 已提交
263
      } else {
H
Hongze Cheng 已提交
264
        goto _commit_disk_del;
H
Hongze Cheng 已提交
265
      }
H
Hongze Cheng 已提交
266
    } else {
H
Hongze Cheng 已提交
267 268
      if (pTbData) goto _commit_mem_del;
      if (pDelIdx) goto _commit_disk_del;
H
Hongze Cheng 已提交
269 270
    }

H
Hongze Cheng 已提交
271 272 273 274 275 276 277
  _commit_mem_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, NULL);
    if (code) goto _err;
    iTbData++;
    if (iTbData < nTbData) {
      pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
    } else {
H
Hongze Cheng 已提交
278 279
      pTbData = NULL;
    }
H
Hongze Cheng 已提交
280
    continue;
H
Hongze Cheng 已提交
281

H
Hongze Cheng 已提交
282 283 284 285 286 287 288 289 290 291 292 293
  _commit_disk_del:
    code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx);
    if (code) goto _err;
    iDelIdx++;
    if (iDelIdx < nDelIdx) {
      code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDelIdx, &delIdx, tGetDelIdx);
      if (code) goto _err;
      pDelIdx = &delIdx;
    } else {
      pDelIdx = NULL;
    }
    continue;
H
Hongze Cheng 已提交
294

H
Hongze Cheng 已提交
295 296
  _commit_mem_and_disk_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx);
H
Hongze Cheng 已提交
297
    if (code) goto _err;
H
Hongze Cheng 已提交
298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
    iTbData++;
    iDelIdx++;
    if (iTbData < nTbData) {
      pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
    } else {
      pTbData = NULL;
    }
    if (iDelIdx < nDelIdx) {
      code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDelIdx, &delIdx, tGetDelIdx);
      if (code) goto _err;
      pDelIdx = &delIdx;
    } else {
      pDelIdx = NULL;
    }
    continue;
H
Hongze Cheng 已提交
313 314
  }

H
Hongze Cheng 已提交
315
  return code;
H
Hongze Cheng 已提交
316 317 318 319

_err:
  tsdbError("vgId:%d commit del impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
H
Hongze Cheng 已提交
320 321 322 323
}

static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
324

H
Hongze Cheng 已提交
325 326
  code = tsdbWriteDelIdx(pCommitter->pDelFWriter, &pCommitter->nDelIdxMap, NULL);
  if (code) goto _err;
H
Hongze Cheng 已提交
327

H
Hongze Cheng 已提交
328
  code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter, NULL);
H
Hongze Cheng 已提交
329
  if (code) goto _err;
H
Hongze Cheng 已提交
330 331

  code = tsdbDelFWriterClose(pCommitter->pDelFWriter, 1);
H
Hongze Cheng 已提交
332
  if (code) goto _err;
H
Hongze Cheng 已提交
333 334 335 336 337 338

  if (pCommitter->pDelFReader) {
    code = tsdbDelFReaderClose(pCommitter->pDelFReader);
    if (code) goto _err;
  }

H
Hongze Cheng 已提交
339 340 341
  return code;

_err:
H
Hongze Cheng 已提交
342
  tsdbError("vgId:%d commit del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
343 344 345
  return code;
}

H
Hongze Cheng 已提交
346 347
#define ROW_END(pRow, maxKey) (((pRow) == NULL) || ((pRow)->pTSRow->ts > (maxKey)))

H
Hongze Cheng 已提交
348 349 350 351
static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, TSDBKEY eKey,
                                    bool toDataOnly) {
  int32_t  code = 0;
  TSDBROW *pRow;
H
Hongze Cheng 已提交
352
  SBlock   block = tBlockInit();
H
Hongze Cheng 已提交
353

H
Hongze Cheng 已提交
354 355 356 357 358 359 360 361
  while (true) {
    pRow = tsdbTbDataIterGet(pIter);

    if (pRow == NULL || tsdbKeyCmprFn(&(TSDBKEY){.ts = pRow->pTSRow->ts, .version = pRow->version}, &eKey) > 0) {
      if (pCommitter->bDataN.nRow == 0) {
        break;
      } else {
        goto _write_block_data;
H
Hongze Cheng 已提交
362
      }
H
Hongze Cheng 已提交
363
    }
H
Hongze Cheng 已提交
364

H
Hongze Cheng 已提交
365 366
    code = tsdbBlockDataAppendRow(&pCommitter->bDataN, pRow, NULL /*TODO*/);
    if (code) goto _err;
H
Hongze Cheng 已提交
367

H
Hongze Cheng 已提交
368 369 370 371 372 373 374 375 376 377 378
    if (pCommitter->bDataN.nRow < pCommitter->maxRow * 4 / 5) {
      continue;
    }

  _write_block_data:
    if (!toDataOnly && pCommitter->bDataN.nRow < pCommitter->minKey) {
      block.last = 1;
    } else {
      block.last = 0;
    }

H
Hongze Cheng 已提交
379
    code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->bDataN, NULL, NULL, pBlockIdx, &block);
H
Hongze Cheng 已提交
380 381 382 383 384
    if (code) goto _err;

    code = tMapDataPutItem(&pCommitter->nBlock, &block, tPutBlock);
    if (code) goto _err;

H
Hongze Cheng 已提交
385
    tBlockReset(&block);
H
Hongze Cheng 已提交
386
    tsdbBlockDataClear(&pCommitter->bDataN);
H
Hongze Cheng 已提交
387 388 389 390 391 392 393 394 395
  }

  return code;

_err:
  tsdbError("vgId:%d commit memory data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
396 397 398 399 400 401 402 403 404 405
static int32_t tsdbGetOverlapRowNumber(STbDataIter *pIter, SBlock *pBlock) {
  int32_t     nRow = 0;
  TSDBROW    *pRow;
  TSDBKEY     key;
  int32_t     c = 0;
  STbDataIter iter = *pIter;

  iter.pRow = NULL;
  while (true) {
    pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
406

H
Hongze Cheng 已提交
407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429
    if (pRow == NULL) break;
    key = tsdbRowKey(pRow);

    c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock);
    if (c == 0) {
      nRow++;
    } else if (c > 0) {
      break;
    } else {
      ASSERT(0);
    }
  }

  return nRow;
}

static int32_t tsdbMergeCommitImpl(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock,
                                   int8_t toDataOnly) {
  int32_t  code = 0;
  int32_t  iRow = 0;
  int32_t  nRow = 0;
  int32_t  c;
  TSDBROW *pRow;
H
Hongze Cheng 已提交
430
  SBlock   block = tBlockInit();
H
Hongze Cheng 已提交
431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447
  TSDBKEY  key1;
  TSDBKEY  key2;

  tsdbBlockDataClear(&pCommitter->bDataN);

  // load last and merge until {pCommitter->maxKey, INT64_MAX}
  code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->bDataO, NULL, 0, NULL, NULL);
  if (code) goto _err;

  iRow = 0;
  nRow = pCommitter->bDataO.nRow;
  pRow = tsdbTbDataIterGet(pIter);

  while (true) {
    if ((pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) && (iRow >= nRow)) {
      if (pCommitter->bDataN.nRow > 0) {
        goto _write_block_data;
H
Hongze Cheng 已提交
448
      } else {
H
Hongze Cheng 已提交
449
        break;
H
Hongze Cheng 已提交
450
      }
H
Hongze Cheng 已提交
451
    }
H
Hongze Cheng 已提交
452 453 454 455 456

    // TODO

  _write_block_data:
    block.last = pCommitter->bDataN.nRow < pCommitter->minRow ? 1 : 0;
H
Hongze Cheng 已提交
457
    code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->bDataN, NULL, NULL, pBlockIdx, &block);
H
Hongze Cheng 已提交
458 459 460 461
    if (code) goto _err;

    code = tMapDataPutItem(&pCommitter->nBlock, &block, tPutBlock);
    if (code) goto _err;
H
Hongze Cheng 已提交
462
  }
H
Hongze Cheng 已提交
463

H
Hongze Cheng 已提交
464
  tBlockReset(&block);
H
Hongze Cheng 已提交
465 466 467 468 469 470
  tsdbBlockDataClear(&pCommitter->bDataN);

  return code;

_err:
  tsdbError("vgId:%d merge commit impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
471 472
  return code;
}
H
Hongze Cheng 已提交
473

H
Hongze Cheng 已提交
474 475 476
static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock) {
  int32_t    code = 0;
  TSDBROW   *pRow;
H
Hongze Cheng 已提交
477
  SBlock     block = tBlockInit();
H
Hongze Cheng 已提交
478 479 480
  SBlockData bDataN;
  TSDBKEY    key;
  int32_t    c;
H
Hongze Cheng 已提交
481

H
Hongze Cheng 已提交
482 483 484 485 486
  if (pBlock == NULL) {
    key.ts = pCommitter->maxKey;
    key.version = INT64_MAX;
    code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 0);
    if (code) goto _err;
H
Hongze Cheng 已提交
487
  } else if (pBlock->last) {
H
Hongze Cheng 已提交
488
    // merge
H
Hongze Cheng 已提交
489
    code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 0);
H
Hongze Cheng 已提交
490
    if (code) goto _err;
H
Hongze Cheng 已提交
491
  } else {
H
Hongze Cheng 已提交
492 493 494 495 496 497 498 499 500 501
    // memory
    key.ts = pBlock->info.minKey.ts;
    key.version = pBlock->info.minKey.version - 1;
    code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 1);
    if (code) goto _err;

    // merge or move block
    pRow = tsdbTbDataIterGet(pIter);
    key.ts = pRow->pTSRow->ts;
    key.version = pRow->version;
H
Hongze Cheng 已提交
502

H
Hongze Cheng 已提交
503 504 505 506 507 508
    c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock);
    if (c > 0) {
      // move block
      code = tMapDataPutItem(&pCommitter->nBlock, pBlock, tPutBlock);
      if (code) goto _err;
    } else if (c == 0) {
H
Hongze Cheng 已提交
509 510 511 512 513 514 515 516
      int32_t nOverlap = tsdbGetOverlapRowNumber(pIter, pBlock);

      if (pBlock->nRow + nOverlap > pCommitter->maxRow || pBlock->nSubBlock == TSDB_MAX_SUBBLOCKS) {
        code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 1);
        if (code) goto _err;
      } else {
        // add as a subblock
      }
H
Hongze Cheng 已提交
517 518 519
    } else {
      ASSERT(0);
    }
H
Hongze Cheng 已提交
520 521 522 523 524 525
  }

  return code;

_err:
  tsdbError("vgId:%d merge commit failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
526
  return code;
H
Hongze Cheng 已提交
527 528
}

H
Hongze Cheng 已提交
529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
  int32_t      code = 0;
  STbDataIter  iter;
  STbDataIter *pIter = &iter;
  TSDBROW     *pRow;
  SBlockIdx    blockIdx;  // TODO

  // create iter
  if (pTbData) {
    tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, pIter);
  } else {
    pIter = NULL;
  }

  // check
  pRow = tsdbTbDataIterGet(pIter);
  if (ROW_END(pRow, pCommitter->maxKey) && pBlockIdx == NULL) goto _exit;
H
Hongze Cheng 已提交
546 547 548 549 550 551 552 553

  // start ================================
  tMapDataReset(&pCommitter->oBlock);
  tMapDataReset(&pCommitter->nBlock);
  if (pBlockIdx) {
    code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlock, NULL);
    if (code) goto _err;
  }
H
Hongze Cheng 已提交
554

H
Hongze Cheng 已提交
555
  // impl ===============================
H
Hongze Cheng 已提交
556 557 558 559
  SBlock  block;
  SBlock *pBlock = &block;
  int32_t iBlock = 0;
  int32_t nBlock = pCommitter->oBlock.nItem;
H
Hongze Cheng 已提交
560

H
Hongze Cheng 已提交
561 562 563 564
  // merge
  pRow = tsdbTbDataIterGet(pIter);
  while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlock) {
    tMapDataGetItemByIdx(&pCommitter->oBlock, iBlock, pBlock, tGetBlock);
H
Hongze Cheng 已提交
565
    code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, pBlock);
H
Hongze Cheng 已提交
566 567 568 569
    if (code) goto _err;

    pRow = tsdbTbDataIterGet(pIter);
    iBlock++;
H
Hongze Cheng 已提交
570
  }
H
Hongze Cheng 已提交
571

H
Hongze Cheng 已提交
572 573 574
  // mem
  pRow = tsdbTbDataIterGet(pIter);
  while (!ROW_END(pRow, pCommitter->maxKey)) {
H
Hongze Cheng 已提交
575
    code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, NULL);
H
Hongze Cheng 已提交
576
    if (code) goto _err;
H
Hongze Cheng 已提交
577

H
Hongze Cheng 已提交
578 579
    pRow = tsdbTbDataIterGet(pIter);
  }
H
Hongze Cheng 已提交
580

H
Hongze Cheng 已提交
581 582 583 584
  // disk
  while (iBlock < nBlock) {
    tMapDataGetItemByIdx(&pCommitter->oBlock, iBlock, pBlock, tGetBlock);

H
Hongze Cheng 已提交
585
    code = tsdbMergeCommit(pCommitter, &blockIdx, NULL, pBlock);
H
Hongze Cheng 已提交
586 587 588
    if (code) goto _err;

    iBlock++;
H
Hongze Cheng 已提交
589 590
  }

H
Hongze Cheng 已提交
591 592 593 594 595 596
  // end ===============================
  code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlock, NULL, &blockIdx);
  if (code) goto _err;

  code = tMapDataPutItem(&pCommitter->nBlockIdx, &blockIdx, tPutBlockIdx);
  if (code) goto _err;
H
Hongze Cheng 已提交
597

H
Hongze Cheng 已提交
598
_exit:
H
Hongze Cheng 已提交
599 600 601 602 603 604 605 606
  pRow = tsdbTbDataIterGet(pIter);
  if (pRow) {
    ASSERT(pRow->pTSRow->ts > pCommitter->maxKey);
    if (pCommitter->nextKey > pRow->pTSRow->ts) {
      pCommitter->nextKey = pRow->pTSRow->ts;
    }
  }

H
Hongze Cheng 已提交
607 608 609
  return code;

_err:
H
Hongze Cheng 已提交
610
  tsdbError("vgId:%d commit Table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
611 612 613 614 615 616
  return code;
}

static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
H
Hongze Cheng 已提交
617 618
  SDFileSet *pRSet = NULL;  // TODO
  SDFileSet *pWSet = NULL;  // TODO
H
Hongze Cheng 已提交
619

H
Hongze Cheng 已提交
620
  // memory
H
Hongze Cheng 已提交
621
  pCommitter->nextKey = TSKEY_MAX;
H
Hongze Cheng 已提交
622
  tMapDataReset(&pCommitter->oBlockIdx);
H
Hongze Cheng 已提交
623
  tMapDataReset(&pCommitter->oBlock);
H
Hongze Cheng 已提交
624
  tMapDataReset(&pCommitter->nBlockIdx);
H
Hongze Cheng 已提交
625
  tMapDataReset(&pCommitter->nBlock);
H
Hongze Cheng 已提交
626

H
Hongze Cheng 已提交
627
  // load old
H
Hongze Cheng 已提交
628
  if (pRSet) {
H
Hongze Cheng 已提交
629
    code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
H
Hongze Cheng 已提交
630 631
    if (code) goto _err;

H
Hongze Cheng 已提交
632
    code = tsdbReadBlockIdx(pCommitter->pReader, &pCommitter->oBlockIdx, NULL);
H
Hongze Cheng 已提交
633
    if (code) goto _err;
H
Hongze Cheng 已提交
634 635
  }

H
Hongze Cheng 已提交
636
  // create new
H
Hongze Cheng 已提交
637
  code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, pWSet);
H
Hongze Cheng 已提交
638 639 640 641 642 643
  if (code) goto _err;

_exit:
  return code;

_err:
H
Hongze Cheng 已提交
644
  tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
645 646 647 648 649
  return code;
}

static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
  int32_t    code = 0;
H
Hongze Cheng 已提交
650
  int32_t    c;
H
Hongze Cheng 已提交
651 652 653 654
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    iTbData = 0;
  int32_t    nTbData = taosArrayGetSize(pMemTable->aTbData);
H
Hongze Cheng 已提交
655 656
  int32_t    iBlockIdx = 0;
  int32_t    nBlockIdx = pCommitter->oBlockIdx.nItem;
H
Hongze Cheng 已提交
657
  STbData   *pTbData;
H
Hongze Cheng 已提交
658
  SBlockIdx *pBlockIdx;
H
Hongze Cheng 已提交
659
  SBlockIdx  blockIdx;
H
Hongze Cheng 已提交
660

H
Hongze Cheng 已提交
661
  ASSERT(nTbData > 0);
H
Hongze Cheng 已提交
662

H
Hongze Cheng 已提交
663 664
  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
  if (iBlockIdx < nBlockIdx) {
H
Hongze Cheng 已提交
665
    pBlockIdx = &blockIdx;
H
Hongze Cheng 已提交
666
    code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, pBlockIdx, tGetBlockIdx);
H
Hongze Cheng 已提交
667
    if (code) goto _err;
H
Hongze Cheng 已提交
668 669
  } else {
    pBlockIdx = NULL;
H
Hongze Cheng 已提交
670 671 672 673
  }

  while (true) {
    if (pTbData == NULL && pBlockIdx == NULL) break;
H
Hongze Cheng 已提交
674 675

    if (pTbData && pBlockIdx) {
H
Hongze Cheng 已提交
676 677
      c = tTABLEIDCmprFn(pTbData, pBlockIdx);

H
Hongze Cheng 已提交
678
      if (c == 0) {
H
Hongze Cheng 已提交
679
        goto _commit_mem_and_disk_data;
H
Hongze Cheng 已提交
680
      } else if (c < 0) {
H
Hongze Cheng 已提交
681
        goto _commit_mem_data;
H
Hongze Cheng 已提交
682
      } else {
H
Hongze Cheng 已提交
683
        goto _commit_disk_data;
H
Hongze Cheng 已提交
684
      }
H
Hongze Cheng 已提交
685 686
    } else if (pTbData) {
      goto _commit_mem_data;
H
Hongze Cheng 已提交
687
    } else {
H
Hongze Cheng 已提交
688
      goto _commit_disk_data;
H
Hongze Cheng 已提交
689 690
    }

H
Hongze Cheng 已提交
691 692 693
  _commit_mem_data:
    code = tsdbCommitTableData(pCommitter, pTbData, NULL);
    if (code) goto _err;
H
Hongze Cheng 已提交
694

H
Hongze Cheng 已提交
695 696 697 698
    iTbData++;
    if (iTbData < nTbData) {
      pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
    } else {
H
Hongze Cheng 已提交
699
      pTbData = NULL;
H
Hongze Cheng 已提交
700
    }
H
Hongze Cheng 已提交
701
    continue;
H
Hongze Cheng 已提交
702

H
Hongze Cheng 已提交
703 704 705
  _commit_disk_data:
    code = tsdbCommitTableData(pCommitter, NULL, pBlockIdx);
    if (code) goto _err;
H
Hongze Cheng 已提交
706

H
Hongze Cheng 已提交
707 708
    iBlockIdx++;
    if (iBlockIdx < nBlockIdx) {
H
Hongze Cheng 已提交
709
      pBlockIdx = &blockIdx;
H
Hongze Cheng 已提交
710
      code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, pBlockIdx, tGetBlockIdx);
H
Hongze Cheng 已提交
711 712 713 714 715
      if (code) goto _err;
    } else {
      pBlockIdx = NULL;
    }
    continue;
H
Hongze Cheng 已提交
716

H
Hongze Cheng 已提交
717 718
  _commit_mem_and_disk_data:
    code = tsdbCommitTableData(pCommitter, pTbData, pBlockIdx);
H
Hongze Cheng 已提交
719
    if (code) goto _err;
H
Hongze Cheng 已提交
720

H
Hongze Cheng 已提交
721 722 723 724 725 726 727 728
    iTbData++;
    iBlockIdx++;
    if (iTbData < nTbData) {
      pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
    } else {
      pTbData = NULL;
    }
    if (iBlockIdx < nBlockIdx) {
H
Hongze Cheng 已提交
729
      pBlockIdx = &blockIdx;
H
Hongze Cheng 已提交
730
      code = tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, pBlockIdx, tGetBlockIdx);
H
Hongze Cheng 已提交
731 732 733 734 735
      if (code) goto _err;
    } else {
      pBlockIdx = NULL;
    }
    continue;
H
Hongze Cheng 已提交
736 737 738 739 740
  }

  return code;

_err:
H
Hongze Cheng 已提交
741
  tsdbError("vgId:%d commit file data impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
742 743 744 745 746
  return code;
}

static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
747

H
Hongze Cheng 已提交
748 749
  // write blockIdx
  code = tsdbWriteBlockIdx(pCommitter->pWriter, &pCommitter->nBlockIdx, NULL);
H
Hongze Cheng 已提交
750 751
  if (code) goto _err;

H
Hongze Cheng 已提交
752
  // update file header
H
Hongze Cheng 已提交
753 754 755
  code = tsdbUpdateDFileSetHeader(pCommitter->pWriter, NULL);
  if (code) goto _err;

H
Hongze Cheng 已提交
756 757
  // close and sync
  code = tsdbDataFWriterClose(pCommitter->pWriter, 1);
H
Hongze Cheng 已提交
758 759 760
  if (code) goto _err;

  if (pCommitter->pReader) {
H
Hongze Cheng 已提交
761
    code = tsdbDataFReaderClose(pCommitter->pReader);
H
Hongze Cheng 已提交
762 763 764 765 766 767 768
    goto _err;
  }

_exit:
  return code;

_err:
H
Hongze Cheng 已提交
769
  tsdbError("vgId:%d commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
770 771 772
  return code;
}

H
Hongze Cheng 已提交
773
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
774 775
  int32_t code = 0;

H
Hongze Cheng 已提交
776 777
  // commit file data start
  code = tsdbCommitFileDataStart(pCommitter);
H
Hongze Cheng 已提交
778 779 780 781
  if (code) {
    goto _err;
  }

H
Hongze Cheng 已提交
782 783
  // commit file data impl
  code = tsdbCommitFileDataImpl(pCommitter);
H
Hongze Cheng 已提交
784 785 786 787
  if (code) {
    goto _err;
  }

H
Hongze Cheng 已提交
788 789
  // commit file data end
  code = tsdbCommitFileDataEnd(pCommitter);
H
Hongze Cheng 已提交
790 791 792 793 794 795 796 797 798 799
  if (code) {
    goto _err;
  }

  return code;

_err:
  return code;
}

H
Hongze Cheng 已提交
800 801
// ----------------------------------------------------------------------------
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
H
Hongze Cheng 已提交
802
  int32_t code = 0;
H
Hongze Cheng 已提交
803

H
Hongze Cheng 已提交
804 805 806 807 808 809
  memset(pCommitter, 0, sizeof(*pCommitter));
  ASSERT(pTsdb->mem && pTsdb->imem == NULL);
  // lock();
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
  // unlock();
H
Hongze Cheng 已提交
810

H
Hongze Cheng 已提交
811
  pCommitter->pTsdb = pTsdb;
H
Hongze Cheng 已提交
812 813 814 815

  return code;
}

H
Hongze Cheng 已提交
816 817 818 819
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
820

H
Hongze Cheng 已提交
821
  // check
H
Hongze Cheng 已提交
822
  if (pMemTable->nRow == 0) goto _exit;
H
Hongze Cheng 已提交
823

H
Hongze Cheng 已提交
824
  // loop
H
Hongze Cheng 已提交
825
  pCommitter->nextKey = pMemTable->info.minKey.ts;
H
Hongze Cheng 已提交
826 827 828 829 830
  while (pCommitter->nextKey < TSKEY_MAX) {
    pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
    tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                    &pCommitter->maxKey);
    code = tsdbCommitFileData(pCommitter);
H
Hongze Cheng 已提交
831
    if (code) goto _err;
H
Hongze Cheng 已提交
832
  }
H
Hongze Cheng 已提交
833

H
Hongze Cheng 已提交
834 835 836
_exit:
  tsdbDebug("vgId:%d commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow);
  return code;
H
Hongze Cheng 已提交
837

H
Hongze Cheng 已提交
838 839 840 841
_err:
  tsdbError("vgId:%d commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
842

H
Hongze Cheng 已提交
843 844 845 846
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
847

H
Hongze Cheng 已提交
848 849
  if (pMemTable->nDel == 0) {
    goto _exit;
H
Hongze Cheng 已提交
850
  }
H
Hongze Cheng 已提交
851

H
Hongze Cheng 已提交
852 853 854 855 856
  // start
  code = tsdbCommitDelStart(pCommitter);
  if (code) {
    goto _err;
  }
H
Hongze Cheng 已提交
857

H
Hongze Cheng 已提交
858 859 860 861 862
  // impl
  code = tsdbCommitDelImpl(pCommitter);
  if (code) {
    goto _err;
  }
H
Hongze Cheng 已提交
863

H
Hongze Cheng 已提交
864 865 866 867 868
  // end
  code = tsdbCommitDelEnd(pCommitter);
  if (code) {
    goto _err;
  }
H
Hongze Cheng 已提交
869

H
Hongze Cheng 已提交
870
_exit:
H
Hongze Cheng 已提交
871
  tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
H
Hongze Cheng 已提交
872 873 874
  return code;

_err:
H
Hongze Cheng 已提交
875
  tsdbError("vgId:%d commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
876
  return code;
H
Hongze Cheng 已提交
877 878 879 880 881 882 883
}

static int32_t tsdbCommitCache(SCommitter *pCommitter) {
  int32_t code = 0;
  // TODO
  return code;
}
H
Hongze Cheng 已提交
884 885 886 887 888 889

static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
  int32_t code = 0;
  // TODO
  return code;
}