tsdbCommit.c 23.3 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef struct {
H
Hongze Cheng 已提交
19
  STsdb *pTsdb;
H
Hongze Cheng 已提交
20
  /* commit data */
H
Hongze Cheng 已提交
21 22
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
23 24
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
25
  // --------------
H
Hongze Cheng 已提交
26
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
27 28 29
  int32_t commitFid;
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
30
  // commit file data
H
Hongze Cheng 已提交
31
  SDataFReader *pReader;
H
Hongze Cheng 已提交
32 33 34 35
  SMapData      oBlockIdxMap;  // SMapData<SBlockIdx>, read from reader
  SMapData      oBlockMap;     // SMapData<SBlock>, read from reader
  SBlock        oBlock;
  SBlockData    oBlockData;
H
Hongze Cheng 已提交
36
  SDataFWriter *pWriter;
H
Hongze Cheng 已提交
37 38 39 40
  SMapData      nBlockIdxMap;  // SMapData<SBlockIdx>, build by committer
  SMapData      nBlockMap;     // SMapData<SBlock>
  SBlock        nBlock;
  SBlockData    nBlockData;
H
Hongze Cheng 已提交
41
  /* commit del */
H
Hongze Cheng 已提交
42
  SDelFReader *pDelFReader;
H
Hongze Cheng 已提交
43 44
  SMapData     oDelIdxMap;   // SMapData<SDelIdx>, old
  SMapData     oDelDataMap;  // SMapData<SDelData>, old
H
Hongze Cheng 已提交
45
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
46 47
  SMapData     nDelIdxMap;   // SMapData<SDelIdx>, new
  SMapData     nDelDataMap;  // SMapData<SDelData>, new
H
Hongze Cheng 已提交
48
} SCommitter;
H
refact  
Hongze Cheng 已提交
49

H
Hongze Cheng 已提交
50 51 52 53 54
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
refact  
Hongze Cheng 已提交
55

H
refact  
Hongze Cheng 已提交
56
int32_t tsdbBegin(STsdb *pTsdb) {
H
Hongze Cheng 已提交
57
  int32_t code = 0;
H
Hongze Cheng 已提交
58

H
Hongze Cheng 已提交
59
  code = tsdbMemTableCreate(pTsdb, &pTsdb->mem);
H
Hongze Cheng 已提交
60
  if (code) goto _err;
H
Hongze Cheng 已提交
61

H
Hongze Cheng 已提交
62 63 64
  return code;

_err:
H
Hongze Cheng 已提交
65
  tsdbError("vgId:%d tsdb begin failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
66
  return code;
H
Hongze Cheng 已提交
67 68
}

H
more  
Hongze Cheng 已提交
69
int32_t tsdbCommit(STsdb *pTsdb) {
70
  if (!pTsdb) return 0;
H
Hongze Cheng 已提交
71

H
more  
Hongze Cheng 已提交
72
  int32_t    code = 0;
H
Hongze Cheng 已提交
73 74 75 76
  SCommitter commith;
  SMemTable *pMemTable = pTsdb->mem;

  // check
H
Hongze Cheng 已提交
77 78
  if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
    // TODO: lock?
H
Hongze Cheng 已提交
79 80 81 82
    pTsdb->mem = NULL;
    tsdbMemTableDestroy(pMemTable);
    goto _exit;
  }
H
refact  
Hongze Cheng 已提交
83

H
more  
Hongze Cheng 已提交
84
  // start commit
H
more  
Hongze Cheng 已提交
85
  code = tsdbStartCommit(pTsdb, &commith);
H
Hongze Cheng 已提交
86
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
87

H
refact  
Hongze Cheng 已提交
88 89
  // commit impl
  code = tsdbCommitData(&commith);
H
Hongze Cheng 已提交
90
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
91 92

  code = tsdbCommitDel(&commith);
H
Hongze Cheng 已提交
93
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
94 95

  code = tsdbCommitCache(&commith);
H
Hongze Cheng 已提交
96
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
97 98

  // end commit
H
more  
Hongze Cheng 已提交
99
  code = tsdbEndCommit(&commith, 0);
H
Hongze Cheng 已提交
100
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
101

H
Hongze Cheng 已提交
102
_exit:
H
refact  
Hongze Cheng 已提交
103 104 105
  return code;

_err:
H
Hongze Cheng 已提交
106
  tsdbEndCommit(&commith, code);
C
Cary Xu 已提交
107
  tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
refact  
Hongze Cheng 已提交
108 109 110
  return code;
}

H
Hongze Cheng 已提交
111
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
112 113 114 115 116 117
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  SDelFile  *pDelFileR = NULL;  // TODO
  SDelFile  *pDelFileW = NULL;  // TODO

H
Hongze Cheng 已提交
118 119 120
  tMapDataReset(&pCommitter->oDelIdxMap);
  tMapDataReset(&pCommitter->nDelIdxMap);

H
Hongze Cheng 已提交
121
  // load old
H
Hongze Cheng 已提交
122
  if (pDelFileR) {
H
Hongze Cheng 已提交
123
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL);
H
Hongze Cheng 已提交
124
    if (code) goto _err;
H
Hongze Cheng 已提交
125

H
Hongze Cheng 已提交
126 127
    code = tsdbReadDelIdx(pCommitter->pDelFReader, &pCommitter->oDelIdxMap, NULL);
    if (code) goto _err;
H
Hongze Cheng 已提交
128 129
  }

H
Hongze Cheng 已提交
130
  // prepare new
H
Hongze Cheng 已提交
131
  code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, pDelFileW, pTsdb);
H
Hongze Cheng 已提交
132
  if (code) goto _err;
H
Hongze Cheng 已提交
133 134 135 136 137 138 139

_exit:
  tsdbDebug("vgId:%d commit del start", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
140 141 142
  return code;
}

H
Hongze Cheng 已提交
143
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
H
Hongze Cheng 已提交
144 145 146 147 148
  int32_t   code = 0;
  SDelData *pDelData;
  tb_uid_t  suid;
  tb_uid_t  uid;
  SDelIdx   delIdx;  // TODO
H
Hongze Cheng 已提交
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166

  // check no del data, just return
  if (pTbData && pTbData->pHead == NULL) {
    pTbData = NULL;
  }
  if (pTbData == NULL && pDelIdx == NULL) goto _exit;

  // prepare
  if (pTbData) {
    delIdx.suid = pTbData->suid;
    delIdx.uid = pTbData->uid;
  } else {
    delIdx.suid = pDelIdx->suid;
    delIdx.uid = pDelIdx->uid;
  }
  delIdx.minKey = TSKEY_MAX;
  delIdx.maxKey = TSKEY_MIN;
  delIdx.minVersion = INT64_MAX;
H
Hongze Cheng 已提交
167
  delIdx.maxVersion = INT64_MIN;
H
Hongze Cheng 已提交
168 169 170 171 172 173 174 175 176 177 178 179

  // start
  tMapDataReset(&pCommitter->oDelDataMap);
  tMapDataReset(&pCommitter->nDelDataMap);

  if (pDelIdx) {
    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, &pCommitter->oDelDataMap, NULL);
    if (code) goto _err;
  }

  // disk
  for (int32_t iDelData = 0; iDelData < pCommitter->oDelDataMap.nItem; iDelData++) {
H
Hongze Cheng 已提交
180
    code = tMapDataGetItemByIdx(&pCommitter->oDelDataMap, iDelData, pDelData, tGetDelData);
H
Hongze Cheng 已提交
181 182
    if (code) goto _err;

H
Hongze Cheng 已提交
183
    code = tMapDataPutItem(&pCommitter->nDelDataMap, pDelData, tPutDelData);
H
Hongze Cheng 已提交
184 185
    if (code) goto _err;

H
Hongze Cheng 已提交
186 187 188 189
    if (delIdx.minKey > pDelData->sKey) delIdx.minKey = pDelData->sKey;
    if (delIdx.maxKey < pDelData->eKey) delIdx.maxKey = pDelData->eKey;
    if (delIdx.minVersion > pDelData->version) delIdx.minVersion = pDelData->version;
    if (delIdx.maxVersion < pDelData->version) delIdx.maxVersion = pDelData->version;
H
Hongze Cheng 已提交
190 191 192
  }

  // memory
H
Hongze Cheng 已提交
193 194 195
  pDelData = pTbData ? pTbData->pHead : NULL;
  for (; pDelData; pDelData = pDelData->pNext) {
    code = tMapDataPutItem(&pCommitter->nDelDataMap, pDelData, tPutDelData);
H
Hongze Cheng 已提交
196 197
    if (code) goto _err;

H
Hongze Cheng 已提交
198 199 200 201
    if (delIdx.minKey > pDelData->sKey) delIdx.minKey = pDelData->sKey;
    if (delIdx.maxKey < pDelData->eKey) delIdx.maxKey = pDelData->eKey;
    if (delIdx.minVersion > pDelData->version) delIdx.minVersion = pDelData->version;
    if (delIdx.maxVersion < pDelData->version) delIdx.maxVersion = pDelData->version;
H
Hongze Cheng 已提交
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
  }

  ASSERT(pCommitter->nDelDataMap.nItem > 0);

  // write
  code = tsdbWriteDelData(pCommitter->pDelFWriter, &pCommitter->nDelDataMap, NULL, &delIdx);
  if (code) goto _err;

  // put delIdx
  code = tMapDataPutItem(&pCommitter->nDelIdxMap, &delIdx, tPutDelIdx);
  if (code) goto _err;

_exit:
  return code;

_err:
  tsdbError("vgId:%d commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
222
static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
223 224 225 226 227 228 229 230 231 232 233 234 235
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    iDelIdx = 0;
  int32_t    nDelIdx = pCommitter->oDelIdxMap.nItem;
  int32_t    iTbData = 0;
  int32_t    nTbData = taosArrayGetSize(pMemTable->aTbData);
  STbData   *pTbData;
  SDelIdx   *pDelIdx;
  SDelIdx    delIdx;
  int32_t    c;

  ASSERT(nTbData > 0);
H
Hongze Cheng 已提交
236

H
Hongze Cheng 已提交
237 238 239 240 241 242 243 244 245 246 247 248 249 250
  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
  if (iDelIdx < nDelIdx) {
    code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDelIdx, &delIdx, tGetDelIdx);
    if (code) goto _err;
    pDelIdx = &delIdx;
  } else {
    pDelIdx = NULL;
  }

  while (true) {
    if (pTbData == NULL && pDelIdx == NULL) break;

    if (pTbData && pDelIdx) {
      c = tTABLEIDCmprFn(pTbData, pDelIdx);
H
Hongze Cheng 已提交
251
      if (c == 0) {
H
Hongze Cheng 已提交
252
        goto _commit_mem_and_disk_del;
H
Hongze Cheng 已提交
253
      } else if (c < 0) {
H
Hongze Cheng 已提交
254
        goto _commit_mem_del;
H
Hongze Cheng 已提交
255
      } else {
H
Hongze Cheng 已提交
256
        goto _commit_disk_del;
H
Hongze Cheng 已提交
257
      }
H
Hongze Cheng 已提交
258
    } else {
H
Hongze Cheng 已提交
259 260
      if (pTbData) goto _commit_mem_del;
      if (pDelIdx) goto _commit_disk_del;
H
Hongze Cheng 已提交
261 262
    }

H
Hongze Cheng 已提交
263 264 265 266 267 268 269
  _commit_mem_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, NULL);
    if (code) goto _err;
    iTbData++;
    if (iTbData < nTbData) {
      pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
    } else {
H
Hongze Cheng 已提交
270 271
      pTbData = NULL;
    }
H
Hongze Cheng 已提交
272
    continue;
H
Hongze Cheng 已提交
273

H
Hongze Cheng 已提交
274 275 276 277 278 279 280 281 282 283 284 285
  _commit_disk_del:
    code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx);
    if (code) goto _err;
    iDelIdx++;
    if (iDelIdx < nDelIdx) {
      code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDelIdx, &delIdx, tGetDelIdx);
      if (code) goto _err;
      pDelIdx = &delIdx;
    } else {
      pDelIdx = NULL;
    }
    continue;
H
Hongze Cheng 已提交
286

H
Hongze Cheng 已提交
287 288
  _commit_mem_and_disk_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx);
H
Hongze Cheng 已提交
289
    if (code) goto _err;
H
Hongze Cheng 已提交
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
    iTbData++;
    iDelIdx++;
    if (iTbData < nTbData) {
      pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
    } else {
      pTbData = NULL;
    }
    if (iDelIdx < nDelIdx) {
      code = tMapDataGetItemByIdx(&pCommitter->oDelIdxMap, iDelIdx, &delIdx, tGetDelIdx);
      if (code) goto _err;
      pDelIdx = &delIdx;
    } else {
      pDelIdx = NULL;
    }
    continue;
H
Hongze Cheng 已提交
305 306
  }

H
Hongze Cheng 已提交
307
  return code;
H
Hongze Cheng 已提交
308 309 310 311

_err:
  tsdbError("vgId:%d commit del impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
H
Hongze Cheng 已提交
312 313 314 315
}

static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
316

H
Hongze Cheng 已提交
317 318
  code = tsdbWriteDelIdx(pCommitter->pDelFWriter, &pCommitter->nDelIdxMap, NULL);
  if (code) goto _err;
H
Hongze Cheng 已提交
319

H
Hongze Cheng 已提交
320
  code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter, NULL);
H
Hongze Cheng 已提交
321
  if (code) goto _err;
H
Hongze Cheng 已提交
322 323

  code = tsdbDelFWriterClose(pCommitter->pDelFWriter, 1);
H
Hongze Cheng 已提交
324
  if (code) goto _err;
H
Hongze Cheng 已提交
325 326 327 328 329 330

  if (pCommitter->pDelFReader) {
    code = tsdbDelFReaderClose(pCommitter->pDelFReader);
    if (code) goto _err;
  }

H
Hongze Cheng 已提交
331 332 333
  return code;

_err:
H
Hongze Cheng 已提交
334
  tsdbError("vgId:%d commit del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
335 336 337
  return code;
}

H
Hongze Cheng 已提交
338 339
#define ROW_END(pRow, maxKey) (((pRow) == NULL) || ((pRow)->pTSRow->ts > (maxKey)))

H
Hongze Cheng 已提交
340 341 342 343
static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, TSDBKEY eKey,
                                    bool toDataOnly) {
  int32_t  code = 0;
  TSDBROW *pRow;
H
Hongze Cheng 已提交
344
  SBlock   block = tBlockInit();
H
Hongze Cheng 已提交
345

H
Hongze Cheng 已提交
346 347 348 349
  while (true) {
    pRow = tsdbTbDataIterGet(pIter);

    if (pRow == NULL || tsdbKeyCmprFn(&(TSDBKEY){.ts = pRow->pTSRow->ts, .version = pRow->version}, &eKey) > 0) {
H
Hongze Cheng 已提交
350
      if (pCommitter->nBlockData.nRow == 0) {
H
Hongze Cheng 已提交
351 352 353
        break;
      } else {
        goto _write_block_data;
H
Hongze Cheng 已提交
354
      }
H
Hongze Cheng 已提交
355
    }
H
Hongze Cheng 已提交
356

H
Hongze Cheng 已提交
357
    code = tBlockDataAppendRow(&pCommitter->nBlockData, pRow, NULL /*TODO*/);
H
Hongze Cheng 已提交
358
    if (code) goto _err;
H
Hongze Cheng 已提交
359

H
Hongze Cheng 已提交
360
    if (pCommitter->nBlockData.nRow < pCommitter->maxRow * 4 / 5) {
H
Hongze Cheng 已提交
361 362 363 364
      continue;
    }

  _write_block_data:
H
Hongze Cheng 已提交
365
    if (!toDataOnly && pCommitter->nBlockData.nRow < pCommitter->minKey) {
H
Hongze Cheng 已提交
366 367 368 369 370
      block.last = 1;
    } else {
      block.last = 0;
    }

H
Hongze Cheng 已提交
371
    code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, &block);
H
Hongze Cheng 已提交
372 373
    if (code) goto _err;

H
Hongze Cheng 已提交
374
    code = tMapDataPutItem(&pCommitter->nBlockMap, &block, tPutBlock);
H
Hongze Cheng 已提交
375 376
    if (code) goto _err;

H
Hongze Cheng 已提交
377
    tBlockReset(&block);
H
Hongze Cheng 已提交
378
    tBlockDataReset(&pCommitter->nBlockData);
H
Hongze Cheng 已提交
379 380 381 382 383 384 385 386 387
  }

  return code;

_err:
  tsdbError("vgId:%d commit memory data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
388 389 390 391 392 393 394 395 396 397
static int32_t tsdbGetOverlapRowNumber(STbDataIter *pIter, SBlock *pBlock) {
  int32_t     nRow = 0;
  TSDBROW    *pRow;
  TSDBKEY     key;
  int32_t     c = 0;
  STbDataIter iter = *pIter;

  iter.pRow = NULL;
  while (true) {
    pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
398

H
Hongze Cheng 已提交
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421
    if (pRow == NULL) break;
    key = tsdbRowKey(pRow);

    c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock);
    if (c == 0) {
      nRow++;
    } else if (c > 0) {
      break;
    } else {
      ASSERT(0);
    }
  }

  return nRow;
}

static int32_t tsdbMergeCommitImpl(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock,
                                   int8_t toDataOnly) {
  int32_t  code = 0;
  int32_t  iRow = 0;
  int32_t  nRow = 0;
  int32_t  c;
  TSDBROW *pRow;
H
Hongze Cheng 已提交
422
  SBlock   block = tBlockInit();
H
Hongze Cheng 已提交
423 424 425
  TSDBKEY  key1;
  TSDBKEY  key2;

H
Hongze Cheng 已提交
426
  tBlockDataReset(&pCommitter->nBlockData);
H
Hongze Cheng 已提交
427 428

  // load last and merge until {pCommitter->maxKey, INT64_MAX}
H
Hongze Cheng 已提交
429
  code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, 0, NULL, NULL);
H
Hongze Cheng 已提交
430 431 432
  if (code) goto _err;

  iRow = 0;
H
Hongze Cheng 已提交
433
  nRow = pCommitter->oBlockData.nRow;
H
Hongze Cheng 已提交
434 435 436 437
  pRow = tsdbTbDataIterGet(pIter);

  while (true) {
    if ((pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) && (iRow >= nRow)) {
H
Hongze Cheng 已提交
438
      if (pCommitter->nBlockData.nRow > 0) {
H
Hongze Cheng 已提交
439
        goto _write_block_data;
H
Hongze Cheng 已提交
440
      } else {
H
Hongze Cheng 已提交
441
        break;
H
Hongze Cheng 已提交
442
      }
H
Hongze Cheng 已提交
443
    }
H
Hongze Cheng 已提交
444 445 446 447

    // TODO

  _write_block_data:
H
Hongze Cheng 已提交
448 449
    block.last = pCommitter->nBlockData.nRow < pCommitter->minRow ? 1 : 0;
    code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, &block);
H
Hongze Cheng 已提交
450 451
    if (code) goto _err;

H
Hongze Cheng 已提交
452
    code = tMapDataPutItem(&pCommitter->nBlockMap, &block, tPutBlock);
H
Hongze Cheng 已提交
453
    if (code) goto _err;
H
Hongze Cheng 已提交
454
  }
H
Hongze Cheng 已提交
455

H
Hongze Cheng 已提交
456
  tBlockReset(&block);
H
Hongze Cheng 已提交
457
  tBlockDataReset(&pCommitter->nBlockData);
H
Hongze Cheng 已提交
458 459 460 461 462

  return code;

_err:
  tsdbError("vgId:%d merge commit impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
463 464
  return code;
}
H
Hongze Cheng 已提交
465

H
Hongze Cheng 已提交
466 467 468
static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock) {
  int32_t    code = 0;
  TSDBROW   *pRow;
H
Hongze Cheng 已提交
469
  SBlock     block = tBlockInit();
H
Hongze Cheng 已提交
470
  SBlockData nBlockData;
H
Hongze Cheng 已提交
471 472
  TSDBKEY    key;
  int32_t    c;
H
Hongze Cheng 已提交
473

H
Hongze Cheng 已提交
474 475 476 477 478
  if (pBlock == NULL) {
    key.ts = pCommitter->maxKey;
    key.version = INT64_MAX;
    code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 0);
    if (code) goto _err;
H
Hongze Cheng 已提交
479
  } else if (pBlock->last) {
H
Hongze Cheng 已提交
480
    // merge
H
Hongze Cheng 已提交
481
    code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 0);
H
Hongze Cheng 已提交
482
    if (code) goto _err;
H
Hongze Cheng 已提交
483
  } else {
H
Hongze Cheng 已提交
484 485 486 487 488 489 490 491 492 493
    // memory
    key.ts = pBlock->info.minKey.ts;
    key.version = pBlock->info.minKey.version - 1;
    code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 1);
    if (code) goto _err;

    // merge or move block
    pRow = tsdbTbDataIterGet(pIter);
    key.ts = pRow->pTSRow->ts;
    key.version = pRow->version;
H
Hongze Cheng 已提交
494

H
Hongze Cheng 已提交
495 496 497
    c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock);
    if (c > 0) {
      // move block
H
Hongze Cheng 已提交
498
      code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
H
Hongze Cheng 已提交
499 500
      if (code) goto _err;
    } else if (c == 0) {
H
Hongze Cheng 已提交
501 502 503 504 505 506 507 508
      int32_t nOverlap = tsdbGetOverlapRowNumber(pIter, pBlock);

      if (pBlock->nRow + nOverlap > pCommitter->maxRow || pBlock->nSubBlock == TSDB_MAX_SUBBLOCKS) {
        code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 1);
        if (code) goto _err;
      } else {
        // add as a subblock
      }
H
Hongze Cheng 已提交
509 510 511
    } else {
      ASSERT(0);
    }
H
Hongze Cheng 已提交
512 513 514 515 516 517
  }

  return code;

_err:
  tsdbError("vgId:%d merge commit failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
518
  return code;
H
Hongze Cheng 已提交
519 520
}

H
Hongze Cheng 已提交
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
  int32_t      code = 0;
  STbDataIter  iter;
  STbDataIter *pIter = &iter;
  TSDBROW     *pRow;
  SBlockIdx    blockIdx;  // TODO

  // create iter
  if (pTbData) {
    tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, pIter);
  } else {
    pIter = NULL;
  }

  // check
  pRow = tsdbTbDataIterGet(pIter);
  if (ROW_END(pRow, pCommitter->maxKey) && pBlockIdx == NULL) goto _exit;
H
Hongze Cheng 已提交
538 539

  // start ================================
H
Hongze Cheng 已提交
540 541
  tMapDataReset(&pCommitter->oBlockMap);
  tMapDataReset(&pCommitter->nBlockMap);
H
Hongze Cheng 已提交
542
  if (pBlockIdx) {
H
Hongze Cheng 已提交
543
    code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL);
H
Hongze Cheng 已提交
544 545
    if (code) goto _err;
  }
H
Hongze Cheng 已提交
546

H
Hongze Cheng 已提交
547
  // impl ===============================
H
Hongze Cheng 已提交
548 549 550
  SBlock  block;
  SBlock *pBlock = &block;
  int32_t iBlock = 0;
H
Hongze Cheng 已提交
551
  int32_t nBlockMap = pCommitter->oBlockMap.nItem;
H
Hongze Cheng 已提交
552

H
Hongze Cheng 已提交
553 554
  // merge
  pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
555 556
  while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlockMap) {
    tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
H
Hongze Cheng 已提交
557
    code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, pBlock);
H
Hongze Cheng 已提交
558 559 560 561
    if (code) goto _err;

    pRow = tsdbTbDataIterGet(pIter);
    iBlock++;
H
Hongze Cheng 已提交
562
  }
H
Hongze Cheng 已提交
563

H
Hongze Cheng 已提交
564 565 566
  // mem
  pRow = tsdbTbDataIterGet(pIter);
  while (!ROW_END(pRow, pCommitter->maxKey)) {
H
Hongze Cheng 已提交
567
    code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, NULL);
H
Hongze Cheng 已提交
568
    if (code) goto _err;
H
Hongze Cheng 已提交
569

H
Hongze Cheng 已提交
570 571
    pRow = tsdbTbDataIterGet(pIter);
  }
H
Hongze Cheng 已提交
572

H
Hongze Cheng 已提交
573
  // disk
H
Hongze Cheng 已提交
574 575
  while (iBlock < nBlockMap) {
    tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
H
Hongze Cheng 已提交
576

H
Hongze Cheng 已提交
577
    code = tsdbMergeCommit(pCommitter, &blockIdx, NULL, pBlock);
H
Hongze Cheng 已提交
578 579 580
    if (code) goto _err;

    iBlock++;
H
Hongze Cheng 已提交
581 582
  }

H
Hongze Cheng 已提交
583
  // end ===============================
H
Hongze Cheng 已提交
584
  code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, &blockIdx);
H
Hongze Cheng 已提交
585 586
  if (code) goto _err;

H
Hongze Cheng 已提交
587
  code = tMapDataPutItem(&pCommitter->nBlockIdxMap, &blockIdx, tPutBlockIdx);
H
Hongze Cheng 已提交
588
  if (code) goto _err;
H
Hongze Cheng 已提交
589

H
Hongze Cheng 已提交
590
_exit:
H
Hongze Cheng 已提交
591 592 593 594 595 596 597 598
  pRow = tsdbTbDataIterGet(pIter);
  if (pRow) {
    ASSERT(pRow->pTSRow->ts > pCommitter->maxKey);
    if (pCommitter->nextKey > pRow->pTSRow->ts) {
      pCommitter->nextKey = pRow->pTSRow->ts;
    }
  }

H
Hongze Cheng 已提交
599 600 601
  return code;

_err:
H
Hongze Cheng 已提交
602
  tsdbError("vgId:%d commit Table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
603 604 605 606 607 608
  return code;
}

static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
H
Hongze Cheng 已提交
609 610
  SDFileSet *pRSet = NULL;  // TODO
  SDFileSet *pWSet = NULL;  // TODO
H
Hongze Cheng 已提交
611

H
Hongze Cheng 已提交
612
  // memory
H
Hongze Cheng 已提交
613
  pCommitter->nextKey = TSKEY_MAX;
H
Hongze Cheng 已提交
614

H
Hongze Cheng 已提交
615 616 617 618 619
  // old
  tMapDataReset(&pCommitter->oBlockIdxMap);
  tMapDataReset(&pCommitter->oBlockMap);
  tBlockReset(&pCommitter->oBlock);
  tBlockDataReset(&pCommitter->oBlockData);
H
Hongze Cheng 已提交
620
  if (pRSet) {
H
Hongze Cheng 已提交
621
    code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
H
Hongze Cheng 已提交
622 623
    if (code) goto _err;

H
Hongze Cheng 已提交
624
    code = tsdbReadBlockIdx(pCommitter->pReader, &pCommitter->oBlockIdxMap, NULL);
H
Hongze Cheng 已提交
625
    if (code) goto _err;
H
Hongze Cheng 已提交
626 627
  }

H
Hongze Cheng 已提交
628 629 630 631 632
  // new
  tMapDataReset(&pCommitter->nBlockIdxMap);
  tMapDataReset(&pCommitter->nBlockMap);
  tBlockReset(&pCommitter->nBlock);
  tBlockDataReset(&pCommitter->nBlockData);
H
Hongze Cheng 已提交
633
  code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, pWSet);
H
Hongze Cheng 已提交
634 635 636 637 638 639
  if (code) goto _err;

_exit:
  return code;

_err:
H
Hongze Cheng 已提交
640
  tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
641 642 643 644 645
  return code;
}

static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
  int32_t    code = 0;
H
Hongze Cheng 已提交
646
  int32_t    c;
H
Hongze Cheng 已提交
647 648 649 650
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    iTbData = 0;
  int32_t    nTbData = taosArrayGetSize(pMemTable->aTbData);
H
Hongze Cheng 已提交
651
  int32_t    iBlockIdx = 0;
H
Hongze Cheng 已提交
652
  int32_t    nBlockIdx = pCommitter->oBlockIdxMap.nItem;
H
Hongze Cheng 已提交
653
  STbData   *pTbData;
H
Hongze Cheng 已提交
654
  SBlockIdx  blockIdx;
H
Hongze Cheng 已提交
655
  SBlockIdx *pBlockIdx = &blockIdx;
H
Hongze Cheng 已提交
656

H
Hongze Cheng 已提交
657
  ASSERT(nTbData > 0);
H
Hongze Cheng 已提交
658

H
Hongze Cheng 已提交
659 660
  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
  if (iBlockIdx < nBlockIdx) {
H
Hongze Cheng 已提交
661
    tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx);
H
Hongze Cheng 已提交
662 663
  } else {
    pBlockIdx = NULL;
H
Hongze Cheng 已提交
664 665 666 667
  }

  while (true) {
    if (pTbData == NULL && pBlockIdx == NULL) break;
H
Hongze Cheng 已提交
668 669

    if (pTbData && pBlockIdx) {
H
Hongze Cheng 已提交
670 671
      c = tTABLEIDCmprFn(pTbData, pBlockIdx);

H
Hongze Cheng 已提交
672
      if (c == 0) {
H
Hongze Cheng 已提交
673
        goto _commit_mem_and_disk_data;
H
Hongze Cheng 已提交
674
      } else if (c < 0) {
H
Hongze Cheng 已提交
675
        goto _commit_mem_data;
H
Hongze Cheng 已提交
676
      } else {
H
Hongze Cheng 已提交
677
        goto _commit_disk_data;
H
Hongze Cheng 已提交
678
      }
H
Hongze Cheng 已提交
679 680
    } else if (pTbData) {
      goto _commit_mem_data;
H
Hongze Cheng 已提交
681
    } else {
H
Hongze Cheng 已提交
682
      goto _commit_disk_data;
H
Hongze Cheng 已提交
683 684
    }

H
Hongze Cheng 已提交
685 686 687
  _commit_mem_data:
    code = tsdbCommitTableData(pCommitter, pTbData, NULL);
    if (code) goto _err;
H
Hongze Cheng 已提交
688

H
Hongze Cheng 已提交
689 690 691 692
    iTbData++;
    if (iTbData < nTbData) {
      pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
    } else {
H
Hongze Cheng 已提交
693
      pTbData = NULL;
H
Hongze Cheng 已提交
694
    }
H
Hongze Cheng 已提交
695
    continue;
H
Hongze Cheng 已提交
696

H
Hongze Cheng 已提交
697 698 699
  _commit_disk_data:
    code = tsdbCommitTableData(pCommitter, NULL, pBlockIdx);
    if (code) goto _err;
H
Hongze Cheng 已提交
700

H
Hongze Cheng 已提交
701 702
    iBlockIdx++;
    if (iBlockIdx < nBlockIdx) {
H
Hongze Cheng 已提交
703
      tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx);
H
Hongze Cheng 已提交
704 705 706 707
    } else {
      pBlockIdx = NULL;
    }
    continue;
H
Hongze Cheng 已提交
708

H
Hongze Cheng 已提交
709 710
  _commit_mem_and_disk_data:
    code = tsdbCommitTableData(pCommitter, pTbData, pBlockIdx);
H
Hongze Cheng 已提交
711
    if (code) goto _err;
H
Hongze Cheng 已提交
712

H
Hongze Cheng 已提交
713 714 715 716 717 718 719 720
    iTbData++;
    iBlockIdx++;
    if (iTbData < nTbData) {
      pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
    } else {
      pTbData = NULL;
    }
    if (iBlockIdx < nBlockIdx) {
H
Hongze Cheng 已提交
721
      tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx);
H
Hongze Cheng 已提交
722 723 724 725
    } else {
      pBlockIdx = NULL;
    }
    continue;
H
Hongze Cheng 已提交
726 727 728 729 730
  }

  return code;

_err:
H
Hongze Cheng 已提交
731
  tsdbError("vgId:%d commit file data impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
732 733 734 735 736
  return code;
}

static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
737

H
Hongze Cheng 已提交
738
  // write blockIdx
H
Hongze Cheng 已提交
739
  code = tsdbWriteBlockIdx(pCommitter->pWriter, &pCommitter->nBlockIdxMap, NULL);
H
Hongze Cheng 已提交
740 741
  if (code) goto _err;

H
Hongze Cheng 已提交
742
  // update file header
H
Hongze Cheng 已提交
743 744 745
  code = tsdbUpdateDFileSetHeader(pCommitter->pWriter, NULL);
  if (code) goto _err;

H
Hongze Cheng 已提交
746 747
  // close and sync
  code = tsdbDataFWriterClose(pCommitter->pWriter, 1);
H
Hongze Cheng 已提交
748 749 750
  if (code) goto _err;

  if (pCommitter->pReader) {
H
Hongze Cheng 已提交
751
    code = tsdbDataFReaderClose(pCommitter->pReader);
H
Hongze Cheng 已提交
752 753 754 755 756 757 758
    goto _err;
  }

_exit:
  return code;

_err:
H
Hongze Cheng 已提交
759
  tsdbError("vgId:%d commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
760 761 762
  return code;
}

H
Hongze Cheng 已提交
763
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
764 765
  int32_t code = 0;

H
Hongze Cheng 已提交
766 767
  // commit file data start
  code = tsdbCommitFileDataStart(pCommitter);
H
Hongze Cheng 已提交
768
  if (code) goto _err;
H
Hongze Cheng 已提交
769

H
Hongze Cheng 已提交
770 771
  // commit file data impl
  code = tsdbCommitFileDataImpl(pCommitter);
H
Hongze Cheng 已提交
772
  if (code) goto _err;
H
Hongze Cheng 已提交
773

H
Hongze Cheng 已提交
774 775
  // commit file data end
  code = tsdbCommitFileDataEnd(pCommitter);
H
Hongze Cheng 已提交
776
  if (code) goto _err;
H
Hongze Cheng 已提交
777 778 779 780

  return code;

_err:
H
Hongze Cheng 已提交
781
  tsdbError("vgId:%d commit file data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
782 783 784
  return code;
}

H
Hongze Cheng 已提交
785 786
// ----------------------------------------------------------------------------
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
H
Hongze Cheng 已提交
787
  int32_t code = 0;
H
Hongze Cheng 已提交
788

H
Hongze Cheng 已提交
789 790 791 792 793 794
  memset(pCommitter, 0, sizeof(*pCommitter));
  ASSERT(pTsdb->mem && pTsdb->imem == NULL);
  // lock();
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
  // unlock();
H
Hongze Cheng 已提交
795

H
Hongze Cheng 已提交
796
  pCommitter->pTsdb = pTsdb;
H
Hongze Cheng 已提交
797 798 799 800
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;
  pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
H
Hongze Cheng 已提交
801 802 803 804

  return code;
}

H
Hongze Cheng 已提交
805 806 807 808
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
809

H
Hongze Cheng 已提交
810
  // check
H
Hongze Cheng 已提交
811
  if (pMemTable->nRow == 0) goto _exit;
H
Hongze Cheng 已提交
812

H
Hongze Cheng 已提交
813
  // loop
H
Hongze Cheng 已提交
814
  pCommitter->nextKey = pMemTable->info.minKey.ts;
H
Hongze Cheng 已提交
815 816 817 818 819
  while (pCommitter->nextKey < TSKEY_MAX) {
    pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
    tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                    &pCommitter->maxKey);
    code = tsdbCommitFileData(pCommitter);
H
Hongze Cheng 已提交
820
    if (code) goto _err;
H
Hongze Cheng 已提交
821
  }
H
Hongze Cheng 已提交
822

H
Hongze Cheng 已提交
823 824 825
_exit:
  tsdbDebug("vgId:%d commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow);
  return code;
H
Hongze Cheng 已提交
826

H
Hongze Cheng 已提交
827 828 829 830
_err:
  tsdbError("vgId:%d commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
831

H
Hongze Cheng 已提交
832 833 834 835
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
836

H
Hongze Cheng 已提交
837 838
  if (pMemTable->nDel == 0) {
    goto _exit;
H
Hongze Cheng 已提交
839
  }
H
Hongze Cheng 已提交
840

H
Hongze Cheng 已提交
841 842 843 844 845
  // start
  code = tsdbCommitDelStart(pCommitter);
  if (code) {
    goto _err;
  }
H
Hongze Cheng 已提交
846

H
Hongze Cheng 已提交
847 848 849 850 851
  // impl
  code = tsdbCommitDelImpl(pCommitter);
  if (code) {
    goto _err;
  }
H
Hongze Cheng 已提交
852

H
Hongze Cheng 已提交
853 854 855 856 857
  // end
  code = tsdbCommitDelEnd(pCommitter);
  if (code) {
    goto _err;
  }
H
Hongze Cheng 已提交
858

H
Hongze Cheng 已提交
859
_exit:
H
Hongze Cheng 已提交
860
  tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
H
Hongze Cheng 已提交
861 862 863
  return code;

_err:
H
Hongze Cheng 已提交
864
  tsdbError("vgId:%d commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
865
  return code;
H
Hongze Cheng 已提交
866 867 868 869 870 871 872
}

static int32_t tsdbCommitCache(SCommitter *pCommitter) {
  int32_t code = 0;
  // TODO
  return code;
}
H
Hongze Cheng 已提交
873 874 875 876 877 878

static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
  int32_t code = 0;
  // TODO
  return code;
}