tsdbCommit.c 31.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17 18 19 20 21
typedef struct {
  int64_t   suid;
  int64_t   uid;
  STSchema *pTSchema;
} SSkmInfo;
H
Hongze Cheng 已提交
22

H
Hongze Cheng 已提交
23
typedef struct {
H
Hongze Cheng 已提交
24
  STsdb *pTsdb;
H
Hongze Cheng 已提交
25
  /* commit data */
H
Hongze Cheng 已提交
26
  int64_t commitID;
H
Hongze Cheng 已提交
27 28
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
29 30
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
31
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
32
  // --------------
H
Hongze Cheng 已提交
33
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
34 35 36
  int32_t commitFid;
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
37
  // commit file data
H
Hongze Cheng 已提交
38
  SDataFReader *pReader;
H
Hongze Cheng 已提交
39 40
  SArray       *aBlockIdx;  // SArray<SBlockIdx>
  SMapData      oBlockMap;  // SMapData<SBlock>, read from reader
H
Hongze Cheng 已提交
41
  SBlockData    oBlockData;
H
Hongze Cheng 已提交
42
  SDataFWriter *pWriter;
H
Hongze Cheng 已提交
43 44
  SArray       *aBlockIdxN;  // SArray<SBlockIdx>
  SMapData      nBlockMap;   // SMapData<SBlock>
H
Hongze Cheng 已提交
45
  SBlockData    nBlockData;
H
Hongze Cheng 已提交
46 47
  SSkmInfo      skmTable;
  SSkmInfo      skmRow;
H
Hongze Cheng 已提交
48
  /* commit del */
H
Hongze Cheng 已提交
49 50
  SDelFReader *pDelFReader;
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
51 52 53
  SArray      *aDelIdx;   // SArray<SDelIdx>
  SArray      *aDelIdxN;  // SArray<SDelIdx>
  SArray      *aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
54
} SCommitter;
H
refact  
Hongze Cheng 已提交
55

H
Hongze Cheng 已提交
56 57 58 59 60
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
refact  
Hongze Cheng 已提交
61

H
refact  
Hongze Cheng 已提交
62
int32_t tsdbBegin(STsdb *pTsdb) {
H
Hongze Cheng 已提交
63
  int32_t code = 0;
H
Hongze Cheng 已提交
64

65 66
  if (!pTsdb) return code;

H
Hongze Cheng 已提交
67
  code = tsdbMemTableCreate(pTsdb, &pTsdb->mem);
H
Hongze Cheng 已提交
68
  if (code) goto _err;
H
Hongze Cheng 已提交
69

H
Hongze Cheng 已提交
70 71 72
  return code;

_err:
H
Hongze Cheng 已提交
73
  tsdbError("vgId:%d tsdb begin failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
74
  return code;
H
Hongze Cheng 已提交
75 76
}

H
more  
Hongze Cheng 已提交
77
int32_t tsdbCommit(STsdb *pTsdb) {
78
  if (!pTsdb) return 0;
H
Hongze Cheng 已提交
79

H
more  
Hongze Cheng 已提交
80
  int32_t    code = 0;
H
Hongze Cheng 已提交
81 82 83 84
  SCommitter commith;
  SMemTable *pMemTable = pTsdb->mem;

  // check
H
Hongze Cheng 已提交
85 86
  if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
    // TODO: lock?
H
Hongze Cheng 已提交
87 88 89 90
    pTsdb->mem = NULL;
    tsdbMemTableDestroy(pMemTable);
    goto _exit;
  }
H
refact  
Hongze Cheng 已提交
91

H
more  
Hongze Cheng 已提交
92
  // start commit
H
more  
Hongze Cheng 已提交
93
  code = tsdbStartCommit(pTsdb, &commith);
H
Hongze Cheng 已提交
94
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
95

H
refact  
Hongze Cheng 已提交
96 97
  // commit impl
  code = tsdbCommitData(&commith);
H
Hongze Cheng 已提交
98
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
99 100

  code = tsdbCommitDel(&commith);
H
Hongze Cheng 已提交
101
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
102 103

  code = tsdbCommitCache(&commith);
H
Hongze Cheng 已提交
104
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
105 106

  // end commit
H
more  
Hongze Cheng 已提交
107
  code = tsdbEndCommit(&commith, 0);
H
Hongze Cheng 已提交
108
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
109

H
Hongze Cheng 已提交
110
_exit:
H
refact  
Hongze Cheng 已提交
111 112 113
  return code;

_err:
H
Hongze Cheng 已提交
114
  tsdbEndCommit(&commith, code);
C
Cary Xu 已提交
115
  tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
refact  
Hongze Cheng 已提交
116 117 118
  return code;
}

H
Hongze Cheng 已提交
119
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
120 121 122 123
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

H
Hongze Cheng 已提交
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
  pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
  if (pCommitter->aDelIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData));
  if (pCommitter->aDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx));
  if (pCommitter->aDelIdxN == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
H
Hongze Cheng 已提交
141

H
Hongze Cheng 已提交
142
  SDelFile *pDelFileR = pTsdb->fs->nState->pDelFile;
H
Hongze Cheng 已提交
143
  if (pDelFileR) {
H
Hongze Cheng 已提交
144
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL);
H
Hongze Cheng 已提交
145
    if (code) goto _err;
H
Hongze Cheng 已提交
146

H
Hongze Cheng 已提交
147
    code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx, NULL);
H
Hongze Cheng 已提交
148
    if (code) goto _err;
H
Hongze Cheng 已提交
149 150
  }

H
Hongze Cheng 已提交
151
  // prepare new
H
Hongze Cheng 已提交
152 153
  SDelFile wDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0};
  code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &wDelFile, pTsdb);
H
Hongze Cheng 已提交
154
  if (code) goto _err;
H
Hongze Cheng 已提交
155 156 157 158 159 160 161

_exit:
  tsdbDebug("vgId:%d commit del start", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
162 163 164
  return code;
}

H
Hongze Cheng 已提交
165
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
H
Hongze Cheng 已提交
166
  int32_t   code = 0;
H
Hongze Cheng 已提交
167
  SDelData *pDelData;
H
Hongze Cheng 已提交
168 169
  tb_uid_t  suid;
  tb_uid_t  uid;
H
Hongze Cheng 已提交
170

H
Hongze Cheng 已提交
171
  taosArrayClear(pCommitter->aDelData);
H
Hongze Cheng 已提交
172 173

  if (pTbData) {
H
Hongze Cheng 已提交
174 175
    suid = pTbData->suid;
    uid = pTbData->uid;
H
Hongze Cheng 已提交
176

H
Hongze Cheng 已提交
177 178 179 180
    if (pTbData->pHead == NULL) {
      pTbData = NULL;
    }
  }
H
Hongze Cheng 已提交
181 182

  if (pDelIdx) {
H
Hongze Cheng 已提交
183 184 185 186
    suid = pDelIdx->suid;
    uid = pDelIdx->uid;

    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, NULL);
H
Hongze Cheng 已提交
187 188 189
    if (code) goto _err;
  }

H
Hongze Cheng 已提交
190
  if (pTbData == NULL && pDelIdx == NULL) goto _exit;
H
Hongze Cheng 已提交
191

H
Hongze Cheng 已提交
192
  SDelIdx delIdx = {.suid = suid, .uid = uid};
H
Hongze Cheng 已提交
193 194

  // memory
H
Hongze Cheng 已提交
195 196
  pDelData = pTbData ? pTbData->pHead : NULL;
  for (; pDelData; pDelData = pDelData->pNext) {
H
Hongze Cheng 已提交
197 198 199 200
    if (taosArrayPush(pCommitter->aDelData, pDelData) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
H
Hongze Cheng 已提交
201 202 203
  }

  // write
H
Hongze Cheng 已提交
204
  code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, NULL, &delIdx);
H
Hongze Cheng 已提交
205 206 207
  if (code) goto _err;

  // put delIdx
H
Hongze Cheng 已提交
208 209 210 211
  if (taosArrayPush(pCommitter->aDelIdx, &delIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
H
Hongze Cheng 已提交
212 213 214 215 216 217 218 219 220

_exit:
  return code;

_err:
  tsdbError("vgId:%d commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
221 222
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
223
  STsdb  *pTsdb = pCommitter->pTsdb;
H
Hongze Cheng 已提交
224

H
Hongze Cheng 已提交
225
  code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN, NULL);
H
Hongze Cheng 已提交
226
  if (code) goto _err;
H
Hongze Cheng 已提交
227

H
Hongze Cheng 已提交
228 229 230 231
  code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
  if (code) goto _err;

  code = tsdbFSStateUpsertDelFile(pTsdb->fs->nState, &pCommitter->pDelFWriter->fDel);
H
Hongze Cheng 已提交
232
  if (code) goto _err;
H
Hongze Cheng 已提交
233 234

  code = tsdbDelFWriterClose(pCommitter->pDelFWriter, 1);
H
Hongze Cheng 已提交
235
  if (code) goto _err;
H
Hongze Cheng 已提交
236 237 238 239 240 241

  if (pCommitter->pDelFReader) {
    code = tsdbDelFReaderClose(pCommitter->pDelFReader);
    if (code) goto _err;
  }

H
Hongze Cheng 已提交
242 243 244 245
  taosArrayDestroy(pCommitter->aDelIdx);
  taosArrayDestroy(pCommitter->aDelData);
  taosArrayDestroy(pCommitter->aDelIdxN);

H
Hongze Cheng 已提交
246 247 248
  return code;

_err:
H
Hongze Cheng 已提交
249
  tsdbError("vgId:%d commit del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
250 251 252
  return code;
}

H
Hongze Cheng 已提交
253 254 255 256 257
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SDFileSet *pRSet = NULL;
  SDFileSet  wSet;
H
Hongze Cheng 已提交
258

H
Hongze Cheng 已提交
259 260
  // memory
  pCommitter->nextKey = TSKEY_MAX;
H
Hongze Cheng 已提交
261

H
Hongze Cheng 已提交
262
  // old
H
Hongze Cheng 已提交
263
  taosArrayClear(pCommitter->aBlockIdx);
H
Hongze Cheng 已提交
264 265 266 267 268
  tMapDataReset(&pCommitter->oBlockMap);
  tBlockDataReset(&pCommitter->oBlockData);
  pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid);
  if (pRSet) {
    code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
H
Hongze Cheng 已提交
269 270
    if (code) goto _err;

H
Hongze Cheng 已提交
271
    code = tsdbReadBlockIdx(pCommitter->pReader, pCommitter->aBlockIdx, NULL);
H
Hongze Cheng 已提交
272
    if (code) goto _err;
H
Hongze Cheng 已提交
273
  }
H
Hongze Cheng 已提交
274

H
Hongze Cheng 已提交
275
  // new
H
Hongze Cheng 已提交
276
  taosArrayClear(pCommitter->aBlockIdxN);
H
Hongze Cheng 已提交
277
  tMapDataReset(&pCommitter->nBlockMap);
H
Hongze Cheng 已提交
278
  tBlockDataReset(&pCommitter->nBlockData);
H
Hongze Cheng 已提交
279 280 281 282 283 284 285 286 287 288
  if (pRSet) {
    wSet = (SDFileSet){.diskId = pRSet->diskId,
                       .fid = pCommitter->commitFid,
                       .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
                       .fData = pRSet->fData,
                       .fLast = {.commitID = pCommitter->commitID, .size = 0},
                       .fSma = pRSet->fSma};
  } else {
    STfs   *pTfs = pTsdb->pVnode->pTfs;
    SDiskID did = {.level = 0, .id = 0};
H
Hongze Cheng 已提交
289

H
Hongze Cheng 已提交
290 291
    // TODO: alloc a new disk
    // tfsAllocDisk(pTfs, 0, &did);
H
Hongze Cheng 已提交
292

H
Hongze Cheng 已提交
293 294
    // create the directory
    tfsMkdirRecurAt(pTfs, pTsdb->path, did);
H
Hongze Cheng 已提交
295

H
Hongze Cheng 已提交
296 297 298 299 300 301
    wSet = (SDFileSet){.diskId = did,
                       .fid = pCommitter->commitFid,
                       .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
                       .fData = {.commitID = pCommitter->commitID, .size = 0},
                       .fLast = {.commitID = pCommitter->commitID, .size = 0},
                       .fSma = {.commitID = pCommitter->commitID, .size = 0}};
H
Hongze Cheng 已提交
302
  }
H
Hongze Cheng 已提交
303 304
  code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, &wSet);
  if (code) goto _err;
H
Hongze Cheng 已提交
305

H
Hongze Cheng 已提交
306
_exit:
H
Hongze Cheng 已提交
307 308 309
  return code;

_err:
H
Hongze Cheng 已提交
310
  tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
311
  return code;
H
Hongze Cheng 已提交
312 313
}

H
Hongze Cheng 已提交
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  int32_t code = 0;

  if (pCommitter->skmTable.pTSchema) {
    if (pCommitter->skmTable.suid == suid) {
      if (suid == 0) {
        if (pCommitter->skmTable.uid == uid && sver == pCommitter->skmTable.pTSchema->version) goto _exit;
      } else {
        if (sver == pCommitter->skmTable.pTSchema->version) goto _exit;
      }
    }
  }

  pCommitter->skmTable.suid = suid;
  pCommitter->skmTable.uid = uid;
  tTSchemaDestroy(pCommitter->skmTable.pTSchema);
  pCommitter->skmTable.pTSchema = metaGetTbTSchema(pCommitter->pTsdb->pVnode->pMeta, uid, sver);
  if (pCommitter->skmTable.pTSchema == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

_exit:
  return code;
}

static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
H
Hongze Cheng 已提交
340 341
  int32_t code = 0;

H
Hongze Cheng 已提交
342 343
  if (pCommitter->skmRow.pTSchema) {
    if (pCommitter->skmRow.suid == suid) {
H
Hongze Cheng 已提交
344
      if (suid == 0) {
H
Hongze Cheng 已提交
345
        if (pCommitter->skmRow.uid == uid && sver == pCommitter->skmRow.pTSchema->version) goto _exit;
H
Hongze Cheng 已提交
346
      } else {
H
Hongze Cheng 已提交
347
        if (sver == pCommitter->skmRow.pTSchema->version) goto _exit;
H
Hongze Cheng 已提交
348 349 350 351
      }
    }
  }

H
Hongze Cheng 已提交
352 353 354 355 356
  pCommitter->skmRow.suid = suid;
  pCommitter->skmRow.uid = uid;
  tTSchemaDestroy(pCommitter->skmRow.pTSchema);
  pCommitter->skmRow.pTSchema = metaGetTbTSchema(pCommitter->pTsdb->pVnode->pMeta, uid, sver);
  if (pCommitter->skmRow.pTSchema == NULL) {
H
Hongze Cheng 已提交
357 358 359 360 361 362 363
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
364 365 366 367
static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockData, SBlock *pBlock, SBlockIdx *pBlockIdx,
                                   int8_t toDataOnly) {
  int32_t code = 0;

H
Hongze Cheng 已提交
368 369 370 371 372 373
  if (pBlock->nSubBlock == 0) {
    if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) {
      pBlock->last = 1;
    } else {
      pBlock->last = 0;
    }
H
Hongze Cheng 已提交
374 375 376 377 378 379 380 381 382 383 384 385 386 387
  }

  code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
  if (code) goto _err;

  code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
  if (code) goto _err;

  return code;

_err:
  return code;
}

H
Hongze Cheng 已提交
388 389
static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlockMerge, TSDBKEY toKey,
                                  int8_t toDataOnly) {
H
Hongze Cheng 已提交
390
  int32_t     code = 0;
H
Hongze Cheng 已提交
391 392 393
  SBlockIdx  *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
  SBlockData *pBlockDataMerge = &pCommitter->oBlockData;
  SBlockData *pBlockData = &pCommitter->nBlockData;
H
Hongze Cheng 已提交
394 395
  SBlock      block;
  SBlock     *pBlock = &block;
H
Hongze Cheng 已提交
396
  TSDBROW    *pRow1;
H
Hongze Cheng 已提交
397 398
  TSDBROW     row2;
  TSDBROW    *pRow2 = &row2;
H
Hongze Cheng 已提交
399 400

  // read SBlockData
H
Hongze Cheng 已提交
401
  code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL);
H
Hongze Cheng 已提交
402 403 404 405
  if (code) goto _err;

  // loop to merge
  pRow1 = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
406
  *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, 0);
H
Hongze Cheng 已提交
407
  ASSERT(pRow1 && tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0);
H
Hongze Cheng 已提交
408
  ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0);
H
Hongze Cheng 已提交
409
  code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
H
Hongze Cheng 已提交
410 411
  if (code) goto _err;

H
Hongze Cheng 已提交
412 413
  tBlockReset(pBlock);
  tBlockDataReset(pBlockData);
H
Hongze Cheng 已提交
414 415
  while (true) {
    if (pRow1 == NULL && pRow2 == NULL) {
H
Hongze Cheng 已提交
416
      if (pBlockData->nRow == 0) {
H
Hongze Cheng 已提交
417 418 419 420 421 422 423
        break;
      } else {
        goto _write_block;
      }
    }

    if (pRow1 && pRow2) {
H
Hongze Cheng 已提交
424 425 426 427 428
      int32_t c = tsdbRowCmprFn(pRow1, pRow2);
      if (c < 0) {
        goto _append_mem_row;
      } else if (c > 0) {
        goto _append_block_row;
H
Hongze Cheng 已提交
429 430 431 432
      } else {
        ASSERT(0);
      }
    } else if (pRow1) {
H
Hongze Cheng 已提交
433
      goto _append_mem_row;
H
Hongze Cheng 已提交
434
    } else {
H
Hongze Cheng 已提交
435 436 437 438
      goto _append_block_row;
    }

  _append_mem_row:
H
Hongze Cheng 已提交
439
    code = tBlockDataAppendRow(pBlockData, pRow1, pCommitter->skmRow.pTSchema);
H
Hongze Cheng 已提交
440
    if (code) goto _err;
H
Hongze Cheng 已提交
441

H
Hongze Cheng 已提交
442 443 444 445
    tsdbTbDataIterNext(pIter);
    pRow1 = tsdbTbDataIterGet(pIter);
    if (pRow1) {
      if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) {
H
Hongze Cheng 已提交
446
        code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
H
Hongze Cheng 已提交
447
        if (code) goto _err;
H
Hongze Cheng 已提交
448
      } else {
H
Hongze Cheng 已提交
449
        pRow1 = NULL;
H
Hongze Cheng 已提交
450
      }
H
Hongze Cheng 已提交
451 452
    }

H
Hongze Cheng 已提交
453 454 455 456 457 458 459
    if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
      goto _write_block;
    } else {
      continue;
    }

  _append_block_row:
H
Hongze Cheng 已提交
460
    code = tBlockDataAppendRow(pBlockData, pRow2, NULL);
H
Hongze Cheng 已提交
461 462
    if (code) goto _err;

H
Hongze Cheng 已提交
463 464
    if (pRow2->iRow + 1 < pBlockDataMerge->nRow) {
      *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1);
H
Hongze Cheng 已提交
465
    } else {
H
Hongze Cheng 已提交
466
      pRow2 = NULL;
H
Hongze Cheng 已提交
467 468
    }

H
Hongze Cheng 已提交
469 470 471 472 473
    if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
      goto _write_block;
    } else {
      continue;
    }
H
Hongze Cheng 已提交
474

H
Hongze Cheng 已提交
475
  _write_block:
H
Hongze Cheng 已提交
476
    code = tsdbCommitBlockData(pCommitter, pBlockData, pBlock, pBlockIdx, toDataOnly);
H
Hongze Cheng 已提交
477 478 479 480
    if (code) goto _err;

    tBlockReset(pBlock);
    tBlockDataReset(pBlockData);
H
Hongze Cheng 已提交
481 482 483 484 485 486 487 488 489
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb merge block and mem failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
490
static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) {
H
Hongze Cheng 已提交
491 492
  int32_t     code = 0;
  TSDBROW    *pRow;
H
Hongze Cheng 已提交
493 494
  SBlock      block;
  SBlock     *pBlock = &block;
H
Hongze Cheng 已提交
495
  SBlockData *pBlockData = &pCommitter->nBlockData;
H
Hongze Cheng 已提交
496 497
  int64_t     suid = pIter->pTbData->suid;
  int64_t     uid = pIter->pTbData->uid;
H
Hongze Cheng 已提交
498

H
Hongze Cheng 已提交
499
  tBlockReset(pBlock);
H
Hongze Cheng 已提交
500
  tBlockDataReset(pBlockData);
H
Hongze Cheng 已提交
501
  pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
502
  ASSERT(pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) < 0);
H
Hongze Cheng 已提交
503
  while (true) {
H
Hongze Cheng 已提交
504
    if (pRow == NULL) {
H
Hongze Cheng 已提交
505 506 507 508 509 510 511
      if (pBlockData->nRow > 0) {
        goto _write_block;
      } else {
        break;
      }
    }

H
Hongze Cheng 已提交
512
    // update schema
H
Hongze Cheng 已提交
513
    code = tsdbCommitterUpdateRowSchema(pCommitter, suid, uid, TSDBROW_SVERSION(pRow));
H
Hongze Cheng 已提交
514 515
    if (code) goto _err;

H
Hongze Cheng 已提交
516
    // append
H
Hongze Cheng 已提交
517
    code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema);
H
Hongze Cheng 已提交
518 519 520 521
    if (code) goto _err;

    tsdbTbDataIterNext(pIter);
    pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
522
    if (pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) pRow = NULL;
H
Hongze Cheng 已提交
523 524

    if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block;
H
Hongze Cheng 已提交
525
    continue;
H
Hongze Cheng 已提交
526 527

  _write_block:
H
Hongze Cheng 已提交
528
    code = tsdbCommitBlockData(pCommitter, pBlockData, pBlock, &(SBlockIdx){.suid = suid, .uid = uid}, toDataOnly);
H
Hongze Cheng 已提交
529 530 531
    if (code) goto _err;

    tBlockReset(pBlock);
H
Hongze Cheng 已提交
532 533 534 535 536 537
    tBlockDataReset(pBlockData);
  }

  return code;

_err:
H
Hongze Cheng 已提交
538
  tsdbError("vgId:%d tsdb commit table mem data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
539 540 541
  return code;
}

H
Hongze Cheng 已提交
542
static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, SBlockIdx *pBlockIdx) {
H
Hongze Cheng 已提交
543
  int32_t code = 0;
H
Hongze Cheng 已提交
544
  SBlock  block;
H
Hongze Cheng 已提交
545 546

  if (pBlock->last) {
H
Hongze Cheng 已提交
547
    code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, NULL);
H
Hongze Cheng 已提交
548 549
    if (code) goto _err;

H
Hongze Cheng 已提交
550 551
    tBlockReset(&block);
    code = tsdbCommitBlockData(pCommitter, &pCommitter->oBlockData, &block, pBlockIdx, 0);
H
Hongze Cheng 已提交
552 553 554 555 556 557 558 559 560 561
    if (code) goto _err;
  } else {
    code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb commit table disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
562 563 564
  return code;
}

H
Hongze Cheng 已提交
565 566
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) {
  int32_t    code = 0;
H
Hongze Cheng 已提交
567 568
  SBlockIdx  blockIdx = {.suid = suid, .uid = uid};
  SBlockIdx *pBlockIdx = &blockIdx;
H
Hongze Cheng 已提交
569 570 571 572

  code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx);
  if (code) goto _err;

H
Hongze Cheng 已提交
573 574 575 576
  if (taosArrayPush(pCommitter->aBlockIdxN, pBlockIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
H
Hongze Cheng 已提交
577 578 579 580 581 582 583 584

  return code;

_err:
  tsdbError("vgId:%d commit table data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615
static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) {
  int32_t     nRow = 0;
  TSDBROW    *pRow;
  TSDBKEY     key;
  int32_t     c = 0;
  STbDataIter iter = *pIter;

  iter.pRow = NULL;
  while (true) {
    pRow = tsdbTbDataIterGet(pIter);

    if (pRow == NULL) break;
    key = TSDBROW_KEY(pRow);

    c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock);
    if (c == 0) {
      nRow++;
    } else if (c > 0) {
      break;
    } else {
      ASSERT(0);
    }
  }

  return nRow;
}

static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
  int32_t     code = 0;
  SBlockData *pBlockData = &pCommitter->nBlockData;
  SBlockIdx  *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
H
Hongze Cheng 已提交
616
  SBlock      block;
H
Hongze Cheng 已提交
617 618 619 620
  TSDBROW    *pRow;

  tBlockDataReset(pBlockData);
  pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
621
  code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow));
H
Hongze Cheng 已提交
622 623 624
  if (code) goto _err;
  while (true) {
    if (pRow) break;
H
Hongze Cheng 已提交
625
    code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema);
H
Hongze Cheng 已提交
626 627 628 629 630 631 632 633
    if (code) goto _err;

    tsdbTbDataIterNext(pIter);
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow) {
      int32_t c = tBlockCmprFn(&(SBlock){}, pBlock);

      if (c == 0) {
H
Hongze Cheng 已提交
634 635
        code =
            tsdbCommitterUpdateRowSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow));
H
Hongze Cheng 已提交
636 637 638 639 640 641 642 643 644
        if (code) goto _err;
      } else if (c > 0) {
        pRow = NULL;
      } else {
        ASSERT(0);
      }
    }
  }

H
Hongze Cheng 已提交
645 646
  block = *pBlock;
  code = tsdbCommitBlockData(pCommitter, pBlockData, &block, pBlockIdx, 0);
H
Hongze Cheng 已提交
647 648 649 650 651 652 653 654 655
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb merge as subblock failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
656 657
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
  int32_t      code = 0;
H
Hongze Cheng 已提交
658 659
  STbDataIter  iter = {0};
  STbDataIter *pIter = &iter;
H
Hongze Cheng 已提交
660
  TSDBROW     *pRow;
H
Hongze Cheng 已提交
661
  int32_t      iBlock;
H
Hongze Cheng 已提交
662
  int32_t      nBlock;
H
Hongze Cheng 已提交
663 664
  int64_t      suid;
  int64_t      uid;
H
Hongze Cheng 已提交
665 666 667 668

  if (pTbData) {
    tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
    pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
669 670
    if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;

H
Hongze Cheng 已提交
671 672
    suid = pTbData->suid;
    uid = pTbData->uid;
H
Hongze Cheng 已提交
673 674 675 676 677 678 679 680 681 682 683
  } else {
    pIter = NULL;
    pRow = NULL;
  }

  if (pBlockIdx) {
    code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL);
    if (code) goto _err;

    nBlock = pCommitter->oBlockMap.nItem;
    ASSERT(nBlock > 0);
H
Hongze Cheng 已提交
684

H
Hongze Cheng 已提交
685 686
    suid = pBlockIdx->suid;
    uid = pBlockIdx->uid;
H
Hongze Cheng 已提交
687 688 689 690
  } else {
    nBlock = 0;
  }

H
Hongze Cheng 已提交
691
  if (pRow == NULL && nBlock == 0) goto _exit;
H
Hongze Cheng 已提交
692 693

  // start ===========
H
Hongze Cheng 已提交
694
  tMapDataReset(&pCommitter->nBlockMap);
H
Hongze Cheng 已提交
695 696
  SBlock  block;
  SBlock *pBlock = &block;
H
Hongze Cheng 已提交
697

H
Hongze Cheng 已提交
698
  iBlock = 0;
H
Hongze Cheng 已提交
699 700 701 702 703 704
  if (iBlock < nBlock) {
    tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
  } else {
    pBlock = NULL;
  }

H
Hongze Cheng 已提交
705 706
  // merge ===========
  while (true) {
H
Hongze Cheng 已提交
707
    if (pRow == NULL && pBlock == NULL) break;
H
Hongze Cheng 已提交
708

H
Hongze Cheng 已提交
709
    if (pRow && pBlock) {
H
Hongze Cheng 已提交
710
      if (pBlock->last) {
H
Hongze Cheng 已提交
711
        code = tsdbMergeTableData(pCommitter, pIter, pBlock,
H
Hongze Cheng 已提交
712
                                  (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
H
Hongze Cheng 已提交
713 714 715
        if (code) goto _err;

        pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
716
        if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
H
Hongze Cheng 已提交
717
        iBlock++;
H
Hongze Cheng 已提交
718 719 720 721 722
        if (iBlock < nBlock) {
          tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
        } else {
          pBlock = NULL;
        }
H
Hongze Cheng 已提交
723 724

        ASSERT(pRow == NULL && pBlock == NULL);
H
Hongze Cheng 已提交
725
      } else {
H
Hongze Cheng 已提交
726
        int32_t c = tBlockCmprFn(&(SBlock){.maxKey = TSDBROW_KEY(pRow), .minKey = TSDBROW_KEY(pRow)}, pBlock);
H
Hongze Cheng 已提交
727
        if (c > 0) {
H
Hongze Cheng 已提交
728
          // only disk data
H
Hongze Cheng 已提交
729
          code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
H
Hongze Cheng 已提交
730 731
          if (code) goto _err;

H
Hongze Cheng 已提交
732 733 734 735 736 737
          iBlock++;
          if (iBlock < nBlock) {
            tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
          } else {
            pBlock = NULL;
          }
H
Hongze Cheng 已提交
738
        } else if (c < 0) {
H
Hongze Cheng 已提交
739
          // only memory data
H
Hongze Cheng 已提交
740
          code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
H
Hongze Cheng 已提交
741 742 743
          if (code) goto _err;

          pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
744
          if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
H
Hongze Cheng 已提交
745
        } else {
H
Hongze Cheng 已提交
746
          // merge memory and disk
H
Hongze Cheng 已提交
747 748
          int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock);
          ASSERT(nOvlp);
H
Hongze Cheng 已提交
749
          if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) {
H
Hongze Cheng 已提交
750 751
            code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock);
            if (code) goto _err;
H
Hongze Cheng 已提交
752
          } else {
H
Hongze Cheng 已提交
753 754 755 756 757 758 759 760 761 762
            TSDBKEY toKey = {.ts = pCommitter->maxKey + 1, .version = VERSION_MIN};
            int8_t  toDataOnly = 0;

            if (iBlock < nBlock - 1) {
              toDataOnly = 1;

              SBlock nextBlock = {0};
              tBlockReset(&nextBlock);
              tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock + 1, &nextBlock, tGetBlock);
              toKey = nextBlock.minKey;
H
Hongze Cheng 已提交
763
            }
H
Hongze Cheng 已提交
764 765 766

            code = tsdbMergeTableData(pCommitter, pIter, pBlock, toKey, toDataOnly);
            if (code) goto _err;
H
Hongze Cheng 已提交
767
          }
H
Hongze Cheng 已提交
768 769 770 771 772 773 774 775 776

          pRow = tsdbTbDataIterGet(pIter);
          if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
          iBlock++;
          if (iBlock < nBlock) {
            tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
          } else {
            pBlock = NULL;
          }
H
Hongze Cheng 已提交
777 778 779
        }
      }
    } else if (pBlock) {
H
Hongze Cheng 已提交
780
      code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
H
Hongze Cheng 已提交
781 782
      if (code) goto _err;

H
Hongze Cheng 已提交
783 784 785 786 787 788
      iBlock++;
      if (iBlock < nBlock) {
        tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
      } else {
        pBlock = NULL;
      }
H
Hongze Cheng 已提交
789
    } else {
H
Hongze Cheng 已提交
790 791
      code =
          tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
H
Hongze Cheng 已提交
792 793 794
      if (code) goto _err;

      pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
795 796
      if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
      ASSERT(pRow == NULL);
H
Hongze Cheng 已提交
797 798 799
    }
  }

H
Hongze Cheng 已提交
800 801
  // end =====================
  code = tsdbCommitTableDataEnd(pCommitter, suid, uid);
H
Hongze Cheng 已提交
802 803 804 805 806 807 808 809 810 811 812 813 814 815
  if (code) goto _err;

_exit:
  if (pIter) {
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow) pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
  }
  return code;

_err:
  tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code = 0;

  // write blockIdx
  code = tsdbWriteBlockIdx(pCommitter->pWriter, pCommitter->aBlockIdxN, NULL);
  if (code) goto _err;

  // update file header
  code = tsdbUpdateDFileSetHeader(pCommitter->pWriter);
  if (code) goto _err;

  // upsert SDFileSet
  code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->fs->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter));
  if (code) goto _err;

  // close and sync
  code = tsdbDataFWriterClose(&pCommitter->pWriter, 1);
  if (code) goto _err;

  if (pCommitter->pReader) {
    code = tsdbDataFReaderClose(&pCommitter->pReader);
    goto _err;
  }

_exit:
  return code;

_err:
  tsdbError("vgId:%d commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
849 850 851
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
852 853 854 855 856 857

  // commit file data start
  code = tsdbCommitFileDataStart(pCommitter);
  if (code) goto _err;

  // commit file data impl
H
Hongze Cheng 已提交
858 859
  int32_t    iTbData = 0;
  int32_t    nTbData = taosArrayGetSize(pMemTable->aTbData);
H
Hongze Cheng 已提交
860
  int32_t    iBlockIdx = 0;
H
Hongze Cheng 已提交
861
  int32_t    nBlockIdx = taosArrayGetSize(pCommitter->aBlockIdx);
H
Hongze Cheng 已提交
862
  STbData   *pTbData;
H
Hongze Cheng 已提交
863
  SBlockIdx *pBlockIdx;
H
Hongze Cheng 已提交
864

H
Hongze Cheng 已提交
865
  ASSERT(nTbData > 0);
H
Hongze Cheng 已提交
866

H
Hongze Cheng 已提交
867
  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
H
Hongze Cheng 已提交
868
  pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
H
Hongze Cheng 已提交
869 870 871
  while (pTbData || pBlockIdx) {
    if (pTbData && pBlockIdx) {
      int32_t c = tTABLEIDCmprFn(pTbData, pBlockIdx);
H
Hongze Cheng 已提交
872

H
Hongze Cheng 已提交
873
      if (c == 0) {
H
Hongze Cheng 已提交
874
        goto _commit_table_mem_and_disk;
H
Hongze Cheng 已提交
875
      } else if (c < 0) {
H
Hongze Cheng 已提交
876
        goto _commit_table_mem_data;
H
Hongze Cheng 已提交
877
      } else {
H
Hongze Cheng 已提交
878
        goto _commit_table_disk_data;
H
Hongze Cheng 已提交
879
      }
H
Hongze Cheng 已提交
880
    } else if (pBlockIdx) {
H
Hongze Cheng 已提交
881 882 883 884
      goto _commit_table_disk_data;
    } else {
      goto _commit_table_mem_data;
    }
H
Hongze Cheng 已提交
885

H
Hongze Cheng 已提交
886 887 888 889 890
  _commit_table_mem_data:
    code = tsdbCommitTableData(pCommitter, pTbData, NULL);
    if (code) goto _err;

    iTbData++;
H
Hongze Cheng 已提交
891
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
H
Hongze Cheng 已提交
892
    continue;
H
Hongze Cheng 已提交
893

H
Hongze Cheng 已提交
894 895 896 897 898
  _commit_table_disk_data:
    code = tsdbCommitTableData(pCommitter, NULL, pBlockIdx);
    if (code) goto _err;

    iBlockIdx++;
H
Hongze Cheng 已提交
899
    pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
H
Hongze Cheng 已提交
900 901 902 903 904 905 906
    continue;

  _commit_table_mem_and_disk:
    code = tsdbCommitTableData(pCommitter, pTbData, pBlockIdx);
    if (code) goto _err;

    iBlockIdx++;
H
Hongze Cheng 已提交
907
    pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
H
Hongze Cheng 已提交
908
    iTbData++;
H
Hongze Cheng 已提交
909
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
H
Hongze Cheng 已提交
910
    continue;
H
Hongze Cheng 已提交
911 912
  }

H
Hongze Cheng 已提交
913 914
  // commit file data end
  code = tsdbCommitFileDataEnd(pCommitter);
H
Hongze Cheng 已提交
915
  if (code) goto _err;
H
Hongze Cheng 已提交
916 917 918 919

  return code;

_err:
H
Hongze Cheng 已提交
920
  tsdbError("vgId:%d commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
921 922
  tsdbDataFReaderClose(&pCommitter->pReader);
  tsdbDataFWriterClose(&pCommitter->pWriter, 0);
H
Hongze Cheng 已提交
923 924 925
  return code;
}

H
Hongze Cheng 已提交
926 927
// ----------------------------------------------------------------------------
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
H
Hongze Cheng 已提交
928
  int32_t code = 0;
H
Hongze Cheng 已提交
929

H
Hongze Cheng 已提交
930 931
  memset(pCommitter, 0, sizeof(*pCommitter));
  ASSERT(pTsdb->mem && pTsdb->imem == NULL);
H
Hongze Cheng 已提交
932

H
Hongze Cheng 已提交
933 934 935 936
  // lock();
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
  // unlock();
H
Hongze Cheng 已提交
937

H
Hongze Cheng 已提交
938
  pCommitter->pTsdb = pTsdb;
H
Hongze Cheng 已提交
939
  pCommitter->commitID = pTsdb->pVnode->state.commitID;
H
Hongze Cheng 已提交
940 941 942 943
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;
  pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
H
Hongze Cheng 已提交
944
  pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
H
Hongze Cheng 已提交
945

H
Hongze Cheng 已提交
946 947 948 949 950 951 952
  code = tsdbFSBegin(pTsdb->fs);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb start commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
953 954 955
  return code;
}

H
Hongze Cheng 已提交
956 957 958
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
  int32_t code = 0;

H
Hongze Cheng 已提交
959
  pCommitter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
H
Hongze Cheng 已提交
960 961 962 963 964
  if (pCommitter->aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
965
  pCommitter->aBlockIdxN = taosArrayInit(0, sizeof(SBlockIdx));
H
Hongze Cheng 已提交
966 967 968 969 970
  if (pCommitter->aBlockIdxN == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
971 972
  code = tBlockDataInit(&pCommitter->oBlockData);
  if (code) goto _exit;
H
Hongze Cheng 已提交
973

H
Hongze Cheng 已提交
974
  code = tBlockDataInit(&pCommitter->nBlockData);
H
Hongze Cheng 已提交
975
  if (code) goto _exit;
H
Hongze Cheng 已提交
976 977 978 979 980 981

_exit:
  return code;
}

static void tsdbCommitDataEnd(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
982
  taosArrayDestroy(pCommitter->aBlockIdx);
H
Hongze Cheng 已提交
983 984
  tMapDataClear(&pCommitter->oBlockMap);
  tBlockDataClear(&pCommitter->oBlockData);
H
Hongze Cheng 已提交
985
  taosArrayDestroy(pCommitter->aBlockIdxN);
H
Hongze Cheng 已提交
986 987
  tMapDataClear(&pCommitter->nBlockMap);
  tBlockDataClear(&pCommitter->nBlockData);
H
Hongze Cheng 已提交
988 989
  tTSchemaDestroy(pCommitter->skmTable.pTSchema);
  tTSchemaDestroy(pCommitter->skmRow.pTSchema);
H
Hongze Cheng 已提交
990 991
}

H
Hongze Cheng 已提交
992 993 994 995
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
996

H
Hongze Cheng 已提交
997
  // check
H
Hongze Cheng 已提交
998
  if (pMemTable->nRow == 0) goto _exit;
H
Hongze Cheng 已提交
999

H
Hongze Cheng 已提交
1000 1001
  // start ====================
  code = tsdbCommitDataStart(pCommitter);
H
Hongze Cheng 已提交
1002
  if (code) goto _err;
H
Hongze Cheng 已提交
1003 1004 1005

  // impl ====================
  pCommitter->nextKey = pMemTable->minKey;
H
Hongze Cheng 已提交
1006 1007 1008 1009 1010
  while (pCommitter->nextKey < TSKEY_MAX) {
    pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
    tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                    &pCommitter->maxKey);
    code = tsdbCommitFileData(pCommitter);
H
Hongze Cheng 已提交
1011
    if (code) goto _err;
H
Hongze Cheng 已提交
1012
  }
H
Hongze Cheng 已提交
1013

H
Hongze Cheng 已提交
1014 1015 1016
  // end ====================
  tsdbCommitDataEnd(pCommitter);

H
Hongze Cheng 已提交
1017 1018 1019
_exit:
  tsdbDebug("vgId:%d commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow);
  return code;
H
Hongze Cheng 已提交
1020

H
Hongze Cheng 已提交
1021
_err:
H
Hongze Cheng 已提交
1022
  tsdbCommitDataEnd(pCommitter);
H
Hongze Cheng 已提交
1023 1024 1025
  tsdbError("vgId:%d commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
1026

H
Hongze Cheng 已提交
1027 1028 1029 1030
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
1031

H
Hongze Cheng 已提交
1032 1033
  if (pMemTable->nDel == 0) {
    goto _exit;
H
Hongze Cheng 已提交
1034
  }
H
Hongze Cheng 已提交
1035

H
Hongze Cheng 已提交
1036 1037 1038 1039 1040
  // start
  code = tsdbCommitDelStart(pCommitter);
  if (code) {
    goto _err;
  }
H
Hongze Cheng 已提交
1041

H
Hongze Cheng 已提交
1042
  // impl
H
Hongze Cheng 已提交
1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097
  int32_t  iDelIdx = 0;
  int32_t  nDelIdx = taosArrayGetSize(pCommitter->aDelIdx);
  int32_t  iTbData = 0;
  int32_t  nTbData = taosArrayGetSize(pMemTable->aTbData);
  STbData *pTbData;
  SDelIdx *pDelIdx;

  ASSERT(nTbData > 0);

  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
  pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
  while (true) {
    if (pTbData == NULL && pDelIdx == NULL) break;

    if (pTbData && pDelIdx) {
      int32_t c = tTABLEIDCmprFn(pTbData, pDelIdx);

      if (c == 0) {
        goto _commit_mem_and_disk_del;
      } else if (c < 0) {
        goto _commit_mem_del;
      } else {
        goto _commit_disk_del;
      }
    } else if (pTbData) {
      goto _commit_mem_del;
    } else {
      goto _commit_disk_del;
    }

  _commit_mem_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, NULL);
    if (code) goto _err;

    iTbData++;
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
    continue;

  _commit_disk_del:
    code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx);
    if (code) goto _err;

    iDelIdx++;
    pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    continue;

  _commit_mem_and_disk_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx);
    if (code) goto _err;

    iTbData++;
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
    iDelIdx++;
    pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    continue;
H
Hongze Cheng 已提交
1098
  }
H
Hongze Cheng 已提交
1099

H
Hongze Cheng 已提交
1100 1101 1102 1103 1104
  // end
  code = tsdbCommitDelEnd(pCommitter);
  if (code) {
    goto _err;
  }
H
Hongze Cheng 已提交
1105

H
Hongze Cheng 已提交
1106
_exit:
H
Hongze Cheng 已提交
1107
  tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
H
Hongze Cheng 已提交
1108 1109 1110
  return code;

_err:
H
Hongze Cheng 已提交
1111
  tsdbError("vgId:%d commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
1112
  return code;
H
Hongze Cheng 已提交
1113 1114 1115 1116 1117 1118 1119
}

static int32_t tsdbCommitCache(SCommitter *pCommitter) {
  int32_t code = 0;
  // TODO
  return code;
}
H
Hongze Cheng 已提交
1120 1121

static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
H
Hongze Cheng 已提交
1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (eno == 0) {
    code = tsdbFSCommit(pTsdb->fs);
  } else {
    code = tsdbFSRollback(pTsdb->fs);
  }

  tsdbMemTableDestroy(pMemTable);
  pTsdb->imem = NULL;

  tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
1140 1141
  return code;
}