tsdbCommit.c 31.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef struct {
H
Hongze Cheng 已提交
19
  STsdb *pTsdb;
H
Hongze Cheng 已提交
20
  /* commit data */
H
Hongze Cheng 已提交
21
  int64_t commitID;
H
Hongze Cheng 已提交
22 23
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
24 25
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
26
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
27
  // --------------
H
Hongze Cheng 已提交
28
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
29 30 31
  int32_t commitFid;
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
32
  // commit file data
H
Hongze Cheng 已提交
33
  SDataFReader *pReader;
H
Hongze Cheng 已提交
34 35
  SArray       *aBlockIdx;  // SArray<SBlockIdx>
  SMapData      oBlockMap;  // SMapData<SBlock>, read from reader
H
Hongze Cheng 已提交
36 37
  SBlock        oBlock;
  SBlockData    oBlockData;
H
Hongze Cheng 已提交
38
  SDataFWriter *pWriter;
H
Hongze Cheng 已提交
39 40
  SArray       *aBlockIdxN;  // SArray<SBlockIdx>
  SMapData      nBlockMap;   // SMapData<SBlock>
H
Hongze Cheng 已提交
41 42
  SBlock        nBlock;
  SBlockData    nBlockData;
H
Hongze Cheng 已提交
43 44 45
  int64_t       suid;
  int64_t       uid;
  STSchema     *pTSchema;
H
Hongze Cheng 已提交
46
  /* commit del */
H
Hongze Cheng 已提交
47 48
  SDelFReader *pDelFReader;
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
49 50 51
  SArray      *aDelIdx;   // SArray<SDelIdx>
  SArray      *aDelIdxN;  // SArray<SDelIdx>
  SArray      *aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
52
} SCommitter;
H
refact  
Hongze Cheng 已提交
53

H
Hongze Cheng 已提交
54 55 56 57 58
// Commit phases, invoked by tsdbCommit() in exactly this order. Each returns 0
// on success or a non-zero error code.
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
// `eno` is 0 when every earlier phase succeeded, otherwise the failing phase's code.
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
refact  
Hongze Cheng 已提交
59

H
refact  
Hongze Cheng 已提交
60
/*
 * Create the mutable memtable (pTsdb->mem) for a tsdb instance.
 *
 * A NULL pTsdb is a no-op that returns 0. Otherwise the result of
 * tsdbMemTableCreate() is returned, and a failure is logged.
 */
int32_t tsdbBegin(STsdb *pTsdb) {
  if (pTsdb == NULL) return 0;

  int32_t code = tsdbMemTableCreate(pTsdb, &pTsdb->mem);
  if (code != 0) {
    tsdbError("vgId:%d tsdb begin failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  }

  return code;
}

H
more  
Hongze Cheng 已提交
75
/*
 * Commit the current memtable of a tsdb instance.
 *
 * A NULL pTsdb is a no-op. A memtable holding neither rows nor deletes is
 * simply detached from pTsdb and destroyed. Otherwise the phases run in
 * order: start -> data -> del -> cache -> end. The first failing phase stops
 * the sequence; tsdbEndCommit() is then called with the error code and the
 * error is logged.
 *
 * Returns 0 on success, otherwise the code of the phase that failed.
 */
int32_t tsdbCommit(STsdb *pTsdb) {
  if (pTsdb == NULL) return 0;

  SMemTable *pMem = pTsdb->mem;

  // nothing to persist: drop the empty memtable and leave
  if (pMem->nRow == 0 && pMem->nDel == 0) {
    // TODO: lock?
    pTsdb->mem = NULL;
    tsdbMemTableDestroy(pMem);
    return 0;
  }

  SCommitter committer;
  int32_t    code = tsdbStartCommit(pTsdb, &committer);

  // each phase only runs when every previous one succeeded
  if (code == 0) code = tsdbCommitData(&committer);
  if (code == 0) code = tsdbCommitDel(&committer);
  if (code == 0) code = tsdbCommitCache(&committer);
  if (code == 0) code = tsdbEndCommit(&committer, 0);
  if (code == 0) return 0;

  // failure path: tell the end-commit phase which error occurred
  tsdbEndCommit(&committer, code);
  tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
117
/*
 * Prepare the delete-commit phase.
 *
 * Allocates the three working arrays (aDelIdx, aDelData, aDelIdxN). If the
 * file-system state already has a del file, opens a reader on it and loads
 * its index into aDelIdx. Finally opens a writer for a new del file tagged
 * with this commit's id.
 *
 * Returns 0 on success; TSDB_CODE_OUT_OF_MEMORY when an array cannot be
 * allocated, or the code returned by the failing reader/writer call.
 */
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
  STsdb  *pTsdb = pCommitter->pTsdb;
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;  // assumed until all three arrays exist

  if ((pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) goto _err;
  if ((pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) goto _err;
  if ((pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx))) == NULL) goto _err;
  code = 0;

  // existing del file, if any
  SDelFile *pOldDelFile = pTsdb->fs->nState->pDelFile;
  if (pOldDelFile != NULL) {
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pOldDelFile, pTsdb, NULL);
    if (code) goto _err;

    code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx, NULL);
    if (code) goto _err;
  }

  // prepare new
  SDelFile newDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0};
  code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &newDelFile, pTsdb);
  if (code) goto _err;

  tsdbDebug("vgId:%d commit del start", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
163
/*
 * Commit the delete records of one table.
 *
 * pTbData  in-memory table data, or NULL. It is ignored when its delete list
 *          (pHead) is empty.
 * pDelIdx  index entry of the table in the existing del file, or NULL.
 *
 * Delete records already on disk (read through pDelIdx) are loaded into
 * aDelData, the in-memory ones are appended after them, and the combined
 * list is written with the del-file writer. The index entry filled in by the
 * writer is recorded in aDelIdxN, which is the array tsdbCommitDelEnd()
 * persists. aDelIdx only holds the old index loaded by tsdbCommitDelStart()
 * and must not receive new entries.
 *
 * Does nothing when the table has deletes neither in memory nor on disk.
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the
 * code of the failing read/write call.
 */
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
  int32_t   code = 0;
  SDelData *pDelData;
  tb_uid_t  suid = 0;
  tb_uid_t  uid = 0;

  taosArrayClear(pCommitter->aDelData);

  if (pTbData) {
    suid = pTbData->suid;
    uid = pTbData->uid;

    // a table without in-memory deletes contributes nothing here
    if (pTbData->pHead == NULL) {
      pTbData = NULL;
    }
  }

  if (pDelIdx) {
    suid = pDelIdx->suid;
    uid = pDelIdx->uid;

    // disk
    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, NULL);
    if (code) goto _err;
  }

  if (pTbData == NULL && pDelIdx == NULL) goto _exit;

  SDelIdx delIdx = {.suid = suid, .uid = uid};

  // memory
  pDelData = pTbData ? pTbData->pHead : NULL;
  for (; pDelData; pDelData = pDelData->pNext) {
    if (taosArrayPush(pCommitter->aDelData, pDelData) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  // write
  code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, NULL, &delIdx);
  if (code) goto _err;

  // put delIdx into the NEW index, the one written out by tsdbCommitDelEnd()
  if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

_exit:
  return code;

_err:
  tsdbError("vgId:%d commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
219 220
/*
 * Finish the delete-commit phase.
 *
 * Writes the new del index (aDelIdxN), updates the del file header, registers
 * the new del file in the file-system state, closes the writer (second
 * argument 1) and, if one was opened, the reader. On success the three
 * working arrays are destroyed. On failure the error is logged and the
 * arrays are left alone.
 *
 * Returns 0 on success, otherwise the code of the first failing step.
 */
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  STsdb  *pTsdb = pCommitter->pTsdb;
  int32_t code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN, NULL);

  if (code == 0) code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
  if (code == 0) code = tsdbFSStateUpsertDelFile(pTsdb->fs->nState, &pCommitter->pDelFWriter->fDel);
  if (code == 0) code = tsdbDelFWriterClose(pCommitter->pDelFWriter, 1);
  if (code == 0 && pCommitter->pDelFReader != NULL) code = tsdbDelFReaderClose(pCommitter->pDelFReader);

  if (code != 0) {
    tsdbError("vgId:%d commit del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
    return code;
  }

  taosArrayDestroy(pCommitter->aDelIdx);
  taosArrayDestroy(pCommitter->aDelData);
  taosArrayDestroy(pCommitter->aDelIdxN);
  return code;
}

H
Hongze Cheng 已提交
251 252 253 254 255
/*
 * Prepare the committer for writing the file set `commitFid`.
 *
 * Resets nextKey and the "old" reader-side state. If a file set with this
 * fid already exists, opens a reader on it and loads its block index. Then
 * resets the "new" writer-side state and opens a writer on a file set
 * description built as follows:
 *   - existing set: head and last files are new (this commitID); disk id,
 *     data file and sma file are carried over from the existing set;
 *   - no existing set: all four files are new and placed on disk {0, 0}.
 *
 * Returns 0 on success, otherwise the code of the failing reader/writer call.
 */
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SDFileSet *pOldSet;
  SDFileSet  newSet;

  // memory
  pCommitter->nextKey = TSKEY_MAX;

  // old
  taosArrayClear(pCommitter->aBlockIdx);
  tMapDataReset(&pCommitter->oBlockMap);
  tBlockReset(&pCommitter->oBlock);
  tBlockDataReset(&pCommitter->oBlockData);

  pOldSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid);
  if (pOldSet != NULL) {
    code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pOldSet);
    if (code) goto _err;

    code = tsdbReadBlockIdx(pCommitter->pReader, pCommitter->aBlockIdx, NULL);
    if (code) goto _err;
  }

  // new
  taosArrayClear(pCommitter->aBlockIdxN);
  tMapDataReset(&pCommitter->nBlockMap);
  tBlockReset(&pCommitter->nBlock);
  tBlockDataReset(&pCommitter->nBlockData);

  // the parts common to both cases: fid plus fresh head and last files
  newSet = (SDFileSet){.fid = pCommitter->commitFid,
                       .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
                       .fLast = {.commitID = pCommitter->commitID, .size = 0}};

  if (pOldSet != NULL) {
    // keep writing into the existing data/sma files on the same disk
    newSet.diskId = pOldSet->diskId;
    newSet.fData = pOldSet->fData;
    newSet.fSma = pOldSet->fSma;
  } else {
    STfs   *pTfs = pTsdb->pVnode->pTfs;
    SDiskID did = {.level = 0, .id = 0};

    // TODO: alloc a new disk
    // tfsAllocDisk(pTfs, 0, &did);

    // create the directory
    tfsMkdirRecurAt(pTfs, pTsdb->path, did);

    newSet.diskId = did;
    newSet.fData.commitID = pCommitter->commitID;
    newSet.fData.size = 0;
    newSet.fSma.commitID = pCommitter->commitID;
    newSet.fSma.size = 0;
  }

  code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, &newSet);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
/*
 * Make sure pCommitter->pTSchema is the schema for (suid, uid, sver).
 *
 * The cached schema is kept when it has version `sver` and was fetched for
 * the same suid, and additionally for the same uid when suid is 0. Otherwise
 * the cached schema is destroyed and a new one is fetched from the meta
 * store via metaGetTbTSchema().
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the fetch returns NULL.
 */
static int32_t tsdbCommitterUpdateSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  STSchema *pCached = pCommitter->pTSchema;

  // cache hit?
  if (pCached != NULL && pCommitter->suid == suid && sver == pCached->version &&
      (suid != 0 || pCommitter->uid == uid)) {
    return 0;
  }

  // cache miss: replace the cached schema
  pCommitter->suid = suid;
  pCommitter->uid = uid;
  tTSchemaDestroy(pCommitter->pTSchema);
  pCommitter->pTSchema = metaGetTbTSchema(pCommitter->pTsdb->pVnode->pMeta, uid, sver);

  return (pCommitter->pTSchema == NULL) ? TSDB_CODE_OUT_OF_MEMORY : 0;
}

H
Hongze Cheng 已提交
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
/*
 * Write one block of rows and record it in the new block map.
 *
 * The block is flagged `last` when toDataOnly is 0 and it holds fewer than
 * minRow rows; otherwise the flag is cleared. The block data is then written
 * with the committer's compression algorithm and pBlock is appended to
 * nBlockMap.
 *
 * Returns 0 on success, otherwise the code of the failing call.
 */
static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockData, SBlock *pBlock, SBlockIdx *pBlockIdx,
                                   int8_t toDataOnly) {
  int32_t code;

  pBlock->last = (!toDataOnly && pBlockData->nRow < pCommitter->minRow) ? 1 : 0;

  code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
  if (code == 0) {
    code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
  }

  return code;
}

H
Hongze Cheng 已提交
362 363
/*
 * Merge the rows of one existing block with in-memory rows and write the
 * result as one or more new blocks.
 *
 * pIter        in-memory row iterator; consumed while its rows sort before toKey
 * pBlockMerge  existing block whose rows are read from the reader
 * toKey        exclusive upper key bound for rows taken from memory
 * toDataOnly   forwarded to tsdbCommitBlockData()
 *
 * Rows are taken from whichever source currently has the smaller row
 * (tsdbRowCmprFn); equal rows are not expected. A new block is flushed once
 * it reaches 4/5 of maxRow, and once more at the end if rows remain.
 *
 * Returns 0 on success, otherwise the code of the failing call.
 */
static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlockMerge, TSDBKEY toKey,
                                  int8_t toDataOnly) {
  int32_t     code = 0;
  SBlockIdx   blockIdx = {.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
  SBlockData *pOldData = &pCommitter->oBlockData;
  SBlockData *pNewData = &pCommitter->nBlockData;
  SBlock     *pNewBlock = &pCommitter->nBlock;
  TSDBROW     diskRow;
  TSDBROW    *pDiskRow = &diskRow;  // NULL once the old block is exhausted
  TSDBROW    *pMemRow;              // NULL once memory rows reach toKey or run out

  // read SBlockData
  code = tsdbReadBlockData(pCommitter->pReader, &blockIdx, pBlockMerge, pOldData, NULL, NULL);
  if (code) goto _err;

  // both sources must start with a row below toKey
  pMemRow = tsdbTbDataIterGet(pIter);
  *pDiskRow = tsdbRowFromBlockData(pOldData, 0);
  ASSERT(pMemRow && tsdbKeyCmprFn(&TSDBROW_KEY(pMemRow), &toKey) < 0);
  ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pDiskRow), &toKey) < 0);
  code = tsdbCommitterUpdateSchema(pCommitter, blockIdx.suid, blockIdx.uid, TSDBROW_SVERSION(pMemRow));
  if (code) goto _err;

  tBlockReset(pNewBlock);
  tBlockDataReset(pNewData);

  for (;;) {
    if (pMemRow != NULL || pDiskRow != NULL) {
      bool fromMem;

      if (pDiskRow == NULL) {
        fromMem = true;
      } else if (pMemRow == NULL) {
        fromMem = false;
      } else {
        int32_t c = tsdbRowCmprFn(pMemRow, pDiskRow);
        if (c == 0) {
          ASSERT(0);
        }
        fromMem = (c <= 0);
      }

      if (fromMem) {
        code = tBlockDataAppendRow(pNewData, pMemRow, pCommitter->pTSchema);
        if (code) goto _err;

        tsdbTbDataIterNext(pIter);
        pMemRow = tsdbTbDataIterGet(pIter);
        if (pMemRow != NULL) {
          if (tsdbKeyCmprFn(&TSDBROW_KEY(pMemRow), &toKey) >= 0) {
            pMemRow = NULL;  // reached the bound: stop consuming memory rows
          } else {
            code = tsdbCommitterUpdateSchema(pCommitter, blockIdx.suid, blockIdx.uid, TSDBROW_SVERSION(pMemRow));
            if (code) goto _err;
          }
        }
      } else {
        code = tBlockDataAppendRow(pNewData, pDiskRow, NULL);
        if (code) goto _err;

        if (pDiskRow->iRow + 1 < pOldData->nRow) {
          *pDiskRow = tsdbRowFromBlockData(pOldData, pDiskRow->iRow + 1);
        } else {
          pDiskRow = NULL;
        }
      }

      // keep filling until the block is 4/5 full
      if (pNewData->nRow < pCommitter->maxRow * 4 / 5) continue;
    } else if (pNewData->nRow == 0) {
      break;  // both sources drained and nothing pending
    }

    // flush the pending block (sets `last`, writes data, records the block)
    code = tsdbCommitBlockData(pCommitter, pNewData, pNewBlock, &blockIdx, toDataOnly);
    if (code) goto _err;

    tBlockReset(pNewBlock);
    tBlockDataReset(pNewData);
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb merge block and mem failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
472
/*
 * Write in-memory rows of one table as new blocks, without touching any
 * existing block.
 *
 * pIter       in-memory row iterator; its current row must exist and sort
 *             before toKey
 * toKey       exclusive upper key bound for rows taken from memory
 * toDataOnly  forwarded to tsdbCommitBlockData()
 *
 * Rows are appended (refreshing the cached schema per row) until the bound
 * is reached or the iterator is exhausted. A block is flushed whenever it
 * reaches 4/5 of maxRow, and once more at the end if rows remain.
 *
 * Returns 0 on success, otherwise the code of the failing call.
 */
static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) {
  int32_t     code = 0;
  SBlock     *pNewBlock = &pCommitter->nBlock;
  SBlockData *pNewData = &pCommitter->nBlockData;
  int64_t     suid = pIter->pTbData->suid;
  int64_t     uid = pIter->pTbData->uid;
  TSDBROW    *pRow;

  tBlockReset(pNewBlock);
  tBlockDataReset(pNewData);

  pRow = tsdbTbDataIterGet(pIter);
  ASSERT(pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) < 0);

  for (;;) {
    if (pRow != NULL) {
      // update schema
      code = tsdbCommitterUpdateSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow));
      if (code) goto _err;

      // append
      code = tBlockDataAppendRow(pNewData, pRow, pCommitter->pTSchema);
      if (code) goto _err;

      // advance; rows at or past the bound are left in the iterator
      tsdbTbDataIterNext(pIter);
      pRow = tsdbTbDataIterGet(pIter);
      if (pRow != NULL && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) {
        pRow = NULL;
      }

      if (pNewData->nRow < pCommitter->maxRow * 4 / 5) continue;
    } else if (pNewData->nRow <= 0) {
      break;  // nothing left to flush
    }

    // flush the pending block (sets `last`, writes data, records the block)
    code = tsdbCommitBlockData(pCommitter, pNewData, pNewBlock, &(SBlockIdx){.suid = suid, .uid = uid}, toDataOnly);
    if (code) goto _err;

    tBlockReset(pNewBlock);
    tBlockDataReset(pNewData);
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb commit table mem data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
533
/*
 * Carry one existing block over into the new block map.
 *
 * A block that is not flagged `last` is simply re-registered in nBlockMap.
 * A `last` block has its rows read through the reader and written again
 * through the writer; the newly written block (still flagged `last`) is
 * registered instead.
 *
 * Returns 0 on success, otherwise the code of the failing call.
 */
static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, SBlockIdx *pBlockIdx) {
  int32_t code = 0;

  if (!pBlock->last) {
    code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
    if (code) goto _err;
    return code;
  }

  // `last` block: copy its rows through reader -> writer
  SBlock     *pCopy = &pCommitter->nBlock;
  SBlockData *pRows = &pCommitter->oBlockData;

  code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, pRows, NULL, NULL);
  if (code) goto _err;

  tBlockReset(pCopy);
  pCopy->last = pBlock->last;
  code = tsdbWriteBlockData(pCommitter->pWriter, pRows, NULL, NULL, pBlockIdx, pCopy, pCommitter->cmprAlg);
  if (code) goto _err;

  code = tMapDataPutItem(&pCommitter->nBlockMap, pCopy, tPutBlock);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb commit table disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
560 561
/*
 * Finish committing one table: write its new block map and append the
 * resulting block index entry for (suid, uid) to aBlockIdxN.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the index entry cannot be
 * appended, or the code returned by tsdbWriteBlock().
 */
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) {
  SBlockIdx tableIdx = {.suid = suid, .uid = uid};
  int32_t   code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, &tableIdx);

  if (code == 0 && taosArrayPush(pCommitter->aBlockIdxN, &tableIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

  if (code != 0) {
    tsdbError("vgId:%d commit table data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655
/*
 * Count how many in-memory rows, starting at pIter's current position,
 * compare equal to pBlock under tBlockCmprFn (i.e. overlap the block).
 *
 * The scan runs on a private copy of the iterator, so the caller's pIter is
 * left where it was (assuming the iterator position is held in the
 * STbDataIter struct itself -- TODO confirm). Counting stops at the first
 * row that sorts after the block or when the iterator is exhausted. A row
 * that sorts before the block is not expected.
 *
 * Returns the number of overlapping rows.
 */
static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) {
  int32_t     nRow = 0;
  TSDBROW    *pRow;
  TSDBKEY     key;
  int32_t     c = 0;
  STbDataIter iter = *pIter;

  // presumably drops the copied cached row so it is re-resolved against the copy -- confirm
  iter.pRow = NULL;
  while (true) {
    // read from the COPY; reading from pIter without advancing would spin forever
    pRow = tsdbTbDataIterGet(&iter);

    if (pRow == NULL) break;
    key = TSDBROW_KEY(pRow);

    c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock);
    if (c == 0) {
      nRow++;
      tsdbTbDataIterNext(&iter);  // move on to the next candidate row
    } else if (c > 0) {
      break;
    } else {
      ASSERT(0);
      break;  // do not spin if assertions are compiled out
    }
  }

  return nRow;
}

/*
 * Append the in-memory rows that overlap an existing block to that block as
 * an additional sub-block.
 *
 * pIter   in-memory row iterator positioned at the first overlapping row; it
 *         is advanced past every row consumed here
 * pBlock  existing block the rows overlap with
 *
 * Rows are collected while their key compares equal to pBlock under
 * tBlockCmprFn; collection stops at the first row that sorts after the block
 * or when the iterator is exhausted. pBlock is then copied into the
 * committer's nBlock, the collected rows are written against that copy, and
 * the copy is recorded in nBlockMap.
 *
 * Returns 0 on success, otherwise the code of the failing call.
 */
static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
  int32_t     code = 0;
  SBlockData *pBlockData = &pCommitter->nBlockData;
  SBlockIdx  *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
  TSDBROW    *pRow;

  tBlockDataReset(pBlockData);
  pRow = tsdbTbDataIterGet(pIter);
  ASSERT(pRow);  // the caller only gets here with at least one overlapping row
  code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow));
  if (code) goto _err;
  while (true) {
    // stop once there is no further overlapping row
    if (pRow == NULL) break;
    code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema);
    if (code) goto _err;

    tsdbTbDataIterNext(pIter);
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow) {
      // position the next row against the block using the row's own key
      TSDBKEY key = TSDBROW_KEY(pRow);
      int32_t c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock);

      if (c == 0) {
        code = tsdbCommitterUpdateSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow));
        if (code) goto _err;
      } else if (c > 0) {
        pRow = NULL;  // next row lies beyond the block: leave it in the iterator
      } else {
        ASSERT(0);
      }
    }
  }

  // write as a subblock
  code = tBlockCopy(pBlock, &pCommitter->nBlock);
  if (code) goto _err;

  code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, &pCommitter->nBlock,
                            pCommitter->cmprAlg);
  if (code) goto _err;

  code = tMapDataPutItem(&pCommitter->nBlockMap, &pCommitter->nBlock, tPutBlock);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb merge as subblock failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
656 657
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
  int32_t      code = 0;
H
Hongze Cheng 已提交
658 659
  STbDataIter  iter = {0};
  STbDataIter *pIter = &iter;
H
Hongze Cheng 已提交
660
  TSDBROW     *pRow;
H
Hongze Cheng 已提交
661
  int32_t      iBlock;
H
Hongze Cheng 已提交
662
  int32_t      nBlock;
H
Hongze Cheng 已提交
663 664
  int64_t      suid;
  int64_t      uid;
H
Hongze Cheng 已提交
665 666 667 668

  if (pTbData) {
    tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
    pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
669 670
    if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;

H
Hongze Cheng 已提交
671 672
    suid = pTbData->suid;
    uid = pTbData->uid;
H
Hongze Cheng 已提交
673 674 675 676 677 678 679 680 681 682 683
  } else {
    pIter = NULL;
    pRow = NULL;
  }

  if (pBlockIdx) {
    code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL);
    if (code) goto _err;

    nBlock = pCommitter->oBlockMap.nItem;
    ASSERT(nBlock > 0);
H
Hongze Cheng 已提交
684

H
Hongze Cheng 已提交
685 686
    suid = pBlockIdx->suid;
    uid = pBlockIdx->uid;
H
Hongze Cheng 已提交
687 688 689 690
  } else {
    nBlock = 0;
  }

H
Hongze Cheng 已提交
691
  if (pRow == NULL && nBlock == 0) goto _exit;
H
Hongze Cheng 已提交
692 693

  // start ===========
H
Hongze Cheng 已提交
694
  tMapDataReset(&pCommitter->nBlockMap);
H
Hongze Cheng 已提交
695
  SBlock *pBlock = &pCommitter->oBlock;
H
Hongze Cheng 已提交
696

H
Hongze Cheng 已提交
697
  iBlock = 0;
H
Hongze Cheng 已提交
698 699 700 701 702 703
  if (iBlock < nBlock) {
    tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
  } else {
    pBlock = NULL;
  }

H
Hongze Cheng 已提交
704 705
  // merge ===========
  while (true) {
H
Hongze Cheng 已提交
706
    if (pRow == NULL && pBlock == NULL) break;
H
Hongze Cheng 已提交
707

H
Hongze Cheng 已提交
708
    if (pRow && pBlock) {
H
Hongze Cheng 已提交
709
      if (pBlock->last) {
H
Hongze Cheng 已提交
710
        code = tsdbMergeTableData(pCommitter, pIter, pBlock,
H
Hongze Cheng 已提交
711
                                  (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
H
Hongze Cheng 已提交
712 713 714
        if (code) goto _err;

        pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
715
        if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
H
Hongze Cheng 已提交
716
        iBlock++;
H
Hongze Cheng 已提交
717 718 719 720 721
        if (iBlock < nBlock) {
          tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
        } else {
          pBlock = NULL;
        }
H
Hongze Cheng 已提交
722 723

        ASSERT(pRow == NULL && pBlock == NULL);
H
Hongze Cheng 已提交
724
      } else {
H
Hongze Cheng 已提交
725
        int32_t c = tBlockCmprFn(&(SBlock){.maxKey = TSDBROW_KEY(pRow), .minKey = TSDBROW_KEY(pRow)}, pBlock);
H
Hongze Cheng 已提交
726
        if (c > 0) {
H
Hongze Cheng 已提交
727
          // only disk data
H
Hongze Cheng 已提交
728
          code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
H
Hongze Cheng 已提交
729 730
          if (code) goto _err;

H
Hongze Cheng 已提交
731 732 733 734 735 736
          iBlock++;
          if (iBlock < nBlock) {
            tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
          } else {
            pBlock = NULL;
          }
H
Hongze Cheng 已提交
737
        } else if (c < 0) {
H
Hongze Cheng 已提交
738
          // only memory data
H
Hongze Cheng 已提交
739
          code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
H
Hongze Cheng 已提交
740 741 742
          if (code) goto _err;

          pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
743
          if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
H
Hongze Cheng 已提交
744
        } else {
H
Hongze Cheng 已提交
745
          // merge memory and disk
H
Hongze Cheng 已提交
746 747
          int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock);
          ASSERT(nOvlp);
H
Hongze Cheng 已提交
748
          if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) {
H
Hongze Cheng 已提交
749 750
            code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock);
            if (code) goto _err;
H
Hongze Cheng 已提交
751
          } else {
H
Hongze Cheng 已提交
752 753 754 755 756 757 758 759 760 761
            TSDBKEY toKey = {.ts = pCommitter->maxKey + 1, .version = VERSION_MIN};
            int8_t  toDataOnly = 0;

            if (iBlock < nBlock - 1) {
              toDataOnly = 1;

              SBlock nextBlock = {0};
              tBlockReset(&nextBlock);
              tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock + 1, &nextBlock, tGetBlock);
              toKey = nextBlock.minKey;
H
Hongze Cheng 已提交
762
            }
H
Hongze Cheng 已提交
763 764 765

            code = tsdbMergeTableData(pCommitter, pIter, pBlock, toKey, toDataOnly);
            if (code) goto _err;
H
Hongze Cheng 已提交
766
          }
H
Hongze Cheng 已提交
767 768 769 770 771 772 773 774 775

          pRow = tsdbTbDataIterGet(pIter);
          if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
          iBlock++;
          if (iBlock < nBlock) {
            tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
          } else {
            pBlock = NULL;
          }
H
Hongze Cheng 已提交
776 777 778
        }
      }
    } else if (pBlock) {
H
Hongze Cheng 已提交
779
      code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
H
Hongze Cheng 已提交
780 781
      if (code) goto _err;

H
Hongze Cheng 已提交
782 783 784 785 786 787
      iBlock++;
      if (iBlock < nBlock) {
        tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
      } else {
        pBlock = NULL;
      }
H
Hongze Cheng 已提交
788
    } else {
H
Hongze Cheng 已提交
789 790
      code =
          tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
H
Hongze Cheng 已提交
791 792 793
      if (code) goto _err;

      pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
794 795
      if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
      ASSERT(pRow == NULL);
H
Hongze Cheng 已提交
796 797 798
    }
  }

H
Hongze Cheng 已提交
799 800
  // end =====================
  code = tsdbCommitTableDataEnd(pCommitter, suid, uid);
H
Hongze Cheng 已提交
801 802 803 804 805 806 807 808 809 810 811 812 813 814
  if (code) goto _err;

_exit:
  if (pIter) {
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow) pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
  }
  return code;

_err:
  tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847
// Finish committing the current data file set.
//
// Steps, in order: write the new block index array (aBlockIdxN) through the
// writer, update the file set header, upsert the writer's file set into the
// new file-system state (fs->nState), close the writer, and finally close the
// reader if one is open.
//
// Returns 0 on success, or the first non-zero error code reported by any of
// the steps (the failure is logged before returning).
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code = 0;

  // write blockIdx
  code = tsdbWriteBlockIdx(pCommitter->pWriter, pCommitter->aBlockIdxN, NULL);
  if (code) goto _err;

  // update file header
  code = tsdbUpdateDFileSetHeader(pCommitter->pWriter);
  if (code) goto _err;

  // upsert SDFileSet
  code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->fs->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter));
  if (code) goto _err;

  // close and sync
  code = tsdbDataFWriterClose(&pCommitter->pWriter, 1);
  if (code) goto _err;

  // A reader is optional; only treat its close as a failure when it actually
  // reports one (previously this path jumped to _err unconditionally and
  // logged a bogus failure on every successful close).
  if (pCommitter->pReader) {
    code = tsdbDataFReaderClose(&pCommitter->pReader);
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

// Commit the data of one file set.
//
// After tsdbCommitFileDataStart(), the table entries of the immutable
// memtable (pTsdb->imem->aTbData) and the entries of pCommitter->aBlockIdx
// are walked in parallel, ordered by tTABLEIDCmprFn. For every table,
// tsdbCommitTableData() is called with:
//   - both entries when the two sides refer to the same table,
//   - only the memtable entry when it sorts first (or the index is exhausted),
//   - only the block-index entry when it sorts first (or the memtable side is
//     exhausted).
// tsdbCommitFileDataEnd() is called once both sides are exhausted.
//
// Returns 0 on success, otherwise the first non-zero error code (logged).
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  // commit file data start
  code = tsdbCommitFileDataStart(pCommitter);
  if (code) goto _err;

  // commit file data impl
  int32_t memPos = 0;
  int32_t memCnt = taosArrayGetSize(pMemTable->aTbData);
  int32_t idxPos = 0;
  int32_t idxCnt = taosArrayGetSize(pCommitter->aBlockIdx);

  ASSERT(memCnt > 0);

  STbData   *pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, memPos);
  SBlockIdx *pBlockIdx = (idxPos < idxCnt) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, idxPos) : NULL;

  while (pTbData || pBlockIdx) {
    // Decide which side(s) take part in this step.
    bool takeMem = (pTbData != NULL);
    bool takeIdx = (pBlockIdx != NULL);

    if (takeMem && takeIdx) {
      int32_t c = tTABLEIDCmprFn(pTbData, pBlockIdx);

      takeMem = (c <= 0);  // memtable entry sorts first, or same table
      takeIdx = (c >= 0);  // block-index entry sorts first, or same table
    }

    code = tsdbCommitTableData(pCommitter, takeMem ? pTbData : NULL, takeIdx ? pBlockIdx : NULL);
    if (code) goto _err;

    // Advance every side that was consumed.
    if (takeIdx) {
      idxPos++;
      pBlockIdx = (idxPos < idxCnt) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, idxPos) : NULL;
    }
    if (takeMem) {
      memPos++;
      pTbData = (memPos < memCnt) ? (STbData *)taosArrayGetP(pMemTable->aTbData, memPos) : NULL;
    }
  }

  // commit file data end
  code = tsdbCommitFileDataEnd(pCommitter);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
923 924
// ----------------------------------------------------------------------------
// Begin a commit on pTsdb.
//
// Zeroes *pCommitter, moves the active memtable (pTsdb->mem) into the
// immutable slot (pTsdb->imem), copies the commit parameters from the tsdb /
// vnode configuration into the committer, and opens a file-system
// transaction with tsdbFSBegin().
//
// Returns 0 on success, otherwise the error code from tsdbFSBegin() (logged).
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
  int32_t code;

  memset(pCommitter, 0, sizeof(*pCommitter));
  ASSERT(pTsdb->mem && pTsdb->imem == NULL);

  // Hand the active memtable over to the commit.
  // lock();
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
  // unlock();

  // Identity of this commit.
  pCommitter->pTsdb = pTsdb;
  pCommitter->commitID = pTsdb->pVnode->state.commitID;

  // Parameters taken from the keep configuration.
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;

  // Parameters taken from the vnode's tsdb configuration.
  pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;

  code = tsdbFSBegin(pTsdb->fs);
  if (code != 0) {
    tsdbError("vgId:%d tsdb start commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  }

  return code;
}

H
Hongze Cheng 已提交
953 954 955 956
// Initialise the per-commit working state used while committing data files:
// the reader/writer handles are reset to NULL, the old and new block-index
// arrays, block maps and blocks are (re)initialised, and the old and new
// block-data buffers are set up with tBlockDataInit().
//
// Returns 0 on success, otherwise the error code from tBlockDataInit(). When
// the second buffer fails to initialise, the first one is cleared again.
//
// NOTE(review): the results of taosArrayInit() are not checked here; confirm
// how callers are expected to cope with a failed allocation.
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
  // State associated with the existing (old) file.
  pCommitter->pReader = NULL;
  pCommitter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  pCommitter->oBlockMap = tMapDataInit();
  pCommitter->oBlock = tBlockInit();

  // State associated with the new file being written.
  pCommitter->pWriter = NULL;
  pCommitter->aBlockIdxN = taosArrayInit(0, sizeof(SBlockIdx));
  pCommitter->nBlockMap = tMapDataInit();
  pCommitter->nBlock = tBlockInit();

  int32_t code = tBlockDataInit(&pCommitter->oBlockData);
  if (code != 0) {
    return code;
  }

  code = tBlockDataInit(&pCommitter->nBlockData);
  if (code != 0) {
    // Undo the first buffer so a failed start leaves no block data behind.
    tBlockDataClear(&pCommitter->oBlockData);
  }

  return code;
}

// Release the per-commit working state set up by tsdbCommitDataStart().
//
// Only the two block-index arrays are destroyed here.
// TODO: the matching clears for the block maps, blocks and block-data buffers
// (tMapDataClear / tBlockClear / tBlockDataClear on the o* and n* members) are
// still disabled.
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
  // old block index array
  taosArrayDestroy(pCommitter->aBlockIdx);

  // new block index array
  taosArrayDestroy(pCommitter->aBlockIdxN);
}

H
Hongze Cheng 已提交
987 988 989 990
// Commit the row data of the immutable memtable (pTsdb->imem).
//
// Nothing is done when the memtable holds no rows. Otherwise the working
// state is initialised, and starting from the memtable's minimum key one file
// set is committed per iteration: the file id and its key range are derived
// from pCommitter->nextKey, then tsdbCommitFileData() is called. The loop
// runs until nextKey reaches TSKEY_MAX or a step fails. The working state is
// released with tsdbCommitDataEnd() on both the success and the failure path.
//
// Returns 0 on success, otherwise the first non-zero error code (logged).
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  // check
  if (pMemTable->nRow == 0) goto _exit;

  // start ====================
  code = tsdbCommitDataStart(pCommitter);

  // impl ====================
  if (code == 0) {
    pCommitter->nextKey = pMemTable->minKey;
    while (pCommitter->nextKey < TSKEY_MAX) {
      pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
      tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                      &pCommitter->maxKey);

      code = tsdbCommitFileData(pCommitter);
      if (code != 0) break;
    }
  }

  // end ====================
  tsdbCommitDataEnd(pCommitter);

  if (code != 0) {
    tsdbError("vgId:%d commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
    return code;
  }

_exit:
  tsdbDebug("vgId:%d commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow);
  return code;
}
H
Hongze Cheng 已提交
1021

H
Hongze Cheng 已提交
1022 1023 1024 1025
// Commit the delete records of the immutable memtable (pTsdb->imem).
//
// Nothing is done when the memtable holds no deletes. Otherwise, after
// tsdbCommitDelStart(), the memtable's table entries (aTbData) and the
// entries of pCommitter->aDelIdx are walked in parallel, ordered by
// tTABLEIDCmprFn. For every table, tsdbCommitTableDel() is called with:
//   - both entries when the two sides refer to the same table,
//   - only the memtable entry when it sorts first (or aDelIdx is exhausted),
//   - only the aDelIdx entry when it sorts first (or the memtable side is
//     exhausted).
// tsdbCommitDelEnd() is called once both sides are exhausted.
//
// Returns 0 on success, otherwise the first non-zero error code (logged).
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (pMemTable->nDel == 0) goto _exit;

  // start
  code = tsdbCommitDelStart(pCommitter);
  if (code) goto _err;

  // impl
  int32_t idxPos = 0;
  int32_t idxCnt = taosArrayGetSize(pCommitter->aDelIdx);
  int32_t memPos = 0;
  int32_t memCnt = taosArrayGetSize(pMemTable->aTbData);

  ASSERT(memCnt > 0);

  STbData *pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, memPos);
  SDelIdx *pDelIdx = (idxPos < idxCnt) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, idxPos) : NULL;

  while (pTbData || pDelIdx) {
    // Decide which side(s) take part in this step.
    bool takeMem = (pTbData != NULL);
    bool takeIdx = (pDelIdx != NULL);

    if (takeMem && takeIdx) {
      int32_t c = tTABLEIDCmprFn(pTbData, pDelIdx);

      takeMem = (c <= 0);  // memtable entry sorts first, or same table
      takeIdx = (c >= 0);  // del-index entry sorts first, or same table
    }

    code = tsdbCommitTableDel(pCommitter, takeMem ? pTbData : NULL, takeIdx ? pDelIdx : NULL);
    if (code) goto _err;

    // Advance every side that was consumed.
    if (takeMem) {
      memPos++;
      pTbData = (memPos < memCnt) ? (STbData *)taosArrayGetP(pMemTable->aTbData, memPos) : NULL;
    }
    if (takeIdx) {
      idxPos++;
      pDelIdx = (idxPos < idxCnt) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, idxPos) : NULL;
    }
  }

  // end
  code = tsdbCommitDelEnd(pCommitter);
  if (code) goto _err;

_exit:
  tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
  return code;

_err:
  tsdbError("vgId:%d commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

// Placeholder for committing cache data: not implemented yet, always
// reports success (0) without touching the committer.
static int32_t tsdbCommitCache(SCommitter *pCommitter) {
  // TODO
  (void)pCommitter;
  return 0;
}
H
Hongze Cheng 已提交
1115 1116

// Finish a commit started with tsdbStartCommit().
//
// pCommitter: the committer used for this commit.
// eno:        0 when the commit succeeded so far (the file-system changes are
//             committed with tsdbFSCommit); any other value rolls them back
//             with tsdbFSRollback.
//
// The immutable memtable is destroyed and pTsdb->imem is reset to NULL in
// both cases, and regardless of whether the file-system step succeeded.
//
// Returns 0 on success, otherwise the error code from tsdbFSCommit() /
// tsdbFSRollback() (logged).
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (eno == 0) {
    code = tsdbFSCommit(pTsdb->fs);
  } else {
    code = tsdbFSRollback(pTsdb->fs);
  }

  // Cleanup is unconditional so the memtable is released even when the
  // file-system step failed.
  tsdbMemTableDestroy(pMemTable);
  pTsdb->imem = NULL;

  // Report a failed commit/rollback instead of logging it as a success
  // (the error path used to be unreachable).
  if (code) goto _err;

  tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}