tsdbCommit.c 32.1 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17 18 19 20 21
// A cached schema plus the identity it was loaded for. The schema-update
// helpers in this file compare these fields against the requested
// (suid, uid, version) and only reload on a mismatch.
typedef struct {
  int64_t   suid;      // suid the cached schema was loaded for; when 0, uid is compared as well
  int64_t   uid;       // table uid the cached schema was loaded for
  STSchema *pTSchema;  // the cached schema; destroyed and replaced on a cache miss
} SSkmInfo;
H
Hongze Cheng 已提交
22

H
Hongze Cheng 已提交
23
typedef struct {
H
Hongze Cheng 已提交
24
  STsdb *pTsdb;
H
Hongze Cheng 已提交
25
  /* commit data */
H
Hongze Cheng 已提交
26
  int64_t commitID;
H
Hongze Cheng 已提交
27 28
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
29 30
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
31
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
32
  // --------------
H
Hongze Cheng 已提交
33
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
34 35 36
  int32_t commitFid;
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
37
  // commit file data
H
Hongze Cheng 已提交
38
  SDataFReader *pReader;
H
Hongze Cheng 已提交
39 40
  SArray       *aBlockIdx;  // SArray<SBlockIdx>
  SMapData      oBlockMap;  // SMapData<SBlock>, read from reader
H
Hongze Cheng 已提交
41
  SBlockData    oBlockData;
H
Hongze Cheng 已提交
42
  SDataFWriter *pWriter;
H
Hongze Cheng 已提交
43 44
  SArray       *aBlockIdxN;  // SArray<SBlockIdx>
  SMapData      nBlockMap;   // SMapData<SBlock>
H
Hongze Cheng 已提交
45
  SBlockData    nBlockData;
H
Hongze Cheng 已提交
46 47
  SSkmInfo      skmTable;
  SSkmInfo      skmRow;
H
Hongze Cheng 已提交
48
  /* commit del */
H
Hongze Cheng 已提交
49 50
  SDelFReader *pDelFReader;
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
51 52 53
  SArray      *aDelIdx;   // SArray<SDelIdx>
  SArray      *aDelIdxN;  // SArray<SDelIdx>
  SArray      *aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
54
} SCommitter;
H
refact  
Hongze Cheng 已提交
55

H
Hongze Cheng 已提交
56 57 58 59 60
// Commit stages, listed in the order tsdbCommit() invokes them. tsdbCommit()
// treats any non-zero return as failure and then calls tsdbEndCommit() with
// that error code.
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
refact  
Hongze Cheng 已提交
61

H
refact  
Hongze Cheng 已提交
62
/*
 * Create the mem table of a TSDB instance.
 *
 * A NULL pTsdb is accepted and is a no-op returning 0. Otherwise the
 * result of tsdbMemTableCreate() is returned; a failure is logged first.
 */
int32_t tsdbBegin(STsdb *pTsdb) {
  if (pTsdb == NULL) return 0;

  int32_t rc = tsdbMemTableCreate(pTsdb, &pTsdb->mem);
  if (rc != 0) {
    tsdbError("vgId:%d tsdb begin failed since %s", TD_VID(pTsdb->pVnode), tstrerror(rc));
  }

  return rc;
}

H
more  
Hongze Cheng 已提交
77
/*
 * Commit the current mem table of pTsdb.
 *
 * - NULL pTsdb: nothing to do, returns 0.
 * - Mem table with no rows and no deletes: it is detached from pTsdb,
 *   destroyed, and 0 is returned.
 * - Otherwise the stages start -> data -> del -> cache -> end run in order.
 *   The first stage that fails stops the chain; tsdbEndCommit() is then
 *   called with the error code, the failure is logged and the code returned.
 */
int32_t tsdbCommit(STsdb *pTsdb) {
  if (pTsdb == NULL) return 0;

  SMemTable *pMem = pTsdb->mem;

  // nothing buffered: just drop the empty mem table
  if (pMem->nRow == 0 && pMem->nDel == 0) {
    // TODO: lock?
    pTsdb->mem = NULL;
    tsdbMemTableDestroy(pMem);
    return 0;
  }

  SCommitter committer;
  int32_t    rc = tsdbStartCommit(pTsdb, &committer);
  if (rc == 0) rc = tsdbCommitData(&committer);
  if (rc == 0) rc = tsdbCommitDel(&committer);
  if (rc == 0) rc = tsdbCommitCache(&committer);
  if (rc == 0) rc = tsdbEndCommit(&committer, 0);
  if (rc == 0) return 0;

  // failure: let the end stage see the error, then report it
  tsdbEndCommit(&committer, rc);
  tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(rc));
  return rc;
}

H
Hongze Cheng 已提交
119
/*
 * Prepare the delete-commit state: allocate the three scratch arrays,
 * load the delete index of the existing delete file (when the FS state has
 * one) and open a writer for the new delete file tagged with this commit's id.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when an array cannot be
 * allocated, or the code of the reader/writer call that failed.
 */
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
  STsdb  *pTsdb = pCommitter->pTsdb;
  int32_t rc = 0;

  // allocated in this order; evaluation stops at the first failure
  if ((pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL ||
      (pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL ||
      (pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
    rc = TSDB_CODE_OUT_OF_MEMORY;
    goto _fail;
  }

  // existing delete file, if any
  SDelFile *pOldDelFile = pTsdb->fs->nState->pDelFile;
  if (pOldDelFile != NULL) {
    rc = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pOldDelFile, pTsdb, NULL);
    if (rc == 0) rc = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx, NULL);
    if (rc != 0) goto _fail;
  }

  // new delete file
  SDelFile newDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0};
  rc = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &newDelFile, pTsdb);
  if (rc != 0) goto _fail;

  tsdbDebug("vgId:%d commit del start", TD_VID(pTsdb->pVnode));
  return rc;

_fail:
  tsdbError("vgId:%d commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(rc));
  return rc;
}

H
Hongze Cheng 已提交
165
/*
 * Commit the delete records of one table.
 *
 * pTbData  in-memory table data, or NULL. A table whose pHead is NULL is
 *          treated as having no in-memory deletes.
 * pDelIdx  the table's entry in the existing delete file, or NULL.
 *
 * Records already on disk (via pDelIdx) are loaded first, the in-memory
 * records are appended after them, the combined list is written through the
 * delete-file writer and the resulting index entry is appended to aDelIdxN,
 * the list that is flushed when the delete commit ends.
 *
 * Returns 0 on success (including the "nothing to do" case), otherwise an
 * error code; failures are logged.
 */
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
  int32_t   code = 0;
  SDelData *pDelData;
  tb_uid_t  suid = 0;
  tb_uid_t  uid = 0;

  taosArrayClear(pCommitter->aDelData);

  if (pTbData) {
    suid = pTbData->suid;
    uid = pTbData->uid;

    // no delete list in memory: behave as if there were no table data
    if (pTbData->pHead == NULL) {
      pTbData = NULL;
    }
  }

  if (pDelIdx) {
    suid = pDelIdx->suid;
    uid = pDelIdx->uid;

    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, NULL);
    if (code) goto _err;
  }

  if (pTbData == NULL && pDelIdx == NULL) goto _exit;

  SDelIdx delIdx = {.suid = suid, .uid = uid};

  // memory
  pDelData = pTbData ? pTbData->pHead : NULL;
  for (; pDelData; pDelData = pDelData->pNext) {
    if (taosArrayPush(pCommitter->aDelData, pDelData) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  // write
  code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, NULL, &delIdx);
  if (code) goto _err;

  // put delIdx into the NEW index list. aDelIdx holds the entries read from
  // the old file and must not be appended to here; aDelIdxN is what
  // tsdbCommitDelEnd() writes out.
  if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

_exit:
  return code;

_err:
  tsdbError("vgId:%d commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
221 222
/*
 * Finish the delete commit: write the new delete index, update the delete
 * file header, register the new delete file in the FS state, close the
 * writer (second argument 1) and, when one was opened, the reader. On
 * success the three scratch arrays are destroyed.
 *
 * The first failing step aborts the sequence; its code is logged and
 * returned, and the scratch arrays are left untouched in that case.
 */
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  STsdb  *pTsdb = pCommitter->pTsdb;
  int32_t rc = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN, NULL);

  if (rc == 0) rc = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
  if (rc == 0) rc = tsdbFSStateUpsertDelFile(pTsdb->fs->nState, &pCommitter->pDelFWriter->fDel);
  if (rc == 0) rc = tsdbDelFWriterClose(pCommitter->pDelFWriter, 1);
  if (rc == 0 && pCommitter->pDelFReader != NULL) rc = tsdbDelFReaderClose(pCommitter->pDelFReader);

  if (rc != 0) {
    tsdbError("vgId:%d commit del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(rc));
    return rc;
  }

  taosArrayDestroy(pCommitter->aDelIdx);
  taosArrayDestroy(pCommitter->aDelData);
  taosArrayDestroy(pCommitter->aDelIdxN);
  return rc;
}

H
Hongze Cheng 已提交
253 254 255 256 257
/*
 * Set up the committer for the file set identified by commitFid.
 *
 * Read side: the old containers are reset and, if the FS state already has
 * a file set with this fid, a reader is opened on it and its block index is
 * loaded into aBlockIdx.
 *
 * Write side: the new containers are reset and a writer is opened on a file
 * set description in which the head and last files always carry this
 * commit's id. The data and sma files are inherited from the old set when
 * there is one, otherwise they are new as well.
 *
 * Returns 0 on success or the code of the failing reader/writer call
 * (logged).
 */
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  STsdb    *pTsdb = pCommitter->pTsdb;
  int32_t   rc = 0;
  SDFileSet newSet;

  pCommitter->nextKey = TSKEY_MAX;

  // ---- read side ----
  taosArrayClear(pCommitter->aBlockIdx);
  tMapDataReset(&pCommitter->oBlockMap);
  tBlockDataReset(&pCommitter->oBlockData);

  SDFileSet *pOldSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid);
  if (pOldSet != NULL) {
    rc = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pOldSet);
    if (rc == 0) rc = tsdbReadBlockIdx(pCommitter->pReader, pCommitter->aBlockIdx, NULL);
    if (rc != 0) goto _fail;
  }

  // ---- write side ----
  taosArrayClear(pCommitter->aBlockIdxN);
  tMapDataReset(&pCommitter->nBlockMap);
  tBlockDataReset(&pCommitter->nBlockData);

  if (pOldSet == NULL) {
    // brand-new file set
    // TODO: allocate a disk (tfsAllocDisk) instead of always using level 0 / id 0
    STfs   *pTfs = pTsdb->pVnode->pTfs;
    SDiskID did = {.level = 0, .id = 0};

    // make sure the directory exists
    tfsMkdirRecurAt(pTfs, pTsdb->path, did);

    newSet = (SDFileSet){.diskId = did,
                         .fid = pCommitter->commitFid,
                         .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
                         .fData = {.commitID = pCommitter->commitID, .size = 0},
                         .fLast = {.commitID = pCommitter->commitID, .size = 0},
                         .fSma = {.commitID = pCommitter->commitID, .size = 0}};
  } else {
    // keep the existing data/sma files, replace head and last
    newSet = (SDFileSet){.diskId = pOldSet->diskId,
                         .fid = pCommitter->commitFid,
                         .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
                         .fData = pOldSet->fData,
                         .fLast = {.commitID = pCommitter->commitID, .size = 0},
                         .fSma = pOldSet->fSma};
  }

  rc = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, &newSet);
  if (rc != 0) goto _fail;

  return rc;

_fail:
  tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(rc));
  return rc;
}

H
Hongze Cheng 已提交
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
/*
 * Make skmTable hold the schema of (suid, uid) at version sver.
 *
 * The cached schema is kept when one is present, its suid matches, its
 * version equals sver and, for suid == 0 only, its uid matches as well.
 * Otherwise the cached schema is destroyed and a new one is fetched from
 * the meta store by uid and sver.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the fetch yields NULL.
 */
static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  SSkmInfo *pSkm = &pCommitter->skmTable;

  int8_t hit = pSkm->pTSchema != NULL && pSkm->suid == suid && sver == pSkm->pTSchema->version &&
               (suid != 0 || pSkm->uid == uid);
  if (hit) return 0;

  pSkm->suid = suid;
  pSkm->uid = uid;
  tTSchemaDestroy(pSkm->pTSchema);
  pSkm->pTSchema = metaGetTbTSchema(pCommitter->pTsdb->pVnode->pMeta, uid, sver);

  return (pSkm->pTSchema == NULL) ? TSDB_CODE_OUT_OF_MEMORY : 0;
}

/*
 * Make skmRow hold the schema of (suid, uid) at version sver.
 *
 * The cached schema is kept when one is present, its suid matches, its
 * version equals sver and, for suid == 0 only, its uid matches as well.
 * Otherwise the cached schema is destroyed and a new one is fetched from
 * the meta store by uid and sver.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the fetch yields NULL.
 */
static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  SSkmInfo *pSkm = &pCommitter->skmRow;

  int8_t hit = pSkm->pTSchema != NULL && pSkm->suid == suid && sver == pSkm->pTSchema->version &&
               (suid != 0 || pSkm->uid == uid);
  if (hit) return 0;

  pSkm->suid = suid;
  pSkm->uid = uid;
  tTSchemaDestroy(pSkm->pTSchema);
  pSkm->pTSchema = metaGetTbTSchema(pCommitter->pTsdb->pVnode->pMeta, uid, sver);

  return (pSkm->pTSchema == NULL) ? TSDB_CODE_OUT_OF_MEMORY : 0;
}

H
Hongze Cheng 已提交
364 365 366 367
/*
 * Write pBlockData through the data-file writer and record the resulting
 * block descriptor in nBlockMap.
 *
 * For a descriptor that has no sub-blocks yet, the `last` flag is decided
 * here: it is set to 1 only when toDataOnly is 0 and the block holds fewer
 * than minRow rows, otherwise it is cleared.
 *
 * Returns 0 on success or the code of the failing write / map insertion.
 */
static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockData, SBlock *pBlock, SBlockIdx *pBlockIdx,
                                   int8_t toDataOnly) {
  if (pBlock->nSubBlock == 0) {
    pBlock->last = (!toDataOnly && pBlockData->nRow < pCommitter->minRow) ? 1 : 0;
  }

  int32_t rc = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
  if (rc != 0) return rc;

  return tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
}

H
Hongze Cheng 已提交
388 389
/*
 * Merge the rows of one existing block with in-memory rows of the same
 * table and write the result as one or more new blocks.
 *
 * pIter        in-memory row iterator; consumed up to (not including) toKey.
 * pBlockMerge  descriptor of the existing block; its data is loaded into
 *              oBlockData.
 * toKey        exclusive upper bound for in-memory rows taken by this call.
 * toDataOnly   forwarded to tsdbCommitBlockData().
 *
 * Rows are taken from whichever source currently has the smaller row. Two
 * rows comparing equal is asserted not to happen. An output block is
 * flushed whenever it reaches maxRow * 4 / 5 rows, and once more at the end
 * if rows remain.
 *
 * Returns 0 on success, otherwise the first error code (logged).
 */
static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlockMerge, TSDBKEY toKey,
                                  int8_t toDataOnly) {
  int32_t     rc = 0;
  SBlockIdx   tableIdx = {.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
  SBlockIdx  *pTableIdx = &tableIdx;
  SBlockData *pOldData = &pCommitter->oBlockData;
  SBlockData *pNewData = &pCommitter->nBlockData;
  SBlock      outBlock;
  TSDBROW    *pMemRow;
  TSDBROW     diskRow;
  TSDBROW    *pDiskRow = &diskRow;

  // load the existing block
  rc = tsdbReadBlockData(pCommitter->pReader, pTableIdx, pBlockMerge, pOldData, NULL, NULL);
  if (rc) goto _fail;

  rc = tBlockDataSetSchema(pNewData, pCommitter->skmTable.pTSchema);
  if (rc) goto _fail;

  // both sources must start with a row below toKey
  pMemRow = tsdbTbDataIterGet(pIter);
  *pDiskRow = tsdbRowFromBlockData(pOldData, 0);
  ASSERT(pMemRow && tsdbKeyCmprFn(&TSDBROW_KEY(pMemRow), &toKey) < 0);
  ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pDiskRow), &toKey) < 0);
  rc = tsdbCommitterUpdateRowSchema(pCommitter, pTableIdx->suid, pTableIdx->uid, TSDBROW_SVERSION(pMemRow));
  if (rc) goto _fail;

  tBlockReset(&outBlock);
  tBlockDataClearData(pNewData);

  for (;;) {
    bool flush;

    if (pMemRow == NULL && pDiskRow == NULL) {
      // both sources drained: flush the tail, if any, then stop
      if (pNewData->nRow == 0) break;
      flush = true;
    } else {
      bool fromMem;

      if (pMemRow && pDiskRow) {
        int32_t c = tsdbRowCmprFn(pMemRow, pDiskRow);
        if (c == 0) {
          ASSERT(0);
        }
        fromMem = (c <= 0);
      } else {
        fromMem = (pMemRow != NULL);
      }

      if (fromMem) {
        rc = tBlockDataAppendRow(pNewData, pMemRow, pCommitter->skmRow.pTSchema);
        if (rc) goto _fail;

        // advance the memory side, stopping at toKey
        tsdbTbDataIterNext(pIter);
        pMemRow = tsdbTbDataIterGet(pIter);
        if (pMemRow) {
          if (tsdbKeyCmprFn(&TSDBROW_KEY(pMemRow), &toKey) < 0) {
            rc = tsdbCommitterUpdateRowSchema(pCommitter, pTableIdx->suid, pTableIdx->uid, TSDBROW_SVERSION(pMemRow));
            if (rc) goto _fail;
          } else {
            pMemRow = NULL;
          }
        }
      } else {
        rc = tBlockDataAppendRow(pNewData, pDiskRow, NULL);
        if (rc) goto _fail;

        // advance the disk side
        if (pDiskRow->iRow + 1 < pOldData->nRow) {
          *pDiskRow = tsdbRowFromBlockData(pOldData, pDiskRow->iRow + 1);
        } else {
          pDiskRow = NULL;
        }
      }

      flush = (pNewData->nRow >= pCommitter->maxRow * 4 / 5);
    }

    if (flush) {
      rc = tsdbCommitBlockData(pCommitter, pNewData, &outBlock, pTableIdx, toDataOnly);
      if (rc) goto _fail;

      tBlockReset(&outBlock);
      tBlockDataClearData(pNewData);
    }
  }

  return rc;

_fail:
  tsdbError("vgId:%d tsdb merge block and mem failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(rc));
  return rc;
}

H
Hongze Cheng 已提交
493
/*
 * Write in-memory rows of one table, from the iterator's current position
 * up to (not including) toKey, as new blocks.
 *
 * The first row is asserted to exist and to be below toKey. A block is
 * flushed whenever it reaches maxRow * 4 / 5 rows and once more at the end
 * if rows remain. toDataOnly is forwarded to tsdbCommitBlockData().
 *
 * Returns 0 on success, otherwise the first error code (logged).
 */
static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) {
  int32_t     rc = 0;
  int64_t     suid = pIter->pTbData->suid;
  int64_t     uid = pIter->pTbData->uid;
  SBlockData *pOut = &pCommitter->nBlockData;
  SBlock      outBlock;
  TSDBROW    *pCur;

  rc = tBlockDataSetSchema(pOut, pCommitter->skmTable.pTSchema);
  if (rc) goto _fail;

  tBlockReset(&outBlock);
  tBlockDataClearData(pOut);

  pCur = tsdbTbDataIterGet(pIter);
  ASSERT(pCur && tsdbKeyCmprFn(&TSDBROW_KEY(pCur), &toKey) < 0);

  for (;;) {
    if (pCur != NULL) {
      // bring the row schema up to date, then append the row
      rc = tsdbCommitterUpdateRowSchema(pCommitter, suid, uid, TSDBROW_SVERSION(pCur));
      if (rc) goto _fail;

      rc = tBlockDataAppendRow(pOut, pCur, pCommitter->skmRow.pTSchema);
      if (rc) goto _fail;

      // step forward; rows at or beyond toKey are not ours
      tsdbTbDataIterNext(pIter);
      pCur = tsdbTbDataIterGet(pIter);
      if (pCur && tsdbKeyCmprFn(&TSDBROW_KEY(pCur), &toKey) >= 0) pCur = NULL;

      // keep filling until the block is large enough
      if (pOut->nRow < pCommitter->maxRow * 4 / 5) continue;
    } else if (pOut->nRow <= 0) {
      break;
    }

    // flush the assembled block
    rc = tsdbCommitBlockData(pCommitter, pOut, &outBlock, &(SBlockIdx){.suid = suid, .uid = uid}, toDataOnly);
    if (rc) goto _fail;

    tBlockReset(&outBlock);
    tBlockDataClearData(pOut);
  }

  return rc;

_fail:
  tsdbError("vgId:%d tsdb commit table mem data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(rc));
  return rc;
}

H
Hongze Cheng 已提交
548
/*
 * Carry one existing block over into the new file set.
 *
 * A block flagged `last` is read back and rewritten through
 * tsdbCommitBlockData() with a fresh descriptor and toDataOnly = 0. Any
 * other block only has its descriptor copied into nBlockMap.
 *
 * Returns 0 on success, otherwise the first error code (logged).
 */
static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, SBlockIdx *pBlockIdx) {
  int32_t rc;

  if (!pBlock->last) {
    rc = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
  } else {
    rc = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, NULL);
    if (rc == 0) {
      SBlock rewritten;
      tBlockReset(&rewritten);
      rc = tsdbCommitBlockData(pCommitter, &pCommitter->oBlockData, &rewritten, pBlockIdx, 0);
    }
  }

  if (rc != 0) {
    tsdbError("vgId:%d tsdb commit table disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(rc));
  }
  return rc;
}

H
Hongze Cheng 已提交
571 572
/*
 * Close out one table: write the accumulated nBlockMap through the
 * data-file writer and append the table's block-index entry to aBlockIdxN.
 *
 * Returns 0 on success, the writer's error code, or
 * TSDB_CODE_OUT_OF_MEMORY when the index entry cannot be appended. Failures
 * are logged.
 */
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) {
  SBlockIdx tableIdx = {.suid = suid, .uid = uid};

  int32_t rc = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, &tableIdx);
  if (rc == 0 && taosArrayPush(pCommitter->aBlockIdxN, &tableIdx) == NULL) {
    rc = TSDB_CODE_OUT_OF_MEMORY;
  }

  if (rc != 0) {
    tsdbError("vgId:%d commit table data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(rc));
  }
  return rc;
}

H
Hongze Cheng 已提交
591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607
/*
 * Count the in-memory rows, starting at pIter's current position, that
 * compare equal to pBlock under tBlockCmprFn() when treated as a
 * single-key block.
 *
 * The count is taken on a private copy of the iterator, so the caller's
 * pIter is left where it was and the counted rows can still be consumed
 * afterwards. Counting stops at the first row that compares greater than
 * pBlock; a row comparing less is asserted not to occur.
 *
 * Returns the number of counted rows.
 */
static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) {
  int32_t     nRow = 0;
  TSDBROW    *pRow;
  TSDBKEY     key;
  int32_t     c = 0;
  STbDataIter iter = *pIter;

  // NOTE(review): presumably clears a cached row so the copy re-reads from
  // its position instead of sharing state with pIter - confirm against
  // tsdbTbDataIterGet().
  iter.pRow = NULL;
  while (true) {
    // walk the COPY; using pIter here would consume the caller's rows
    pRow = tsdbTbDataIterGet(&iter);

    if (pRow == NULL) break;
    key = TSDBROW_KEY(pRow);

    c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock);
    if (c == 0) {
      nRow++;
      tsdbTbDataIterNext(&iter);
    } else if (c > 0) {
      break;
    } else {
      ASSERT(0);
    }
  }

  return nRow;
}

/*
 * Append the in-memory rows that overlap an existing block to that block
 * as an additional sub-block.
 *
 * pIter   in-memory row iterator positioned on the first overlapping row;
 *         advanced past every row consumed here.
 * pBlock  descriptor of the existing block. It is copied, and the copy is
 *         handed to tsdbCommitBlockData() with toDataOnly = 0.
 *
 * Rows are collected while they compare equal to pBlock under
 * tBlockCmprFn() (as a single-key block). Collection stops at the first row
 * that compares greater, or when the iterator is exhausted. A row comparing
 * less is asserted not to occur.
 *
 * Returns 0 on success, otherwise the first error code (logged).
 */
static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
  int32_t     code = 0;
  SBlockData *pBlockData = &pCommitter->nBlockData;
  SBlockIdx  *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
  SBlock      block;
  TSDBROW    *pRow;

  code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
  if (code) goto _err;

  pRow = tsdbTbDataIterGet(pIter);
  code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow));
  if (code) goto _err;
  while (true) {
    // stop once there is no further overlapping row (was inverted: `if (pRow) break;`)
    if (pRow == NULL) break;
    code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema);
    if (code) goto _err;

    tsdbTbDataIterNext(pIter);
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow) {
      // compare the row's own key against the block (was an empty SBlock)
      TSDBKEY key = TSDBROW_KEY(pRow);
      int32_t c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock);

      if (c == 0) {
        code =
            tsdbCommitterUpdateRowSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow));
        if (code) goto _err;
      } else if (c > 0) {
        pRow = NULL;
      } else {
        ASSERT(0);
      }
    }
  }

  block = *pBlock;
  code = tsdbCommitBlockData(pCommitter, pBlockData, &block, pBlockIdx, 0);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb merge as subblock failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
665 666
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
  int32_t      code = 0;
H
Hongze Cheng 已提交
667 668
  STbDataIter  iter = {0};
  STbDataIter *pIter = &iter;
H
Hongze Cheng 已提交
669
  TSDBROW     *pRow;
H
Hongze Cheng 已提交
670
  int32_t      iBlock;
H
Hongze Cheng 已提交
671
  int32_t      nBlock;
H
Hongze Cheng 已提交
672 673
  int64_t      suid;
  int64_t      uid;
H
Hongze Cheng 已提交
674 675 676 677

  if (pTbData) {
    tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
    pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
678 679
    if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;

H
Hongze Cheng 已提交
680 681
    suid = pTbData->suid;
    uid = pTbData->uid;
H
Hongze Cheng 已提交
682 683 684 685 686 687 688 689 690 691 692
  } else {
    pIter = NULL;
    pRow = NULL;
  }

  if (pBlockIdx) {
    code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL);
    if (code) goto _err;

    nBlock = pCommitter->oBlockMap.nItem;
    ASSERT(nBlock > 0);
H
Hongze Cheng 已提交
693

H
Hongze Cheng 已提交
694 695
    suid = pBlockIdx->suid;
    uid = pBlockIdx->uid;
H
Hongze Cheng 已提交
696 697 698 699
  } else {
    nBlock = 0;
  }

H
Hongze Cheng 已提交
700
  if (pRow == NULL && nBlock == 0) goto _exit;
H
Hongze Cheng 已提交
701 702

  // start ===========
H
Hongze Cheng 已提交
703
  tMapDataReset(&pCommitter->nBlockMap);
H
Hongze Cheng 已提交
704 705
  SBlock  block;
  SBlock *pBlock = &block;
H
Hongze Cheng 已提交
706

H
Hongze Cheng 已提交
707
  iBlock = 0;
H
Hongze Cheng 已提交
708 709 710 711 712 713
  if (iBlock < nBlock) {
    tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
  } else {
    pBlock = NULL;
  }

H
Hongze Cheng 已提交
714 715 716 717 718
  if (pRow) {
    code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid, pTbData->maxSkmVer);
    if (code) goto _err;
  }

H
Hongze Cheng 已提交
719 720
  // merge ===========
  while (true) {
H
Hongze Cheng 已提交
721
    if (pRow == NULL && pBlock == NULL) break;
H
Hongze Cheng 已提交
722

H
Hongze Cheng 已提交
723
    if (pRow && pBlock) {
H
Hongze Cheng 已提交
724
      if (pBlock->last) {
H
Hongze Cheng 已提交
725
        code = tsdbMergeTableData(pCommitter, pIter, pBlock,
H
Hongze Cheng 已提交
726
                                  (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
H
Hongze Cheng 已提交
727 728 729
        if (code) goto _err;

        pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
730
        if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
H
Hongze Cheng 已提交
731
        iBlock++;
H
Hongze Cheng 已提交
732 733 734 735 736
        if (iBlock < nBlock) {
          tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
        } else {
          pBlock = NULL;
        }
H
Hongze Cheng 已提交
737 738

        ASSERT(pRow == NULL && pBlock == NULL);
H
Hongze Cheng 已提交
739
      } else {
H
Hongze Cheng 已提交
740
        int32_t c = tBlockCmprFn(&(SBlock){.maxKey = TSDBROW_KEY(pRow), .minKey = TSDBROW_KEY(pRow)}, pBlock);
H
Hongze Cheng 已提交
741
        if (c > 0) {
H
Hongze Cheng 已提交
742
          // only disk data
H
Hongze Cheng 已提交
743
          code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
H
Hongze Cheng 已提交
744 745
          if (code) goto _err;

H
Hongze Cheng 已提交
746 747 748 749 750 751
          iBlock++;
          if (iBlock < nBlock) {
            tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
          } else {
            pBlock = NULL;
          }
H
Hongze Cheng 已提交
752
        } else if (c < 0) {
H
Hongze Cheng 已提交
753
          // only memory data
H
Hongze Cheng 已提交
754
          code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
H
Hongze Cheng 已提交
755 756 757
          if (code) goto _err;

          pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
758
          if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
H
Hongze Cheng 已提交
759
        } else {
H
Hongze Cheng 已提交
760
          // merge memory and disk
H
Hongze Cheng 已提交
761 762
          int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock);
          ASSERT(nOvlp);
H
Hongze Cheng 已提交
763
          if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) {
H
Hongze Cheng 已提交
764 765
            code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock);
            if (code) goto _err;
H
Hongze Cheng 已提交
766
          } else {
H
Hongze Cheng 已提交
767 768 769 770 771 772 773 774 775 776
            TSDBKEY toKey = {.ts = pCommitter->maxKey + 1, .version = VERSION_MIN};
            int8_t  toDataOnly = 0;

            if (iBlock < nBlock - 1) {
              toDataOnly = 1;

              SBlock nextBlock = {0};
              tBlockReset(&nextBlock);
              tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock + 1, &nextBlock, tGetBlock);
              toKey = nextBlock.minKey;
H
Hongze Cheng 已提交
777
            }
H
Hongze Cheng 已提交
778 779 780

            code = tsdbMergeTableData(pCommitter, pIter, pBlock, toKey, toDataOnly);
            if (code) goto _err;
H
Hongze Cheng 已提交
781
          }
H
Hongze Cheng 已提交
782 783 784 785 786 787 788 789 790

          pRow = tsdbTbDataIterGet(pIter);
          if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
          iBlock++;
          if (iBlock < nBlock) {
            tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
          } else {
            pBlock = NULL;
          }
H
Hongze Cheng 已提交
791 792 793
        }
      }
    } else if (pBlock) {
H
Hongze Cheng 已提交
794
      code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
H
Hongze Cheng 已提交
795 796
      if (code) goto _err;

H
Hongze Cheng 已提交
797 798 799 800 801 802
      iBlock++;
      if (iBlock < nBlock) {
        tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
      } else {
        pBlock = NULL;
      }
H
Hongze Cheng 已提交
803
    } else {
H
Hongze Cheng 已提交
804 805
      code =
          tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
H
Hongze Cheng 已提交
806 807 808
      if (code) goto _err;

      pRow = tsdbTbDataIterGet(pIter);
H
Hongze Cheng 已提交
809 810
      if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
      ASSERT(pRow == NULL);
H
Hongze Cheng 已提交
811 812 813
    }
  }

H
Hongze Cheng 已提交
814 815
  // end =====================
  code = tsdbCommitTableDataEnd(pCommitter, suid, uid);
H
Hongze Cheng 已提交
816 817 818 819 820 821 822 823 824 825 826 827 828 829
  if (code) goto _err;

_exit:
  if (pIter) {
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow) pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
  }
  return code;

_err:
  tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862
/**
 * Finish the commit of the current data file set.
 *
 * Steps, in order: write the new block index (aBlockIdxN), update the file
 * set header, upsert the written file set into the pending file-system state
 * (fs->nState), close the writer with its second argument set to 1 ("close
 * and sync"), and finally close the reader if one is open.
 *
 * @param pCommitter commit context; pReader may be NULL.
 * @return 0 on success, otherwise the error code of the first failing step.
 */
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code = 0;

  // write blockIdx
  code = tsdbWriteBlockIdx(pCommitter->pWriter, pCommitter->aBlockIdxN, NULL);
  if (code) goto _err;

  // update file header
  code = tsdbUpdateDFileSetHeader(pCommitter->pWriter);
  if (code) goto _err;

  // upsert SDFileSet
  code = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->fs->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter));
  if (code) goto _err;

  // close and sync
  code = tsdbDataFWriterClose(&pCommitter->pWriter, 1);
  if (code) goto _err;

  if (pCommitter->pReader) {
    code = tsdbDataFReaderClose(&pCommitter->pReader);
    // Only a failed close is an error; a successful close must not fall into
    // the error-logging path.
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

/**
 * Commit one data file set.
 *
 * After tsdbCommitFileDataStart(), walks two sequences side by side: the
 * tables of the immutable memtable (pTsdb->imem->aTbData) and the block index
 * entries in pCommitter->aBlockIdx. tTABLEIDCmprFn() decides which side is
 * handled next; when both sides compare equal they are committed together.
 * (Both sequences are presumably sorted by table id -- confirm at the
 * producers.) Finishes with tsdbCommitFileDataEnd().
 *
 * On any failure the error is logged and the reader and writer are closed
 * (writer with second argument 0).
 *
 * @param pCommitter commit context.
 * @return 0 on success, otherwise the first error code encountered.
 */
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    code;

  // start
  code = tsdbCommitFileDataStart(pCommitter);
  if (code) goto _err;

  // impl
  int32_t nMem = taosArrayGetSize(pMemTable->aTbData);
  int32_t nDisk = taosArrayGetSize(pCommitter->aBlockIdx);
  int32_t iMem = 0;
  int32_t iDisk = 0;

  ASSERT(nMem > 0);

  STbData   *pMem = (STbData *)taosArrayGetP(pMemTable->aTbData, iMem);
  SBlockIdx *pDisk = NULL;
  if (iDisk < nDisk) {
    pDisk = (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iDisk);
  }

  while (pMem != NULL || pDisk != NULL) {
    // order < 0: memory side only; order > 0: disk side only; 0: both.
    int32_t order;
    if (pMem == NULL) {
      order = 1;
    } else if (pDisk == NULL) {
      order = -1;
    } else {
      order = tTABLEIDCmprFn(pMem, pDisk);
    }

    STbData   *pCurMem = (order <= 0) ? pMem : NULL;
    SBlockIdx *pCurDisk = (order >= 0) ? pDisk : NULL;

    code = tsdbCommitTableData(pCommitter, pCurMem, pCurDisk);
    if (code) goto _err;

    // advance whichever side(s) were just consumed
    if (pCurDisk != NULL) {
      iDisk++;
      pDisk = (iDisk < nDisk) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iDisk) : NULL;
    }
    if (pCurMem != NULL) {
      iMem++;
      pMem = (iMem < nMem) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iMem) : NULL;
    }
  }

  // end
  code = tsdbCommitFileDataEnd(pCommitter);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  tsdbDataFReaderClose(&pCommitter->pReader);
  tsdbDataFWriterClose(&pCommitter->pWriter, 0);
  return code;
}

H
Hongze Cheng 已提交
940 941
// ----------------------------------------------------------------------------
/**
 * Begin a commit: zero the committer, move the active memtable to the
 * immutable slot, copy the commit parameters out of the tsdb/vnode
 * configuration, and call tsdbFSBegin() on the tsdb file system.
 *
 * @param pTsdb      tsdb instance; asserted to have mem set and imem NULL.
 * @param pCommitter committer to initialise (fully overwritten).
 * @return 0 on success, otherwise the error code from tsdbFSBegin().
 */
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
  memset(pCommitter, 0, sizeof(*pCommitter));
  ASSERT(pTsdb->mem && pTsdb->imem == NULL);

  // Hand the memtable over to the commit. The original marks this swap with
  // commented-out lock()/unlock() placeholders; no lock is taken here.
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;

  pCommitter->pTsdb = pTsdb;
  pCommitter->commitID = pTsdb->pVnode->state.commitID;

  // values taken from the keep configuration
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;

  // values taken from the vnode's tsdb configuration
  pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;

  int32_t code = tsdbFSBegin(pTsdb->fs);
  if (code) {
    tsdbError("vgId:%d tsdb start commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
970 971 972
/**
 * Allocate the per-commit working state: the two SBlockIdx arrays
 * (aBlockIdx, aBlockIdxN) and the two block-data buffers
 * (oBlockData, nBlockData).
 *
 * Nothing is released here on failure; whatever was already created stays
 * in pCommitter.
 *
 * @param pCommitter commit context to populate.
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY if an array cannot be
 *         created, or the error code returned by tBlockDataInit().
 */
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
  pCommitter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pCommitter->aBlockIdxN = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->aBlockIdxN == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = tBlockDataInit(&pCommitter->oBlockData);
  if (code) {
    return code;
  }

  code = tBlockDataInit(&pCommitter->nBlockData);
  return code;
}

/**
 * Release the per-commit working state held in pCommitter: both block index
 * arrays, both block maps, both block-data buffers, and the two cached
 * schemas (skmTable, skmRow).
 *
 * @param pCommitter commit context whose working state is released.
 */
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
  // block index arrays
  taosArrayDestroy(pCommitter->aBlockIdx);
  taosArrayDestroy(pCommitter->aBlockIdxN);

  // block maps
  tMapDataClear(&pCommitter->oBlockMap);
  tMapDataClear(&pCommitter->nBlockMap);

  // block data buffers
  tBlockDataClear(&pCommitter->oBlockData);
  tBlockDataClear(&pCommitter->nBlockData);

  // cached schemas
  tTSchemaDestroy(pCommitter->skmTable.pTSchema);
  tTSchemaDestroy(pCommitter->skmRow.pTSchema);
}

H
Hongze Cheng 已提交
1006 1007 1008 1009
/**
 * Commit the row data of the immutable memtable, one data file set at a time.
 *
 * Does nothing except log when the memtable holds no rows. Otherwise sets up
 * the working state, then repeatedly derives the file id and key range from
 * pCommitter->nextKey and commits that file set, until nextKey reaches
 * TSKEY_MAX. nextKey is not modified in this function itself; the loop relies
 * on tsdbCommitFileData() and its callees to advance it. The working state is
 * released on both the success and the failure path.
 *
 * @param pCommitter commit context.
 * @return 0 on success, otherwise the first error code encountered.
 */
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    code = 0;

  if (pMemTable->nRow != 0) {
    // start
    code = tsdbCommitDataStart(pCommitter);

    // impl
    if (code == 0) {
      for (pCommitter->nextKey = pMemTable->minKey; pCommitter->nextKey < TSKEY_MAX;) {
        pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
        tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                        &pCommitter->maxKey);

        code = tsdbCommitFileData(pCommitter);
        if (code) break;
      }
    }

    // end: runs whether or not the steps above succeeded
    tsdbCommitDataEnd(pCommitter);

    if (code) {
      tsdbError("vgId:%d commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
      return code;
    }
  }

  tsdbDebug("vgId:%d commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow);
  return code;
}
H
Hongze Cheng 已提交
1040

H
Hongze Cheng 已提交
1041 1042 1043 1044
/**
 * Commit the delete records of the immutable memtable.
 *
 * Does nothing except log when the memtable holds no deletes. Otherwise,
 * after tsdbCommitDelStart(), walks the memtable tables
 * (pTsdb->imem->aTbData) and the delete index entries in
 * pCommitter->aDelIdx side by side. tTABLEIDCmprFn() decides which side is
 * handled next; entries that compare equal are committed together.
 * (Both sequences are presumably sorted by table id -- confirm at the
 * producers.) Finishes with tsdbCommitDelEnd().
 *
 * @param pCommitter commit context.
 * @return 0 on success, otherwise the first error code encountered.
 */
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    code = 0;

  if (pMemTable->nDel != 0) {
    // start
    code = tsdbCommitDelStart(pCommitter);
    if (code) goto _err;

    // impl
    int32_t nDisk = taosArrayGetSize(pCommitter->aDelIdx);
    int32_t nMem = taosArrayGetSize(pMemTable->aTbData);
    int32_t iDisk = 0;
    int32_t iMem = 0;

    ASSERT(nMem > 0);

    STbData *pMem = (STbData *)taosArrayGetP(pMemTable->aTbData, iMem);
    SDelIdx *pDisk = NULL;
    if (iDisk < nDisk) {
      pDisk = (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDisk);
    }

    while (pMem != NULL || pDisk != NULL) {
      // order < 0: memory side only; order > 0: disk side only; 0: both.
      int32_t order;
      if (pDisk == NULL) {
        order = -1;
      } else if (pMem == NULL) {
        order = 1;
      } else {
        order = tTABLEIDCmprFn(pMem, pDisk);
      }

      STbData *pCurMem = (order <= 0) ? pMem : NULL;
      SDelIdx *pCurDisk = (order >= 0) ? pDisk : NULL;

      code = tsdbCommitTableDel(pCommitter, pCurMem, pCurDisk);
      if (code) goto _err;

      // advance whichever side(s) were just consumed
      if (pCurMem != NULL) {
        iMem++;
        pMem = (iMem < nMem) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iMem) : NULL;
      }
      if (pCurDisk != NULL) {
        iDisk++;
        pDisk = (iDisk < nDisk) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDisk) : NULL;
      }
    }

    // end
    code = tsdbCommitDelEnd(pCommitter);
    if (code) goto _err;
  }

  tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
  return code;

_err:
  tsdbError("vgId:%d commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

/**
 * Placeholder for committing cache data; not implemented yet.
 *
 * @param pCommitter commit context (currently unused).
 * @return always 0.
 */
static int32_t tsdbCommitCache(SCommitter *pCommitter) {
  // TODO
  (void)pCommitter;
  return 0;
}
H
Hongze Cheng 已提交
1134 1135

/**
 * Finish a commit: commit the file-system change when eno is 0, otherwise
 * roll it back; then destroy the immutable memtable and clear pTsdb->imem.
 *
 * The memtable is destroyed whether or not the file-system step succeeded.
 * A failure of that step is logged as an error and its code is returned.
 *
 * @param pCommitter commit context.
 * @param eno        result of the preceding commit steps; 0 means success.
 * @return 0 on success, otherwise the error code from tsdbFSCommit() or
 *         tsdbFSRollback().
 */
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    code;

  if (eno == 0) {
    code = tsdbFSCommit(pTsdb->fs);
  } else {
    code = tsdbFSRollback(pTsdb->fs);
  }

  tsdbMemTableDestroy(pMemTable);
  pTsdb->imem = NULL;

  // Report a failed commit/rollback as an error rather than as a normal
  // "end commit" info line.
  if (code) {
    tsdbError("vgId:%d tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
    return code;
  }

  tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
  return code;
}