tsdbCommit.c 32.3 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17 18 19 20 21
// A cached table schema together with the identifiers it was fetched for.
// The tsdbCommitterUpdate*Schema helpers compare suid/uid/version against
// this cache before asking the meta layer for a schema again.
typedef struct {
  int64_t   suid;      // super-table uid the cached schema belongs to (the helpers treat 0 specially)
  int64_t   uid;       // table uid the cached schema belongs to
  STSchema *pTSchema;  // cached schema; the helpers test it for NULL before comparing
} SSkmInfo;
H
Hongze Cheng 已提交
22

H
Hongze Cheng 已提交
23
typedef struct {
H
Hongze Cheng 已提交
24
  STsdb *pTsdb;
H
Hongze Cheng 已提交
25
  /* commit data */
H
Hongze Cheng 已提交
26
  int64_t commitID;
H
Hongze Cheng 已提交
27 28
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
29 30
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
31
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
32
  // --------------
H
Hongze Cheng 已提交
33
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
34 35 36
  int32_t commitFid;
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
37
  // commit file data
H
Hongze Cheng 已提交
38
  SDataFReader *pReader;
H
Hongze Cheng 已提交
39 40
  SArray       *aBlockIdx;  // SArray<SBlockIdx>
  SMapData      oBlockMap;  // SMapData<SBlock>, read from reader
H
Hongze Cheng 已提交
41
  SBlockData    oBlockData;
H
Hongze Cheng 已提交
42
  SDataFWriter *pWriter;
H
Hongze Cheng 已提交
43 44
  SArray       *aBlockIdxN;  // SArray<SBlockIdx>
  SMapData      nBlockMap;   // SMapData<SBlock>
H
Hongze Cheng 已提交
45
  SBlockData    nBlockData;
H
Hongze Cheng 已提交
46 47
  SSkmInfo      skmTable;
  SSkmInfo      skmRow;
H
Hongze Cheng 已提交
48
  /* commit del */
H
Hongze Cheng 已提交
49 50
  SDelFReader *pDelFReader;
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
51 52 53
  SArray      *aDelIdx;   // SArray<SDelIdx>
  SArray      *aDelIdxN;  // SArray<SDelIdx>
  SArray      *aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
54
} SCommitter;
H
refact  
Hongze Cheng 已提交
55

H
Hongze Cheng 已提交
56 57 58 59 60
// Forward declarations of the commit phases, listed in the order tsdbCommit() calls them.
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
refact  
Hongze Cheng 已提交
61

H
refact  
Hongze Cheng 已提交
62
/**
 * Create the in-memory table (pTsdb->mem) for a tsdb instance.
 *
 * @param pTsdb tsdb instance; when NULL nothing is done and 0 is returned.
 * @return 0 on success, otherwise the code reported by tsdbMemTableCreate().
 */
int32_t tsdbBegin(STsdb *pTsdb) {
  int32_t code = 0;

  if (!pTsdb) return code;

  code = tsdbMemTableCreate(pTsdb, &pTsdb->mem);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb begin failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
more  
Hongze Cheng 已提交
77
/**
 * Commit the current memtable of a tsdb instance.
 *
 * An empty memtable (no rows and no deletes) is detached from pTsdb and
 * destroyed without running the commit phases. Otherwise the phases run in
 * order: start, data, del, cache, end.
 *
 * @param pTsdb tsdb instance; when NULL nothing is done and 0 is returned.
 * @return 0 on success, otherwise the code of the first failing phase.
 */
int32_t tsdbCommit(STsdb *pTsdb) {
  if (!pTsdb) return 0;

  int32_t    code = 0;
  // Zero-initialised because the _err path hands this to tsdbEndCommit() even
  // when tsdbStartCommit() failed before filling it in.
  SCommitter commith = {0};
  SMemTable *pMemTable = pTsdb->mem;

  // check
  if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
    // TODO: lock?
    pTsdb->mem = NULL;
    tsdbMemTableDestroy(pMemTable);
    goto _exit;
  }

  // start commit
  code = tsdbStartCommit(pTsdb, &commith);
  if (code) goto _err;

  // commit impl
  code = tsdbCommitData(&commith);
  if (code) goto _err;

  code = tsdbCommitDel(&commith);
  if (code) goto _err;

  code = tsdbCommitCache(&commith);
  if (code) goto _err;

  // end commit
  // NOTE(review): if this call fails, _err invokes tsdbEndCommit() a second
  // time with the error code; confirm tsdbEndCommit() tolerates that.
  code = tsdbEndCommit(&commith, 0);
  if (code) goto _err;

_exit:
  return code;

_err:
  tsdbEndCommit(&commith, code);
  tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
119
/**
 * Prepare the delete-data part of a commit.
 *
 * Allocates the three working arrays (aDelIdx, aDelData, aDelIdxN). If the
 * file-system state already has a delete file, opens a reader on it and loads
 * its index into aDelIdx. Finally opens a writer for a new delete file tagged
 * with this commit's commitID.
 *
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY if an array cannot be
 *         allocated, or the code of the failing reader/writer call.
 */
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
  int32_t code = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;

  // NOTE(review): nothing allocated here is released on the _err path;
  // presumably the caller's cleanup does it - confirm.
  pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
  if (pCommitter->aDelIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData));
  if (pCommitter->aDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx));
  if (pCommitter->aDelIdxN == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // existing delete file, if any
  SDelFile *pDelFileR = pTsdb->fs->nState->pDelFile;
  if (pDelFileR) {
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb, NULL);
    if (code) goto _err;

    code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx, NULL);
    if (code) goto _err;
  }

  // prepare new
  SDelFile wDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0};
  code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &wDelFile, pTsdb);
  if (code) goto _err;

_exit:
  tsdbDebug("vgId:%d commit del start", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
165
/**
 * Write the merged delete records of one table to the new delete file.
 *
 * @param pTbData  in-memory table data, or NULL. A table whose pHead is NULL
 *                 (no in-memory delete records) is treated as NULL.
 * @param pDelIdx  index entry of the table in the existing delete file, or
 *                 NULL. When present, its records are loaded into aDelData
 *                 first; otherwise aDelData is cleared.
 * @return 0 on success (including "nothing to do" when both inputs are
 *         absent), TSDB_CODE_OUT_OF_MEMORY on array growth failure, or the
 *         code of the failing read/write call.
 */
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
  int32_t   code = 0;
  SDelData *pDelData;
  // Initialised so no path can observe indeterminate values; they are only
  // consumed after at least one of the two inputs has supplied them.
  tb_uid_t  suid = 0;
  tb_uid_t  uid = 0;

  if (pTbData) {
    suid = pTbData->suid;
    uid = pTbData->uid;

    if (pTbData->pHead == NULL) {
      pTbData = NULL;
    }
  }

  if (pDelIdx) {
    // when both inputs are present the on-disk identifiers take precedence
    suid = pDelIdx->suid;
    uid = pDelIdx->uid;

    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, NULL);
    if (code) goto _err;
  } else {
    taosArrayClear(pCommitter->aDelData);
  }

  if (pTbData == NULL && pDelIdx == NULL) goto _exit;

  SDelIdx delIdx = {.suid = suid, .uid = uid};

  // memory: append the in-memory delete records after the on-disk ones
  pDelData = pTbData ? pTbData->pHead : NULL;
  for (; pDelData; pDelData = pDelData->pNext) {
    if (taosArrayPush(pCommitter->aDelData, pDelData) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  // write
  code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, NULL, &delIdx);
  if (code) goto _err;

  // put delIdx
  if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

_exit:
  return code;

_err:
  tsdbError("vgId:%d commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
221 222
/**
 * Finish the delete-data part of a commit.
 *
 * Writes the new delete index, updates the delete file header, records the
 * new delete file in the file-system state, closes the writer (and the reader
 * if one was opened), then destroys the three working arrays.
 *
 * @return 0 on success, otherwise the code of the first failing step.
 */
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t code = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;

  code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN, NULL);
  if (code) goto _err;

  code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
  if (code) goto _err;

  code = tsdbFSStateUpsertDelFile(pTsdb->fs->nState, &pCommitter->pDelFWriter->fDel);
  if (code) goto _err;

  code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1);
  if (code) goto _err;

  if (pCommitter->pDelFReader) {
    code = tsdbDelFReaderClose(&pCommitter->pDelFReader);
    if (code) goto _err;
  }

  // NOTE(review): the arrays are only destroyed on the success path; confirm
  // the failure path is cleaned up elsewhere.
  taosArrayDestroy(pCommitter->aDelIdx);
  taosArrayDestroy(pCommitter->aDelData);
  taosArrayDestroy(pCommitter->aDelIdxN);

  return code;

_err:
  tsdbError("vgId:%d commit del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
253 254 255 256 257
/**
 * Prepare the committer for writing one data file set (pCommitter->commitFid).
 *
 * Resets nextKey and the old/new working buffers. If a file set with this fid
 * already exists in the file-system state, opens a reader on it and loads its
 * block index; the new set then reuses the old disk id, data file and sma
 * file descriptors while head and last files get this commit's commitID.
 * Without an existing set, a brand-new set is described on disk {level 0,
 * id 0}. A writer is opened on the resulting set in both cases.
 *
 * @return 0 on success, otherwise the code of the failing open/read call.
 */
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SDFileSet *pRSet = NULL;
  SDFileSet  wSet;

  // memory
  pCommitter->nextKey = TSKEY_MAX;

  // old
  taosArrayClear(pCommitter->aBlockIdx);
  tMapDataReset(&pCommitter->oBlockMap);
  tBlockDataReset(&pCommitter->oBlockData);
  pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid);
  if (pRSet) {
    code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
    if (code) goto _err;

    code = tsdbReadBlockIdx(pCommitter->pReader, pCommitter->aBlockIdx, NULL);
    if (code) goto _err;
  }

  // new
  taosArrayClear(pCommitter->aBlockIdxN);
  tMapDataReset(&pCommitter->nBlockMap);
  tBlockDataReset(&pCommitter->nBlockData);
  if (pRSet) {
    wSet = (SDFileSet){.diskId = pRSet->diskId,
                       .fid = pCommitter->commitFid,
                       .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
                       .fData = pRSet->fData,
                       .fLast = {.commitID = pCommitter->commitID, .size = 0},
                       .fSma = pRSet->fSma};
  } else {
    STfs   *pTfs = pTsdb->pVnode->pTfs;
    SDiskID did = {.level = 0, .id = 0};

    // TODO: alloc a new disk
    // tfsAllocDisk(pTfs, 0, &did);

    // create the directory
    // NOTE(review): the result of tfsMkdirRecurAt() is not checked; a failure
    // here would only surface when the writer is opened below.
    tfsMkdirRecurAt(pTfs, pTsdb->path, did);

    wSet = (SDFileSet){.diskId = did,
                       .fid = pCommitter->commitFid,
                       .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
                       .fData = {.commitID = pCommitter->commitID, .size = 0},
                       .fLast = {.commitID = pCommitter->commitID, .size = 0},
                       .fSma = {.commitID = pCommitter->commitID, .size = 0}};
  }
  code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, &wSet);
  if (code) goto _err;

_exit:
  return code;

_err:
  tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
/**
 * Make pCommitter->skmTable hold the schema (suid, uid, sver).
 *
 * The cached schema is kept when it already matches: same suid and version,
 * and - for suid == 0 - also the same uid. Otherwise the cached schema is
 * destroyed and a new one is fetched from the meta layer.
 *
 * @return 0 on success, otherwise the code from metaGetTbTSchemaEx().
 */
static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  int32_t code = 0;

  if (pCommitter->skmTable.pTSchema) {
    if (pCommitter->skmTable.suid == suid) {
      if (suid == 0) {
        if (pCommitter->skmTable.uid == uid && sver == pCommitter->skmTable.pTSchema->version) goto _exit;
      } else {
        if (sver == pCommitter->skmTable.pTSchema->version) goto _exit;
      }
    }
  }

  pCommitter->skmTable.suid = suid;
  pCommitter->skmTable.uid = uid;
  tTSchemaDestroy(pCommitter->skmTable.pTSchema);
  // Drop the stale pointer right away: if the fetch below fails, the cache
  // must not keep referring to the schema that was just destroyed.
  pCommitter->skmTable.pTSchema = NULL;
  code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmTable.pTSchema);
  if (code) goto _exit;

_exit:
  return code;
}

/**
 * Make pCommitter->skmRow hold the schema (suid, uid, sver).
 *
 * The cached schema is kept when it already matches: same suid and version,
 * and - for suid == 0 - also the same uid. Otherwise the cached schema is
 * destroyed and a new one is fetched from the meta layer.
 *
 * @return 0 on success, otherwise the code from metaGetTbTSchemaEx().
 */
static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  int32_t code = 0;

  if (pCommitter->skmRow.pTSchema) {
    if (pCommitter->skmRow.suid == suid) {
      if (suid == 0) {
        if (pCommitter->skmRow.uid == uid && sver == pCommitter->skmRow.pTSchema->version) goto _exit;
      } else {
        if (sver == pCommitter->skmRow.pTSchema->version) goto _exit;
      }
    }
  }

  pCommitter->skmRow.suid = suid;
  pCommitter->skmRow.uid = uid;
  tTSchemaDestroy(pCommitter->skmRow.pTSchema);
  // Drop the stale pointer right away: if the fetch below fails, the cache
  // must not keep referring to the schema that was just destroyed.
  pCommitter->skmRow.pTSchema = NULL;
  code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema);
  if (code) {
    goto _exit;
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
362 363 364 365
/**
 * Write one block of row data through the committer's writer and register
 * the resulting SBlock in nBlockMap.
 *
 * For a block that has no sub-block yet, the `last` flag is decided here:
 * 1 when the caller allows it (!toDataOnly) and the block holds fewer than
 * minRow rows, 0 otherwise. Blocks that already have sub-blocks keep their
 * flag.
 *
 * @param toDataOnly non-zero to forbid flagging the block as `last`.
 * @return 0 on success, otherwise the code of the failing write/put call.
 */
static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockData, SBlock *pBlock, SBlockIdx *pBlockIdx,
                                   int8_t toDataOnly) {
  int32_t code = 0;

  if (pBlock->nSubBlock == 0) {
    if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) {
      pBlock->last = 1;
    } else {
      pBlock->last = 0;
    }
  }

  code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
  if (code) return code;

  code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
  return code;
}

H
Hongze Cheng 已提交
386 387
/**
 * Merge in-memory rows of one table with the rows of an existing on-disk
 * block and write the result as new blocks.
 *
 * The on-disk block pBlockMerge is read into oBlockData. Rows are then taken
 * alternately from the memtable iterator (only rows whose key is below toKey)
 * and from the loaded block, always the smaller key first, and appended to
 * nBlockData. Whenever nBlockData reaches 4/5 of maxRow, or both inputs are
 * exhausted with rows pending, it is written via tsdbCommitBlockData().
 * Two rows with equal keys are not expected (ASSERT).
 *
 * @param pIter       memtable iterator; must currently yield a row below toKey.
 * @param pBlockMerge on-disk block to merge with.
 * @param toKey       exclusive upper bound for memtable rows.
 * @param toDataOnly  forwarded to tsdbCommitBlockData().
 * @return 0 on success, otherwise the code of the first failing call.
 */
static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlockMerge, TSDBKEY toKey,
                                  int8_t toDataOnly) {
  int32_t     code = 0;
  SBlockIdx  *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
  SBlockData *pBlockDataMerge = &pCommitter->oBlockData;
  SBlockData *pBlockData = &pCommitter->nBlockData;
  SBlock      block;
  SBlock     *pBlock = &block;
  TSDBROW    *pRow1;  // current memtable row, NULL once exhausted or past toKey
  TSDBROW     row2;
  TSDBROW    *pRow2 = &row2;  // current on-disk row, NULL once exhausted

  // read SBlockData
  code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL);
  if (code) goto _err;

  code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
  if (code) goto _err;

  // loop to merge
  pRow1 = tsdbTbDataIterGet(pIter);
  *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, 0);
  ASSERT(pRow1 && tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0);
  ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0);
  code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
  if (code) goto _err;

  tBlockReset(pBlock);
  tBlockDataClearData(pBlockData);
  while (true) {
    if (pRow1 == NULL && pRow2 == NULL) {
      if (pBlockData->nRow == 0) {
        break;
      } else {
        goto _write_block;
      }
    }

    if (pRow1 && pRow2) {
      int32_t c = tsdbRowCmprFn(pRow1, pRow2);
      if (c < 0) {
        goto _append_mem_row;
      } else if (c > 0) {
        goto _append_block_row;
      } else {
        ASSERT(0);
      }
    } else if (pRow1) {
      goto _append_mem_row;
    } else {
      goto _append_block_row;
    }

  _append_mem_row:
    code = tBlockDataAppendRow(pBlockData, pRow1, pCommitter->skmRow.pTSchema);
    if (code) goto _err;

    tsdbTbDataIterNext(pIter);
    pRow1 = tsdbTbDataIterGet(pIter);
    if (pRow1) {
      // Compare through a named key rather than &TSDBROW_KEY(...): the same
      // one-liner is recorded as crashing on CI in tsdbCommitTableMemData().
      TSDBKEY tmpKey = TSDBROW_KEY(pRow1);
      if (tsdbKeyCmprFn(&tmpKey, &toKey) < 0) {
        code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
        if (code) goto _err;
      } else {
        pRow1 = NULL;
      }
    }

    if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
      goto _write_block;
    } else {
      continue;
    }

  _append_block_row:
    code = tBlockDataAppendRow(pBlockData, pRow2, NULL);
    if (code) goto _err;

    if (pRow2->iRow + 1 < pBlockDataMerge->nRow) {
      *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1);
    } else {
      pRow2 = NULL;
    }

    if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
      goto _write_block;
    } else {
      continue;
    }

  _write_block:
    code = tsdbCommitBlockData(pCommitter, pBlockData, pBlock, pBlockIdx, toDataOnly);
    if (code) goto _err;

    tBlockReset(pBlock);
    tBlockDataClearData(pBlockData);
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb merge block and mem failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
491
/**
 * Write in-memory rows of one table, up to (but excluding) toKey, as new
 * blocks without merging any on-disk data.
 *
 * Rows are appended to nBlockData one by one; the block is written via
 * tsdbCommitBlockData() whenever it reaches 4/5 of maxRow, and once more at
 * the end if rows are pending.
 *
 * @param pIter      memtable iterator; must currently yield a row below toKey.
 * @param toKey      exclusive upper bound for the rows consumed.
 * @param toDataOnly forwarded to tsdbCommitBlockData().
 * @return 0 on success, otherwise the code of the first failing call.
 */
static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) {
  int32_t     code = 0;
  TSDBROW    *pRow;
  SBlock      block;
  SBlock     *pBlock = &block;
  SBlockData *pBlockData = &pCommitter->nBlockData;
  int64_t     suid = pIter->pTbData->suid;
  int64_t     uid = pIter->pTbData->uid;

  code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
  if (code) goto _err;

  tBlockReset(pBlock);
  tBlockDataClearData(pBlockData);
  pRow = tsdbTbDataIterGet(pIter);
  ASSERT(pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) < 0);
  while (true) {
    if (pRow == NULL) {
      if (pBlockData->nRow > 0) {
        goto _write_block;
      } else {
        break;
      }
    }

    // update schema
    code = tsdbCommitterUpdateRowSchema(pCommitter, suid, uid, TSDBROW_SVERSION(pRow));
    if (code) goto _err;

    // append
    code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema);
    if (code) goto _err;

    tsdbTbDataIterNext(pIter);
    pRow = tsdbTbDataIterGet(pIter);
    // if (pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) pRow = NULL;
    // crash on CI, use the block following
    if (pRow) {
      TSDBKEY tmpKey = TSDBROW_KEY(pRow);
      if (tsdbKeyCmprFn(&tmpKey, &toKey) >= 0) {
        pRow = NULL;  // next row belongs to a later range; stop consuming
      }
    }

    if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block;
    continue;

  _write_block:
    code = tsdbCommitBlockData(pCommitter, pBlockData, pBlock, &(SBlockIdx){.suid = suid, .uid = uid}, toDataOnly);
    if (code) goto _err;

    tBlockReset(pBlock);
    tBlockDataClearData(pBlockData);
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb commit table mem data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
553
/**
 * Carry one existing on-disk block over to the new file set when no
 * in-memory rows touch it.
 *
 * A block flagged `last` is read back and rewritten through
 * tsdbCommitBlockData() with a fresh SBlock; any other block is registered
 * in nBlockMap as-is.
 *
 * @return 0 on success, otherwise the code of the first failing call.
 */
static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, SBlockIdx *pBlockIdx) {
  int32_t code = 0;
  SBlock  block;

  if (pBlock->last) {
    code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, NULL);
    if (code) goto _err;

    tBlockReset(&block);
    code = tsdbCommitBlockData(pCommitter, &pCommitter->oBlockData, &block, pBlockIdx, 0);
    if (code) goto _err;
  } else {
    code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb commit table disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
576 577
/**
 * Finish the commit of one table: write its accumulated block map
 * (nBlockMap) and append the resulting block index entry to aBlockIdxN.
 *
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY if the index array cannot
 *         grow, or the code from tsdbWriteBlock().
 */
static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) {
  int32_t    code = 0;
  SBlockIdx  blockIdx = {.suid = suid, .uid = uid};
  SBlockIdx *pBlockIdx = &blockIdx;

  code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx);
  if (code) goto _err;

  if (taosArrayPush(pCommitter->aBlockIdxN, pBlockIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d commit table data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612
/**
 * Count how many upcoming memtable rows fall inside the key range of pBlock.
 *
 * The count is taken on a private copy of the iterator, so the caller's
 * pIter is left exactly where it was and the counted rows can still be
 * consumed afterwards.
 *
 * @param pIter  memtable iterator positioned at the first candidate row.
 * @param pBlock on-disk block whose key range is tested via tBlockCmprFn().
 * @return number of consecutive rows, starting at the iterator position,
 *         that compare equal to the block's range.
 */
static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) {
  int32_t     nRow = 0;
  TSDBROW    *pRow;
  TSDBKEY     key;
  int32_t     c = 0;
  STbDataIter iter = *pIter;

  // presumably forces the copy to re-read its current row instead of reusing
  // a row cached by the original iterator - TODO confirm against STbDataIter
  iter.pRow = NULL;
  while (true) {
    pRow = tsdbTbDataIterGet(&iter);

    if (pRow == NULL) break;
    key = TSDBROW_KEY(pRow);

    c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock);
    if (c == 0) {
      nRow++;
      tsdbTbDataIterNext(&iter);
    } else if (c > 0) {
      break;  // row lies beyond the block
    } else {
      ASSERT(0);  // a row before the block is not expected here
    }
  }

  return nRow;
}

/**
 * Append the in-memory rows that overlap an existing on-disk block to that
 * block, instead of rewriting the block.
 *
 * Rows are taken from pIter while their key compares equal to pBlock's range
 * (tBlockCmprFn() == 0) and collected in nBlockData; the collected rows are
 * then written through tsdbCommitBlockData() using a copy of *pBlock.
 *
 * @param pIter  memtable iterator; must currently yield a row inside pBlock's
 *               key range (the row is dereferenced without a NULL check).
 * @param pBlock on-disk block the rows are attached to.
 * @return 0 on success, otherwise the code of the first failing call.
 */
static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
  int32_t     code = 0;
  SBlockData *pBlockData = &pCommitter->nBlockData;
  SBlockIdx  *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
  SBlock      block;
  TSDBROW    *pRow;

  code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
  if (code) goto _err;

  pRow = tsdbTbDataIterGet(pIter);
  code = tsdbCommitterUpdateRowSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow));
  if (code) goto _err;
  while (true) {
    // stop once the overlapping rows are exhausted
    if (pRow == NULL) break;
    code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema);
    if (code) goto _err;

    tsdbTbDataIterNext(pIter);
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow) {
      TSDBKEY key = TSDBROW_KEY(pRow);
      int32_t c = tBlockCmprFn(&(SBlock){.minKey = key, .maxKey = key}, pBlock);

      if (c == 0) {
        code =
            tsdbCommitterUpdateRowSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow));
        if (code) goto _err;
      } else if (c > 0) {
        pRow = NULL;  // next row lies beyond the block; leave it in the iterator
      } else {
        ASSERT(0);
      }
    }
  }

  block = *pBlock;
  code = tsdbCommitBlockData(pCommitter, pBlockData, &block, pBlockIdx, 0);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb merge as subblock failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
671 672
/**
 * Commit the data of one table for the current file set by merging its
 * in-memory rows (within [minKey, maxKey]) with its existing on-disk blocks.
 *
 * @param pTbData   in-memory table data, or NULL when the table only exists
 *                  on disk.
 * @param pBlockIdx block index entry of the table in the existing file set,
 *                  or NULL when the table has no on-disk blocks.
 *
 * Walks the memtable rows and the on-disk blocks in parallel:
 *  - a `last` block is merged with all remaining in-range rows;
 *  - a block entirely before the current row is carried over unchanged;
 *  - rows entirely before the current block are written as new blocks;
 *  - overlapping rows are either attached to the block as a sub-block (when
 *    the row and sub-block limits allow) or merged into rewritten blocks.
 * On the way out nextKey is lowered to the timestamp of the first row left
 * in the iterator, if any.
 *
 * @return 0 on success, otherwise the code of the first failing call.
 */
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
  int32_t      code = 0;
  STbDataIter  iter = {0};
  STbDataIter *pIter = &iter;
  TSDBROW     *pRow;
  int32_t      iBlock;
  int32_t      nBlock;
  // Initialised so no path can observe indeterminate values; they are only
  // consumed after at least one of the two inputs has supplied them.
  int64_t      suid = 0;
  int64_t      uid = 0;

  if (pTbData) {
    tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter);
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;

    suid = pTbData->suid;
    uid = pTbData->uid;
  } else {
    pIter = NULL;
    pRow = NULL;
  }

  if (pBlockIdx) {
    code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL);
    if (code) goto _err;

    nBlock = pCommitter->oBlockMap.nItem;
    ASSERT(nBlock > 0);

    suid = pBlockIdx->suid;
    uid = pBlockIdx->uid;
  } else {
    nBlock = 0;
  }

  // nothing in memory for this file set and nothing on disk
  if (pRow == NULL && nBlock == 0) goto _exit;

  // start ===========
  tMapDataReset(&pCommitter->nBlockMap);
  SBlock  block;
  SBlock *pBlock = &block;

  iBlock = 0;
  if (iBlock < nBlock) {
    tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
  } else {
    pBlock = NULL;
  }

  if (pRow) {
    code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid, pTbData->maxSkmVer);
    if (code) goto _err;
  }

  // merge ===========
  while (true) {
    if (pRow == NULL && pBlock == NULL) break;

    if (pRow && pBlock) {
      if (pBlock->last) {
        // merge every remaining in-range row into the last block
        code = tsdbMergeTableData(pCommitter, pIter, pBlock,
                                  (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
        if (code) goto _err;

        pRow = tsdbTbDataIterGet(pIter);
        if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
        iBlock++;
        if (iBlock < nBlock) {
          tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
        } else {
          pBlock = NULL;
        }

        // a last block is expected to be the final block, with no rows left
        ASSERT(pRow == NULL && pBlock == NULL);
      } else {
        int32_t c = tBlockCmprFn(&(SBlock){.maxKey = TSDBROW_KEY(pRow), .minKey = TSDBROW_KEY(pRow)}, pBlock);
        if (c > 0) {
          // only disk data
          code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
          if (code) goto _err;

          iBlock++;
          if (iBlock < nBlock) {
            tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
          } else {
            pBlock = NULL;
          }
        } else if (c < 0) {
          // only memory data
          code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1);
          if (code) goto _err;

          pRow = tsdbTbDataIterGet(pIter);
          if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
        } else {
          // merge memory and disk
          int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock);
          ASSERT(nOvlp);
          if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) {
            code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock);
            if (code) goto _err;
          } else {
            TSDBKEY toKey = {.ts = pCommitter->maxKey + 1, .version = VERSION_MIN};
            int8_t  toDataOnly = 0;

            if (iBlock < nBlock - 1) {
              // another block follows: merge only up to its first key
              toDataOnly = 1;

              SBlock nextBlock = {0};
              tBlockReset(&nextBlock);
              tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock + 1, &nextBlock, tGetBlock);
              toKey = nextBlock.minKey;
            }

            code = tsdbMergeTableData(pCommitter, pIter, pBlock, toKey, toDataOnly);
            if (code) goto _err;
          }

          pRow = tsdbTbDataIterGet(pIter);
          if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
          iBlock++;
          if (iBlock < nBlock) {
            tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
          } else {
            pBlock = NULL;
          }
        }
      }
    } else if (pBlock) {
      // no rows left: carry the remaining blocks over
      code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx);
      if (code) goto _err;

      iBlock++;
      if (iBlock < nBlock) {
        tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
      } else {
        pBlock = NULL;
      }
    } else {
      // no blocks left: write the remaining in-range rows
      code =
          tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
      if (code) goto _err;

      pRow = tsdbTbDataIterGet(pIter);
      if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
      ASSERT(pRow == NULL);
    }
  }

  // end =====================
  code = tsdbCommitTableDataEnd(pCommitter, suid, uid);
  if (code) goto _err;

_exit:
  if (pIter) {
    // remember the earliest row that still has to be committed to a later file set
    pRow = tsdbTbDataIterGet(pIter);
    if (pRow) pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
  }
  return code;

_err:
  tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856
/*
 * Finish the data file currently being committed.
 *
 * Steps, in order: write the new block index (aBlockIdxN), update the file
 * set header, upsert the writer's file set into the new FS state, close the
 * writer (second argument 1), and close the reader if one is open.
 *
 * Returns 0 on success, or the first non-zero code reported by a step; the
 * remaining steps are skipped and the failure is logged.
 */
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t rc;

  do {
    // persist the block index built for the new file
    rc = tsdbWriteBlockIdx(pCommitter->pWriter, pCommitter->aBlockIdxN, NULL);
    if (rc != 0) break;

    // refresh the file set header
    rc = tsdbUpdateDFileSetHeader(pCommitter->pWriter);
    if (rc != 0) break;

    // register the written file set in the new FS state
    rc = tsdbFSStateUpsertDFileSet(pCommitter->pTsdb->fs->nState, tsdbDataFWriterGetWSet(pCommitter->pWriter));
    if (rc != 0) break;

    // close and sync the writer
    rc = tsdbDataFWriterClose(&pCommitter->pWriter, 1);
    if (rc != 0) break;

    // a reader is only closed when one is actually open
    if (pCommitter->pReader != NULL) {
      rc = tsdbDataFReaderClose(&pCommitter->pReader);
    }
  } while (0);

  if (rc != 0) {
    tsdbError("vgId:%d commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(rc));
  }
  return rc;
}

/*
 * Commit one data file.
 *
 * After tsdbCommitFileDataStart(), the tables of the immutable memtable
 * (pTsdb->imem->aTbData) and the tables listed in pCommitter->aBlockIdx are
 * walked in parallel and handed to tsdbCommitTableData() one table at a time:
 *   - table only in memory  -> (pTbData, NULL)
 *   - table only on disk    -> (NULL, pBlockIdx)
 *   - table in both         -> (pTbData, pBlockIdx)
 * tTABLEIDCmprFn() decides which side comes first (presumably both arrays are
 * ordered by table id -- TODO confirm against the producers of the arrays).
 * Finally tsdbCommitFileDataEnd() closes the file.
 *
 * Returns 0 on success, otherwise the first non-zero code encountered. On
 * failure the reader/writer still held by the committer are closed on a
 * best-effort basis (their return codes are ignored).
 */
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  // commit file data start
  code = tsdbCommitFileDataStart(pCommitter);
  if (code) goto _err;

  // commit file data impl
  int32_t    iTbData = 0;
  int32_t    nTbData = taosArrayGetSize(pMemTable->aTbData);
  int32_t    iBlockIdx = 0;
  int32_t    nBlockIdx = taosArrayGetSize(pCommitter->aBlockIdx);
  STbData   *pTbData;
  SBlockIdx *pBlockIdx;

  ASSERT(nTbData > 0);

  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
  pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
  while (pTbData || pBlockIdx) {
    // c < 0: commit the memory table alone; c > 0: commit the disk table
    // alone; c == 0: both sides refer to the same table and are merged.
    int32_t c;
    if (pTbData && pBlockIdx) {
      c = tTABLEIDCmprFn(pTbData, pBlockIdx);
    } else {
      c = pTbData ? -1 : 1;
    }

    STbData   *pMem = (c <= 0) ? pTbData : NULL;
    SBlockIdx *pDisk = (c >= 0) ? pBlockIdx : NULL;

    code = tsdbCommitTableData(pCommitter, pMem, pDisk);
    if (code) goto _err;

    // advance whichever side(s) were consumed by this step
    if (pDisk) {
      iBlockIdx++;
      pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
    }
    if (pMem) {
      iTbData++;
      pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
    }
  }

  // commit file data end
  code = tsdbCommitFileDataEnd(pCommitter);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  // Either handle may be NULL here (the start step failed before opening it,
  // or the end step already closed it), so only close what is still open --
  // the same guard tsdbCommitFileDataEnd() applies to the reader.
  if (pCommitter->pReader) {
    tsdbDataFReaderClose(&pCommitter->pReader);
  }
  if (pCommitter->pWriter) {
    tsdbDataFWriterClose(&pCommitter->pWriter, 0);
  }
  return code;
}

H
Hongze Cheng 已提交
946 947
// ----------------------------------------------------------------------------
/*
 * Prepare a commit: zero the committer, turn the active memtable into the
 * immutable one (mem -> imem), copy the commit parameters from the tsdb/vnode
 * configuration into the committer, and begin a file-system transaction.
 *
 * Returns the result of tsdbFSBegin(); a non-zero result is logged.
 *
 * NOTE(review): the mem/imem swap is bracketed by commented-out lock()/
 * unlock() calls, i.e. it is not protected by a lock in this function.
 */
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
  memset(pCommitter, 0, sizeof(*pCommitter));
  ASSERT(pTsdb->mem && pTsdb->imem == NULL);

  // lock();
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
  // unlock();

  // identity
  pCommitter->pTsdb = pTsdb;
  pCommitter->commitID = pTsdb->pVnode->state.commitID;

  // file partitioning parameters
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;

  // block sizing and compression
  pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;

  int32_t rc = tsdbFSBegin(pTsdb->fs);
  if (rc != 0) {
    tsdbError("vgId:%d tsdb start commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(rc));
  }
  return rc;
}

H
Hongze Cheng 已提交
976 977 978
/*
 * Allocate the working state used while committing data: the old and new
 * block-index arrays and the old and new block-data buffers.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if an array cannot be
 * created, or the code returned by tBlockDataInit(). Nothing is released
 * here on failure.
 */
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
  pCommitter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->aBlockIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pCommitter->aBlockIdxN = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->aBlockIdxN == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t rc = tBlockDataInit(&pCommitter->oBlockData);
  if (rc == 0) {
    rc = tBlockDataInit(&pCommitter->nBlockData);
  }
  return rc;
}

/*
 * Release the working state held by the committer for a data commit:
 * block-index arrays, block maps, block-data buffers and the two cached
 * schemas.
 */
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
  // block index arrays (old / new)
  taosArrayDestroy(pCommitter->aBlockIdx);
  taosArrayDestroy(pCommitter->aBlockIdxN);

  // block maps (old / new)
  tMapDataClear(&pCommitter->oBlockMap);
  tMapDataClear(&pCommitter->nBlockMap);

  // block data buffers (old / new)
  tBlockDataClear(&pCommitter->oBlockData);
  tBlockDataClear(&pCommitter->nBlockData);

  // cached schemas
  tTSchemaDestroy(pCommitter->skmTable.pTSchema);
  tTSchemaDestroy(pCommitter->skmRow.pTSchema);
}

H
Hongze Cheng 已提交
1012 1013 1014 1015
/*
 * Commit the row data of the immutable memtable.
 *
 * When the memtable has no rows, nothing is done. Otherwise the working
 * state is set up, and tsdbCommitFileData() is invoked once per file id,
 * starting from the file that contains the memtable's minimum key and
 * continuing while pCommitter->nextKey stays below TSKEY_MAX (nextKey is not
 * modified in this function; it is presumably advanced by the per-file
 * commit -- TODO confirm). The working state is released on both the success
 * and the failure path.
 *
 * Returns 0 on success, otherwise the first non-zero code encountered.
 */
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
  int32_t    rc = 0;

  if (pMemTable->nRow != 0) {
    // start ====================
    rc = tsdbCommitDataStart(pCommitter);

    // impl ====================
    if (rc == 0) {
      pCommitter->nextKey = pMemTable->minKey;
      while (rc == 0 && pCommitter->nextKey < TSKEY_MAX) {
        pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
        tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                        &pCommitter->maxKey);
        rc = tsdbCommitFileData(pCommitter);
      }
    }

    // end ====================
    tsdbCommitDataEnd(pCommitter);

    if (rc != 0) {
      tsdbError("vgId:%d commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(rc));
      return rc;
    }
  }

  tsdbDebug("vgId:%d commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow);
  return rc;
}
H
Hongze Cheng 已提交
1046

H
Hongze Cheng 已提交
1047 1048 1049 1050
/*
 * Commit the delete records of the immutable memtable.
 *
 * When the memtable holds no deletes, nothing is done. Otherwise, after
 * tsdbCommitDelStart(), the memtable tables (aTbData) and the existing delete
 * index entries (pCommitter->aDelIdx) are walked in parallel and passed to
 * tsdbCommitTableDel() one table at a time:
 *   - table only in memory -> (pTbData, NULL)
 *   - table only on disk   -> (NULL, pDelIdx)
 *   - table in both        -> (pTbData, pDelIdx)
 * tTABLEIDCmprFn() decides which side comes first. tsdbCommitDelEnd()
 * finishes the commit.
 *
 * Returns 0 on success, otherwise the first non-zero code encountered.
 */
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (pMemTable->nDel == 0) goto _exit;

  // start
  code = tsdbCommitDelStart(pCommitter);
  if (code) goto _err;

  // impl
  int32_t  iDelIdx = 0;
  int32_t  nDelIdx = taosArrayGetSize(pCommitter->aDelIdx);
  int32_t  iTbData = 0;
  int32_t  nTbData = taosArrayGetSize(pMemTable->aTbData);
  STbData *pTbData;
  SDelIdx *pDelIdx;

  ASSERT(nTbData > 0);

  pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
  pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
  while (pTbData != NULL || pDelIdx != NULL) {
    // order < 0: memory side alone; order > 0: disk side alone;
    // order == 0: both sides refer to the same table.
    int32_t order;
    if (pTbData != NULL && pDelIdx != NULL) {
      order = tTABLEIDCmprFn(pTbData, pDelIdx);
    } else {
      order = (pTbData != NULL) ? -1 : 1;
    }

    STbData *pMem = (order <= 0) ? pTbData : NULL;
    SDelIdx *pDisk = (order >= 0) ? pDelIdx : NULL;

    code = tsdbCommitTableDel(pCommitter, pMem, pDisk);
    if (code) goto _err;

    // step past whichever side(s) were just committed
    if (pMem != NULL) {
      iTbData++;
      pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
    }
    if (pDisk != NULL) {
      iDelIdx++;
      pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    }
  }

  // end
  code = tsdbCommitDelEnd(pCommitter);
  if (code) goto _err;

_exit:
  tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
  return code;

_err:
  tsdbError("vgId:%d commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

/*
 * Commit cache data. Not implemented yet: the committer is not touched and
 * 0 is always returned.
 */
static int32_t tsdbCommitCache(SCommitter *pCommitter) {
  // TODO
  (void)pCommitter;
  return 0;
}
H
Hongze Cheng 已提交
1140 1141

/*
 * Finish a commit.
 *
 * eno == 0 commits the file-system transaction (tsdbFSCommit); any other
 * value rolls it back (tsdbFSRollback). The immutable memtable is destroyed
 * and pTsdb->imem is cleared in every case, including when the file-system
 * step fails.
 *
 * Returns the code of the file-system step. A non-zero code is logged as an
 * error instead of being reported as a normal end of commit.
 */
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (eno == 0) {
    code = tsdbFSCommit(pTsdb->fs);
  } else {
    code = tsdbFSRollback(pTsdb->fs);
  }

  // released regardless of the outcome above
  tsdbMemTableDestroy(pMemTable);
  pTsdb->imem = NULL;

  if (code) goto _err;

  tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}