tsdbCommit.c 41.5 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17 18 19 20 21
// Schema cache entry: the (suid, uid) pair a schema was last requested for,
// together with the schema pointer obtained for it.
typedef struct {
  int64_t   suid;      // super-table uid used for the lookup
  int64_t   uid;       // table uid used for the lookup
  STSchema *pTSchema;  // schema filled in by metaGetTbTSchemaEx(); released with tTSchemaDestroy()
} SSkmInfo;
H
Hongze Cheng 已提交
22

H
Hongze Cheng 已提交
23 24
// Kind of row source an SDataIter walks: the in-memory table data array
// (MEMORY_DATA_ITER) or the blocks of one sst file (LAST_DATA_ITER).
typedef enum { MEMORY_DATA_ITER = 0, LAST_DATA_ITER } EDataIterT;

H
Hongze Cheng 已提交
25 26 27
typedef struct {
  SRBTreeNode n;
  SRowInfo    r;
H
Hongze Cheng 已提交
28
  EDataIterT  type;
H
Hongze Cheng 已提交
29 30 31 32 33 34
  union {
    struct {
      int32_t     iTbDataP;
      STbDataIter iter;
    };  // memory data iter
    struct {
H
Hongze Cheng 已提交
35 36 37
      int32_t    iSst;
      SArray    *aSstBlk;
      int32_t    iSstBlk;
H
Hongze Cheng 已提交
38 39
      SBlockData bData;
      int32_t    iRow;
H
Hongze Cheng 已提交
40
    };  // sst file data iter
H
Hongze Cheng 已提交
41 42 43
  };
} SDataIter;

H
Hongze Cheng 已提交
44
typedef struct {
H
Hongze Cheng 已提交
45
  STsdb *pTsdb;
H
Hongze Cheng 已提交
46
  /* commit data */
H
Hongze Cheng 已提交
47
  int64_t commitID;
H
Hongze Cheng 已提交
48 49
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
50 51
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
52
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
53
  int8_t  maxLast;
H
Hongze Cheng 已提交
54 55
  SArray *aTbDataP;  // memory
  STsdbFS fs;        // disk
H
Hongze Cheng 已提交
56
  // --------------
H
Hongze Cheng 已提交
57
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
58 59 60
  int32_t commitFid;
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
61
  // commit file data
H
Hongze Cheng 已提交
62 63
  struct {
    SDataFReader *pReader;
H
Hongze Cheng 已提交
64 65 66
    SArray       *aBlockIdx;  // SArray<SBlockIdx>
    int32_t       iBlockIdx;
    SBlockIdx    *pBlockIdx;
H
Hongze Cheng 已提交
67
    SMapData      mBlock;  // SMapData<SDataBlk>
H
Hongze Cheng 已提交
68
    SBlockData    bData;
H
Hongze Cheng 已提交
69
  } dReader;
H
Hongze Cheng 已提交
70 71 72
  struct {
    SDataIter *pIter;
    SRBTree    rbt;
H
Hongze Cheng 已提交
73
    SDataIter  dataIter;
H
Hongze Cheng 已提交
74
    SDataIter  aDataIter[TSDB_MAX_SST_FILE];
H
Hongze Cheng 已提交
75
    int8_t     toLastOnly;
H
Hongze Cheng 已提交
76
  };
H
Hongze Cheng 已提交
77 78 79
  struct {
    SDataFWriter *pWriter;
    SArray       *aBlockIdx;  // SArray<SBlockIdx>
H
Hongze Cheng 已提交
80
    SArray       *aSstBlk;    // SArray<SSstBlk>
H
Hongze Cheng 已提交
81
    SMapData      mBlock;     // SMapData<SDataBlk>
H
Hongze Cheng 已提交
82
    SBlockData    bData;
H
Hongze Cheng 已提交
83
    SBlockData    bDatal;
H
Hongze Cheng 已提交
84 85 86
  } dWriter;
  SSkmInfo skmTable;
  SSkmInfo skmRow;
H
Hongze Cheng 已提交
87
  /* commit del */
H
Hongze Cheng 已提交
88 89
  SDelFReader *pDelFReader;
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
90 91 92
  SArray      *aDelIdx;   // SArray<SDelIdx>
  SArray      *aDelIdxN;  // SArray<SDelIdx>
  SArray      *aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
93
} SCommitter;
H
refact  
Hongze Cheng 已提交
94

H
Hongze Cheng 已提交
95 96 97 98 99
// Forward declarations of file-local commit helpers that are referenced
// before their definitions.
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
Hongze Cheng 已提交
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
// Forward declaration; called by tsdbOpenCommitIter() before its definition.
static int32_t tsdbNextCommitRow(SCommitter *pCommitter);

static int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
  SRowInfo *pInfo1 = (SRowInfo *)p1;
  SRowInfo *pInfo2 = (SRowInfo *)p2;

  if (pInfo1->suid < pInfo2->suid) {
    return -1;
  } else if (pInfo1->suid > pInfo2->suid) {
    return 1;
  }

  if (pInfo1->uid < pInfo2->uid) {
    return -1;
  } else if (pInfo1->uid > pInfo2->uid) {
    return 1;
  }

  return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row);
}
H
refact  
Hongze Cheng 已提交
120

H
refact  
Hongze Cheng 已提交
121
/*
 * Create a new memtable and install it as pTsdb->mem under the write lock.
 *
 * Returns 0 on success, and also when pTsdb is NULL (nothing to do).
 * Otherwise returns the memtable-creation error, or the lock/unlock error
 * mapped through TAOS_SYSTEM_ERROR().
 */
int32_t tsdbBegin(STsdb *pTsdb) {
  int32_t    code = 0;
  SMemTable *pNewMem = NULL;

  if (pTsdb == NULL) return code;

  code = tsdbMemTableCreate(pTsdb, &pNewMem);
  if (code != 0) goto _err;

  // publish the new memtable while holding the write lock
  code = taosThreadRwlockWrlock(&pTsdb->rwLock);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  pTsdb->mem = pNewMem;

  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d, tsdb begin failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
more  
Hongze Cheng 已提交
153
/*
 * Commit the current memtable of pTsdb.
 *
 * An empty memtable (no rows and no deletes) is simply detached from pTsdb
 * and unreferenced. Otherwise the commit runs as start -> data -> del -> end;
 * the first failing stage aborts the sequence, tsdbEndCommit() is invoked
 * with the error code, and that code is returned.
 *
 * Returns 0 on success or when pTsdb is NULL.
 */
int32_t tsdbCommit(STsdb *pTsdb) {
  if (pTsdb == NULL) return 0;

  int32_t    code = 0;
  SCommitter committer;
  SMemTable *pMem = pTsdb->mem;

  // nothing to persist: drop the empty memtable and leave
  if (pMem->nRow == 0 && pMem->nDel == 0) {
    taosThreadRwlockWrlock(&pTsdb->rwLock);
    pTsdb->mem = NULL;
    taosThreadRwlockUnlock(&pTsdb->rwLock);

    tsdbUnrefMemTable(pMem);
    return code;
  }

  // run the stages in order, stopping at the first failure
  code = tsdbStartCommit(pTsdb, &committer);
  if (code == 0) code = tsdbCommitData(&committer);
  if (code == 0) code = tsdbCommitDel(&committer);
  if (code == 0) code = tsdbEndCommit(&committer, 0);
  if (code == 0) return code;

  // failure path: finish the commit with the error code and report it
  tsdbEndCommit(&committer, code);
  tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
194
/*
 * Prepare the delete-data commit: allocate the three working arrays, open a
 * reader on the existing del file (if the file system copy has one) and load
 * its index, then open a writer for a new del file tagged with commitID.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the
 * error code of the failing reader/writer call.
 */
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
  int32_t code = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;

  // working arrays
  if ((pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  if ((pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  if ((pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // existing del file, if any
  SDelFile *pOldDelFile = pCommitter->fs.pDelFile;
  if (pOldDelFile != NULL) {
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pOldDelFile, pTsdb);
    if (code != 0) goto _err;

    code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx);
    if (code != 0) goto _err;
  }

  // new del file
  SDelFile newDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0};
  code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &newDelFile, pTsdb);
  if (code != 0) goto _err;

  tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d, commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
240
/*
 * Merge the delete records of one table and write them to the new del file.
 *
 * pTbData  in-memory table data, may be NULL; only used when its pHead list
 *          is non-empty.
 * pDelIdx  index entry of the table in the existing del file, may be NULL.
 *
 * When pDelIdx is given, its on-disk records are loaded into aDelData first;
 * otherwise aDelData is cleared. The in-memory records are appended after
 * them, the result is written, and the new index entry is pushed to aDelIdxN.
 * If there is neither an in-memory record nor an index entry, nothing is
 * written.
 */
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
  int32_t   code = 0;
  tb_uid_t  suid = 0;
  tb_uid_t  uid = 0;
  SDelData *pMemDel = NULL;  // head of the in-memory delete list, NULL when empty

  if (pTbData != NULL) {
    suid = pTbData->suid;
    uid = pTbData->uid;
    pMemDel = pTbData->pHead;
  }

  if (pDelIdx != NULL) {
    suid = pDelIdx->suid;
    uid = pDelIdx->uid;

    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData);
    if (code != 0) goto _err;
  } else {
    taosArrayClear(pCommitter->aDelData);
  }

  // no source of delete records at all
  if (pMemDel == NULL && pDelIdx == NULL) return code;

  SDelIdx delIdx = {.suid = suid, .uid = uid};

  // append the in-memory records
  for (; pMemDel != NULL; pMemDel = pMemDel->pNext) {
    if (taosArrayPush(pCommitter->aDelData, pMemDel) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  // write the merged records
  code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, &delIdx);
  if (code != 0) goto _err;

  // remember the new index entry
  if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d, commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
296 297
/*
 * Finish the delete-data commit: write the new del index, update the del file
 * header, register the new del file in the committer's file system copy,
 * close the writer (second argument 1) and the reader if one was opened, then
 * free the three working arrays.
 *
 * The steps run in order and stop at the first failure; on failure the error
 * is logged and returned and the arrays are not freed here.
 */
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t code = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;

  code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN);
  if (code == 0) code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
  if (code == 0) code = tsdbFSUpsertDelFile(&pCommitter->fs, &pCommitter->pDelFWriter->fDel);
  if (code == 0) code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1);
  if (code == 0 && pCommitter->pDelFReader != NULL) {
    code = tsdbDelFReaderClose(&pCommitter->pDelFReader);
  }

  if (code != 0) {
    tsdbError("vgId:%d, commit del end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
    return code;
  }

  taosArrayDestroy(pCommitter->aDelIdx);
  taosArrayDestroy(pCommitter->aDelData);
  taosArrayDestroy(pCommitter->aDelIdxN);

  return code;
}

H
Hongze Cheng 已提交
328
/*
 * Make pCommitter->skmTable hold the schema for table (suid, uid), requested
 * from the meta store with schema version -1.
 *
 * The cached schema is reused when one is held and either
 *   - suid is non-zero and equals the cached suid (the cached uid is then
 *     updated to uid), or
 *   - suid is zero and uid equals the cached uid.
 * Otherwise the old schema is destroyed and a new one is fetched.
 *
 * The schema pointer is cleared right after it is destroyed so that a failed
 * fetch never leaves a dangling pointer behind; a later call then refetches
 * instead of reusing freed memory, and a later destroy cannot double-free.
 *
 * Returns 0 on success or the error code of metaGetTbTSchemaEx().
 */
static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid) {
  int32_t   code = 0;
  SSkmInfo *pSkm = &pCommitter->skmTable;

  // a cache entry is only usable when it actually holds a schema
  if (pSkm->pTSchema != NULL) {
    if (suid) {
      if (pSkm->suid == suid) {
        pSkm->uid = uid;
        goto _exit;
      }
    } else if (pSkm->uid == uid) {
      goto _exit;
    }
  }

  pSkm->suid = suid;
  pSkm->uid = uid;

  tTSchemaDestroy(pSkm->pTSchema);
  pSkm->pTSchema = NULL;  // never keep a pointer to the destroyed schema

  code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, -1, &pSkm->pTSchema);

_exit:
  return code;
}

/*
 * Make pCommitter->skmRow hold the schema of version sver for table
 * (suid, uid).
 *
 * The cached schema is reused when one is held, the cached suid equals suid,
 * the cached schema version equals sver and, when suid is 0, the cached uid
 * also equals uid. Otherwise the old schema is destroyed and a new one is
 * fetched.
 *
 * The schema pointer is cleared right after it is destroyed: if the fetch
 * fails, the next call sees an empty cache instead of reading the version
 * field of freed memory, and a later destroy cannot double-free.
 *
 * Returns 0 on success or the error code of metaGetTbTSchemaEx().
 */
static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  int32_t   code = 0;
  SSkmInfo *pSkm = &pCommitter->skmRow;

  if (pSkm->pTSchema != NULL && pSkm->suid == suid && sver == pSkm->pTSchema->version) {
    // with suid == 0 the uid must match as well
    if (suid != 0 || pSkm->uid == uid) goto _exit;
  }

  pSkm->suid = suid;
  pSkm->uid = uid;

  tTSchemaDestroy(pSkm->pTSchema);
  pSkm->pTSchema = NULL;  // never keep a pointer to the destroyed schema

  code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pSkm->pTSchema);

_exit:
  return code;
}

H
Hongze Cheng 已提交
375 376 377 378 379 380 381 382 383 384
/*
 * Advance the reader to the next SBlockIdx entry of the existing file set.
 *
 * Requires a current entry (asserted). If another entry exists it becomes the
 * current one and its block map is loaded into dReader.mBlock; otherwise
 * dReader.pBlockIdx is set to NULL.
 *
 * Returns 0 on success or the error code of tsdbReadBlock().
 */
static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) {
  ASSERT(pCommitter->dReader.pBlockIdx);

  pCommitter->dReader.iBlockIdx++;

  // ran past the last entry
  if (!(pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx))) {
    pCommitter->dReader.pBlockIdx = NULL;
    return 0;
  }

  pCommitter->dReader.pBlockIdx =
      (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);

  int32_t code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
  if (code != 0) return code;

  ASSERT(pCommitter->dReader.mBlock.nItem > 0);
  return code;
}

H
Hongze Cheng 已提交
397 398 399 400
/*
 * Set up the merge iterators for the current file set and position on the
 * first row to commit.
 *
 * Memory: scans aTbDataP for the first table that has a row at or after
 * minKey and not beyond maxKey. Tables whose first such row lies beyond
 * maxKey only lower nextKey. The memory iterator is then put into the tree
 * (a matching table is asserted to exist).
 *
 * Disk: when a reader is open and the set already holds at least maxLast sst
 * files, one iterator per non-empty sst file is opened on its first block and
 * put into the tree. Otherwise toLastOnly is set to 1 if any sst file has
 * size > offset.
 *
 * Finally tsdbNextCommitRow() is called. Returns 0 on success or the first
 * error code encountered.
 */
static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
  int32_t code = 0;

  pCommitter->pIter = NULL;
  tRBTreeCreate(&pCommitter->rbt, tRowInfoCmprFn);

  // ---- memory ----
  TSDBKEY    fromKey = {.ts = pCommitter->minKey, .version = VERSION_MIN};
  SDataIter *pMemIter = &pCommitter->dataIter;

  pMemIter->type = MEMORY_DATA_ITER;
  for (pMemIter->iTbDataP = 0; pMemIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP); pMemIter->iTbDataP++) {
    STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pMemIter->iTbDataP);

    tsdbTbDataIterOpen(pTbData, &fromKey, 0, &pMemIter->iter);
    TSDBROW *pRow = tsdbTbDataIterGet(&pMemIter->iter);

    // table has nothing at or after minKey
    if (pRow == NULL) continue;

    // first row belongs to a later file set: only remember its timestamp
    if (TSDBROW_TS(pRow) > pCommitter->maxKey) {
      pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
      continue;
    }

    pMemIter->r.suid = pTbData->suid;
    pMemIter->r.uid = pTbData->uid;
    pMemIter->r.row = *pRow;
    break;
  }
  ASSERT(pMemIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP));
  tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pMemIter);

  // ---- disk ----
  pCommitter->toLastOnly = 0;
  SDataFReader *pReader = pCommitter->dReader.pReader;
  if (pReader != NULL) {
    if (pReader->pSet->nSstF >= pCommitter->maxLast) {
      // merge every non-empty sst file
      int8_t iIter = 0;
      for (int32_t iSst = 0; iSst < pReader->pSet->nSstF; iSst++) {
        SDataIter *pSstIter = &pCommitter->aDataIter[iIter];

        pSstIter->type = LAST_DATA_ITER;
        pSstIter->iSst = iSst;

        code = tsdbReadSstBlk(pCommitter->dReader.pReader, iSst, pSstIter->aSstBlk);
        if (code != 0) return code;

        if (taosArrayGetSize(pSstIter->aSstBlk) == 0) continue;

        pSstIter->iSstBlk = 0;
        SSstBlk *pFirstBlk = (SSstBlk *)taosArrayGet(pSstIter->aSstBlk, 0);
        code = tsdbReadSstBlock(pCommitter->dReader.pReader, iSst, pFirstBlk, &pSstIter->bData);
        if (code != 0) return code;

        pSstIter->iRow = 0;
        pSstIter->r.suid = pSstIter->bData.suid;
        pSstIter->r.uid = pSstIter->bData.uid ? pSstIter->bData.uid : pSstIter->bData.aUid[0];
        pSstIter->r.row = tsdbRowFromBlockData(&pSstIter->bData, 0);

        tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pSstIter);
        iIter++;
      }
    } else {
      // flag whether any sst file has size > offset
      for (int32_t iSst = 0; iSst < pReader->pSet->nSstF; iSst++) {
        SSstFile *pSstFile = pReader->pSet->aSstF[iSst];
        if (pSstFile->size > pSstFile->offset) {
          pCommitter->toLastOnly = 1;
          break;
        }
      }
    }
  }

  code = tsdbNextCommitRow(pCommitter);
  return code;
}

H
Hongze Cheng 已提交
476 477 478 479
/*
 * Begin committing one file set.
 *
 * 1. Derives commitFid and its [minKey, maxKey] range from nextKey, then
 *    resets nextKey to TSKEY_MAX.
 * 2. If a file set with that fid exists in the committer's file system copy,
 *    opens a reader on it, loads its block index and positions on the first
 *    entry (if any).
 * 3. Builds the descriptor of the file set to write. With an existing set it
 *    reuses its data/sma file descriptors and disk id, and either appends one
 *    sst file (when fewer than maxLast exist) or starts over with a single
 *    sst file. Without one, a disk is allocated and a single sst file is used.
 * 4. Opens the writer, resets the writer-side buffers and opens the merge
 *    iterators.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SDFileSet *pOldSet = NULL;

  // key range of the file set being committed
  pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
  tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                  &pCommitter->maxKey);
  pCommitter->nextKey = TSKEY_MAX;

  // ---- reader ----
  SDFileSet searchKey = {.fid = pCommitter->commitFid};
  pOldSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &searchKey, tDFileSetCmprFn, TD_EQ);
  if (pOldSet == NULL) {
    pCommitter->dReader.pBlockIdx = NULL;
  } else {
    code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pOldSet);
    if (code != 0) goto _err;

    code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx);
    if (code != 0) goto _err;

    pCommitter->dReader.iBlockIdx = 0;
    if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) {
      pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0);
      code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
      if (code != 0) goto _err;
    } else {
      pCommitter->dReader.pBlockIdx = NULL;
    }
    tBlockDataReset(&pCommitter->dReader.bData);
  }

  // ---- writer ----
  SHeadFile fHead = {.commitID = pCommitter->commitID};
  SDataFile fData = {.commitID = pCommitter->commitID};
  SSmaFile  fSma = {.commitID = pCommitter->commitID};
  SSstFile  fSst = {.commitID = pCommitter->commitID};
  SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};

  if (pOldSet != NULL) {
    ASSERT(pOldSet->nSstF <= pCommitter->maxLast);

    // keep writing to the existing data/sma files on the same disk
    fData = *pOldSet->pDataF;
    fSma = *pOldSet->pSmaF;
    wSet.diskId = pOldSet->diskId;

    if (pOldSet->nSstF < pCommitter->maxLast) {
      // room for one more sst file: carry the old ones over
      for (int32_t iSst = 0; iSst < pOldSet->nSstF; iSst++) {
        wSet.aSstF[iSst] = pOldSet->aSstF[iSst];
      }
      wSet.nSstF = pOldSet->nSstF + 1;
    } else {
      wSet.nSstF = 1;
    }
  } else {
    SDiskID did = {0};

    tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
    tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);

    wSet.diskId = did;
    wSet.nSstF = 1;
  }
  // the new sst file always takes the last slot
  wSet.aSstF[wSet.nSstF - 1] = &fSst;

  code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
  if (code != 0) goto _err;

  taosArrayClear(pCommitter->dWriter.aBlockIdx);
  taosArrayClear(pCommitter->dWriter.aSstBlk);
  tMapDataReset(&pCommitter->dWriter.mBlock);
  tBlockDataReset(&pCommitter->dWriter.bData);
  tBlockDataReset(&pCommitter->dWriter.bDatal);

  // ---- merge iterators ----
  code = tsdbOpenCommitIter(pCommitter);
  if (code != 0) goto _err;

  return code;

_err:
  tsdbError("vgId:%d, commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
559
/*
 * Flush the writer's data buffer (dWriter.bData) as one data block.
 *
 * Builds an SDataBlk describing the buffered rows (row count, min/max key,
 * min/max version, duplicate-timestamp flag), writes the rows as its first
 * sub-block, appends the SDataBlk to dWriter.mBlock and clears the buffer.
 * The sma info pointer is passed to the writer only when the block has a
 * single sub-block and no duplicate timestamps.
 *
 * The buffer must not be empty (asserted). Returns 0 or the error code of the
 * failing write/put call.
 */
static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) {
  int32_t     code = 0;
  SBlockData *pBuf = &pCommitter->dWriter.bData;
  SDataBlk    dataBlk;

  ASSERT(pBuf->nRow > 0);

  tDataBlkReset(&dataBlk);

  // ---- block info ----
  dataBlk.nRow += pBuf->nRow;
  for (int32_t iRow = 0; iRow < pBuf->nRow; iRow++) {
    TSDBKEY rowKey = {.ts = pBuf->aTSKEY[iRow], .version = pBuf->aVersion[iRow]};

    if (iRow > 0) {
      // equal neighbouring timestamps mark the block as having duplicates
      if (pBuf->aTSKEY[iRow] == pBuf->aTSKEY[iRow - 1]) {
        dataBlk.hasDup = 1;
      }
    } else if (tsdbKeyCmprFn(&dataBlk.minKey, &rowKey) > 0) {
      // first row supplies the minimum key
      dataBlk.minKey = rowKey;
    }

    // last row supplies the maximum key
    if (iRow == pBuf->nRow - 1 && tsdbKeyCmprFn(&dataBlk.maxKey, &rowKey) < 0) {
      dataBlk.maxKey = rowKey;
    }

    dataBlk.minVer = TMIN(dataBlk.minVer, rowKey.version);
    dataBlk.maxVer = TMAX(dataBlk.maxVer, rowKey.version);
  }

  // ---- write the rows as a new sub-block ----
  dataBlk.nSubBlock++;
  code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBuf, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1],
                            ((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL,
                            pCommitter->cmprAlg, 0);
  if (code != 0) goto _err;

  // ---- record the block ----
  code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &dataBlk, tPutDataBlk);
  if (code != 0) goto _err;

  tBlockDataClear(pBuf);
  return code;

_err:
  tsdbError("vgId:%d tsdb commit data block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

/*
 * Flush the writer's sst buffer (dWriter.bDatal) as one sst block.
 *
 * Builds an SSstBlk describing the buffered rows (suid, row count, min/max
 * timestamp, min/max version, min/max uid), writes the rows, appends the
 * SSstBlk to dWriter.aSstBlk and clears the buffer.
 *
 * The buffer must not be empty (asserted). Returns 0, TSDB_CODE_OUT_OF_MEMORY
 * when the push fails, or the error code of tsdbWriteBlockData().
 */
static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
  int32_t     code = 0;
  SSstBlk     sstBlk;
  SBlockData *pBuf = &pCommitter->dWriter.bDatal;

  ASSERT(pBuf->nRow > 0);

  // ---- block info ----
  sstBlk.suid = pBuf->suid;
  sstBlk.nRow = pBuf->nRow;

  sstBlk.minKey = TSKEY_MAX;
  sstBlk.maxKey = TSKEY_MIN;
  sstBlk.minVer = VERSION_MAX;
  sstBlk.maxVer = VERSION_MIN;
  for (int32_t iRow = 0; iRow < pBuf->nRow; iRow++) {
    sstBlk.minKey = TMIN(sstBlk.minKey, pBuf->aTSKEY[iRow]);
    sstBlk.maxKey = TMAX(sstBlk.maxKey, pBuf->aTSKEY[iRow]);
    sstBlk.minVer = TMIN(sstBlk.minVer, pBuf->aVersion[iRow]);
    sstBlk.maxVer = TMAX(sstBlk.maxVer, pBuf->aVersion[iRow]);
  }

  // a non-zero block uid covers every row; otherwise use the per-row uids
  if (pBuf->uid) {
    sstBlk.minUid = pBuf->uid;
    sstBlk.maxUid = pBuf->uid;
  } else {
    sstBlk.minUid = pBuf->aUid[0];
    sstBlk.maxUid = pBuf->aUid[pBuf->nRow - 1];
  }

  // ---- write the rows ----
  code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBuf, &sstBlk.bInfo, NULL, pCommitter->cmprAlg, 1);
  if (code != 0) goto _err;

  // ---- record the block ----
  if (taosArrayPush(pCommitter->dWriter.aSstBlk, &sstBlk) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  tBlockDataClear(pBuf);
  return code;

_err:
  tsdbError("vgId:%d tsdb commit last block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
654 655 656
/*
 * Finish committing one file set: write the block index and the sst block
 * index, update the file set header, register the written set in the
 * committer's file system copy, close the writer (second argument 1) and
 * close the reader if one is open.
 *
 * Steps run in order and stop at the first failure, which is logged and
 * returned. Returns 0 on success.
 */
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code = 0;

  // indexes
  code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx);
  if (code == 0) code = tsdbWriteSstBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSstBlk);

  // header, then register the set
  if (code == 0) code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter);
  if (code == 0) code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->dWriter.pWriter->wSet);

  // close writer, then reader
  if (code == 0) code = tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 1);
  if (code == 0 && pCommitter->dReader.pReader != NULL) {
    code = tsdbDataFReaderClose(&pCommitter->dReader.pReader);
  }

  if (code != 0) {
    tsdbError("vgId:%d, commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
690 691 692
/*
 * Copy block maps of existing tables that sort before toTable to the writer.
 *
 * While the reader has a current SBlockIdx that compares less than toTable
 * (tTABLEIDCmprFn), the loaded block map is written through the writer, the
 * resulting index entry is appended to dWriter.aBlockIdx, and the reader
 * advances to its next table.
 *
 * Returns 0, TSDB_CODE_OUT_OF_MEMORY when the push fails, or the error code
 * of the failing write/advance call.
 */
static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
  int32_t code = 0;

  for (;;) {
    SBlockIdx *pCurrent = pCommitter->dReader.pBlockIdx;
    if (pCurrent == NULL || tTABLEIDCmprFn(pCurrent, &toTable) >= 0) break;

    // work on a copy of the index entry
    SBlockIdx movedIdx = *pCurrent;
    code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &movedIdx);
    if (code != 0) goto _err;

    if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &movedIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbCommitterNextTableData(pCommitter);
    if (code != 0) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb move commit data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
714
// Forward declaration; called by tsdbCommitFileData() before its definition.
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter);
H
Hongze Cheng 已提交
715
/*
 * Commit one file set: start -> impl -> end.
 *
 * The stages run in order and stop at the first failure. On failure the error
 * is logged, the data file reader is closed and the writer is closed with its
 * second argument set to 0, and the error code is returned.
 */
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
  int32_t code = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;

  code = tsdbCommitFileDataStart(pCommitter);
  if (code == 0) code = tsdbCommitFileDataImpl(pCommitter);
  if (code == 0) code = tsdbCommitFileDataEnd(pCommitter);
  if (code == 0) return code;

  // failure: report, then release the reader and the writer
  tsdbError("vgId:%d, commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  tsdbDataFReaderClose(&pCommitter->dReader.pReader);
  tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0);
  return code;
}

H
Hongze Cheng 已提交
741 742
// ----------------------------------------------------------------------------
/*
 * Initialise a committer for pTsdb.
 *
 * Zeroes *pCommitter, moves pTsdb->mem to pTsdb->imem under the write lock
 * (mem must be set and imem must be NULL, asserted), copies the commit
 * parameters from the tsdb/vnode configuration, obtains the table data array
 * of the immutable memtable and copies the file system state into the
 * committer.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the table data array
 * cannot be obtained, or the error code of tsdbFSCopy().
 */
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
  int32_t code = 0;

  memset(pCommitter, 0, sizeof(*pCommitter));
  ASSERT(pTsdb->mem && pTsdb->imem == NULL);

  // freeze the active memtable
  taosThreadRwlockWrlock(&pTsdb->rwLock);
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
  taosThreadRwlockUnlock(&pTsdb->rwLock);

  // commit parameters
  pCommitter->pTsdb = pTsdb;
  pCommitter->commitID = pTsdb->pVnode->state.commitID;
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;
  pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
  pCommitter->maxLast = TSDB_DEFAULT_SST_FILE;  // TODO: make it as a config

  // memory side
  pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
  if (pCommitter->aTbDataP == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  // disk side
  code = tsdbFSCopy(pTsdb, &pCommitter->fs);
  if (code != 0) goto _err;

  return code;

_err:
  tsdbError("vgId:%d, tsdb start commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
776 777 778
/*
 * Allocate the working buffers used while committing row data: the reader's
 * block-index array and block data, one SSstBlk array plus block data for each
 * of the TSDB_MAX_SST_FILE merge iterators, and the writer's block-index
 * array, SSstBlk array and two block data buffers (bData and bDatal).
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if an array allocation fails,
 * or the error code of tBlockDataCreate(). Nothing is released here on
 * failure; tsdbCommitData() calls tsdbCommitDataEnd() on its error path.
 */
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
  int32_t code = 0;

  // reader
  pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->dReader.aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  code = tBlockDataCreate(&pCommitter->dReader.bData);
  if (code) goto _exit;

  // merger
  for (int32_t iSst = 0; iSst < TSDB_MAX_SST_FILE; iSst++) {
    SDataIter *pIter = &pCommitter->aDataIter[iSst];
    pIter->aSstBlk = taosArrayInit(0, sizeof(SSstBlk));
    if (pIter->aSstBlk == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }

    code = tBlockDataCreate(&pIter->bData);
    if (code) goto _exit;
  }

  // writer
  pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->dWriter.aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  pCommitter->dWriter.aSstBlk = taosArrayInit(0, sizeof(SSstBlk));
  if (pCommitter->dWriter.aSstBlk == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  code = tBlockDataCreate(&pCommitter->dWriter.bData);
  if (code) goto _exit;

  code = tBlockDataCreate(&pCommitter->dWriter.bDatal);
  if (code) goto _exit;

_exit:
  return code;
}

/*
 * Release everything tsdbCommitDataStart() allocated, plus the reader/writer
 * block maps and the two cached schemas (skmTable, skmRow).
 *
 * tsdbCommitData() also calls this after a partially failed
 * tsdbCommitDataStart(), so the destroy helpers are presumably tolerant of
 * members that were never allocated -- confirm against their implementations.
 * Pointers are not reset to NULL here.
 */
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
  // reader
  taosArrayDestroy(pCommitter->dReader.aBlockIdx);
  tMapDataClear(&pCommitter->dReader.mBlock);
  tBlockDataDestroy(&pCommitter->dReader.bData, 1);

  // merger
  for (int32_t iSst = 0; iSst < TSDB_MAX_SST_FILE; iSst++) {
    SDataIter *pIter = &pCommitter->aDataIter[iSst];
    taosArrayDestroy(pIter->aSstBlk);
    tBlockDataDestroy(&pIter->bData, 1);
  }

  // writer
  taosArrayDestroy(pCommitter->dWriter.aBlockIdx);
  taosArrayDestroy(pCommitter->dWriter.aSstBlk);
  tMapDataClear(&pCommitter->dWriter.mBlock);
  tBlockDataDestroy(&pCommitter->dWriter.bData, 1);
  tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1);
  tTSchemaDestroy(pCommitter->skmTable.pTSchema);
  tTSchemaDestroy(pCommitter->skmRow.pTSchema);
}

H
Hongze Cheng 已提交
848 849 850 851
/*
 * Commit the row data of the frozen memtable (pTsdb->imem).
 *
 * Does nothing except log when the memtable holds no rows. Otherwise the
 * working buffers are allocated, tsdbCommitFileData() is called repeatedly
 * starting from the memtable's minimum key until pCommitter->nextKey reaches
 * TSKEY_MAX, and the buffers are released again.
 *
 * Returns 0 on success or the first error code encountered; the working
 * buffers are released on both the success and the error path.
 */
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  // check
  if (pMemTable->nRow == 0) goto _exit;

  // start ====================
  code = tsdbCommitDataStart(pCommitter);
  if (code) goto _err;

  // impl ====================
  // each pass is expected to move nextKey forward; TSKEY_MAX means no rows remain
  pCommitter->nextKey = pMemTable->minKey;
  while (pCommitter->nextKey < TSKEY_MAX) {
    code = tsdbCommitFileData(pCommitter);
    if (code) goto _err;
  }

  // end ====================
  tsdbCommitDataEnd(pCommitter);

_exit:
  tsdbInfo("vgId:%d, commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow);
  return code;

_err:
  tsdbCommitDataEnd(pCommitter);
  tsdbError("vgId:%d, commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
879

H
Hongze Cheng 已提交
880 881 882 883
/*
 * Commit the delete records of the frozen memtable (pTsdb->imem).
 *
 * Does nothing except log when the memtable holds no deletes. Otherwise it
 * walks two sequences in parallel -- the in-memory per-table data
 * (pCommitter->aTbDataP) and the existing on-disk delete index
 * (pCommitter->aDelIdx) -- and, ordered by tTABLEIDCmprFn(), calls
 * tsdbCommitTableDel() with the memory entry only, the disk entry only, or
 * both when they refer to the same table.
 *
 * Returns 0 on success or the first error code encountered.
 *
 * NOTE(review): the error path does not call tsdbCommitDelEnd(); confirm that
 * whatever tsdbCommitDelStart() acquires is released elsewhere.
 */
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (pMemTable->nDel == 0) {
    goto _exit;
  }

  // start
  code = tsdbCommitDelStart(pCommitter);
  if (code) {
    goto _err;
  }

  // impl
  int32_t  iDelIdx = 0;
  int32_t  nDelIdx = taosArrayGetSize(pCommitter->aDelIdx);
  int32_t  iTbData = 0;
  int32_t  nTbData = taosArrayGetSize(pCommitter->aTbDataP);
  STbData *pTbData;
  SDelIdx *pDelIdx;

  ASSERT(nTbData > 0);

  pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
  pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
  while (true) {
    // both sequences exhausted
    if (pTbData == NULL && pDelIdx == NULL) break;

    // pick which side(s) to consume; every label below ends in `continue`
    if (pTbData && pDelIdx) {
      int32_t c = tTABLEIDCmprFn(pTbData, pDelIdx);

      if (c == 0) {
        goto _commit_mem_and_disk_del;
      } else if (c < 0) {
        goto _commit_mem_del;
      } else {
        goto _commit_disk_del;
      }
    } else if (pTbData) {
      goto _commit_mem_del;
    } else {
      goto _commit_disk_del;
    }

  _commit_mem_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, NULL);
    if (code) goto _err;

    iTbData++;
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
    continue;

  _commit_disk_del:
    code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx);
    if (code) goto _err;

    iDelIdx++;
    pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    continue;

  _commit_mem_and_disk_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx);
    if (code) goto _err;

    iTbData++;
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
    iDelIdx++;
    pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    continue;
  }

  // end
  code = tsdbCommitDelEnd(pCommitter);
  if (code) {
    goto _err;
  }

_exit:
  tsdbDebug("vgId:%d, commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
  return code;

_err:
  tsdbError("vgId:%d, commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
968
/*
 * Finish a commit: publish the new file-system state and drop the frozen
 * memtable.
 *
 * tsdbFSCommit1() runs without the lock; tsdbFSCommit2() and the reset of
 * pTsdb->imem run under the tsdb write lock. Afterwards the memtable
 * reference, the committer's file-system copy and its table data array are
 * released.
 *
 * `eno` is asserted to be 0; this function has no handling for a non-zero
 * value. Returns 0 on success or the error code of tsdbFSCommit1() /
 * tsdbFSCommit2(); on those error paths nothing is released here.
 */
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  ASSERT(eno == 0);

  code = tsdbFSCommit1(pTsdb, &pCommitter->fs);
  if (code) goto _err;

  // lock
  taosThreadRwlockWrlock(&pTsdb->rwLock);

  // commit or rollback
  code = tsdbFSCommit2(pTsdb, &pCommitter->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _err;
  }

  pTsdb->imem = NULL;

  // unlock
  taosThreadRwlockUnlock(&pTsdb->rwLock);

  // pMemTable was captured before imem was cleared
  tsdbUnrefMemTable(pMemTable);
  tsdbFSDestroy(&pCommitter->fs);
  taosArrayDestroy(pCommitter->aTbDataP);

  // if (pCommitter->toMerge) {
  //   code = tsdbMerge(pTsdb);
  //   if (code) goto _err;
  // }

  tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d, tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}
H
Hongze Cheng 已提交
1009

H
Hongze Cheng 已提交
1010
// ================================================================================
H
Hongze Cheng 已提交
1011

H
Hongze Cheng 已提交
1012 1013
/*
 * Return the row the committer is currently positioned on: the row info of
 * the current data iterator, or NULL when there is no current iterator.
 */
static FORCE_INLINE SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) {
  return (pCommitter->pIter) ? &pCommitter->pIter->r : NULL;
}

H
Hongze Cheng 已提交
1016
/*
 * Advance the committer to the next row in merged order.
 *
 * The current iterator (pCommitter->pIter) is stepped first:
 *  - memory iterator: move to the next row of the current table. A row whose
 *    timestamp is above pCommitter->maxKey ends that table for this pass and
 *    lowers pCommitter->nextKey to that timestamp. When a table is exhausted
 *    the next table in aTbDataP is opened at {minKey, VERSION_MIN}; when no
 *    tables remain the iterator is dropped.
 *  - last-file iterator: move to the next row of the loaded block; when the
 *    block is exhausted the next SSstBlk is read with tsdbReadSstBlock(); when
 *    none remain the iterator is dropped.
 *
 * The stepped iterator is then compared with the smallest iterator in the
 * red-black tree; if it now sorts after it, it is put back into the tree.
 * Finally, if there is no current iterator, the tree minimum (if any) is
 * removed from the tree and becomes current.
 *
 * Returns 0 on success or the error code of tsdbReadSstBlock().
 */
static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
  int32_t code = 0;

  if (pCommitter->pIter) {
    SDataIter *pIter = pCommitter->pIter;
    if (pCommitter->pIter->type == MEMORY_DATA_ITER) {  // memory
      tsdbTbDataIterNext(&pIter->iter);
      TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter);
      while (true) {
        // row is beyond this pass's key range: remember it as a candidate
        // start key for a later pass and treat the table as finished
        if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
          pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
          pRow = NULL;
        }

        if (pRow) {
          pIter->r.suid = pIter->iter.pTbData->suid;
          pIter->r.uid = pIter->iter.pTbData->uid;
          pIter->r.row = *pRow;
          break;
        }

        // current table has no more rows in range: try the next table
        pIter->iTbDataP++;
        if (pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP)) {
          STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP);
          TSDBKEY  keyFrom = {.ts = pCommitter->minKey, .version = VERSION_MIN};
          tsdbTbDataIterOpen(pTbData, &keyFrom, 0, &pIter->iter);
          pRow = tsdbTbDataIterGet(&pIter->iter);
          continue;
        } else {
          pCommitter->pIter = NULL;
          break;
        }
      }
    } else if (pCommitter->pIter->type == LAST_DATA_ITER) {  // last file
      pIter->iRow++;
      if (pIter->iRow < pIter->bData.nRow) {
        // a block with uid == 0 carries a per-row uid array
        pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
        pIter->r.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
      } else {
        pIter->iSstBlk++;
        if (pIter->iSstBlk < taosArrayGetSize(pIter->aSstBlk)) {
          SSstBlk *pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, pIter->iSstBlk);

          code = tsdbReadSstBlock(pCommitter->dReader.pReader, pIter->iSst, pSstBlk, &pIter->bData);
          if (code) goto _exit;

          pIter->iRow = 0;
          pIter->r.suid = pIter->bData.suid;
          pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
          pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0);
        } else {
          pCommitter->pIter = NULL;
        }
      }
    } else {
      ASSERT(0);
    }

    // compare with min in RB Tree
    pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
    if (pCommitter->pIter && pIter) {
      int32_t c = tRowInfoCmprFn(&pCommitter->pIter->r, &pIter->r);
      if (c > 0) {
        tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
        pCommitter->pIter = NULL;
      } else {
        // two iterators are never expected to sit on an equal row
        ASSERT(c);
      }
    }
  }

  if (pCommitter->pIter == NULL) {
    pCommitter->pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
    if (pCommitter->pIter) {
      tRBTreeDrop(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
    }
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
1098
/*
 * Write the commit rows of the current table that come before an existing
 * on-disk data block.
 *
 * Starting at the current commit row, rows are appended to the writer's block
 * data until the commit row belongs to another table, its key is no longer
 * below pDataBlk->minKey, or no rows remain. The buffer is flushed with
 * tsdbCommitDataBlock() whenever it reaches pCommitter->maxRow rows, and once
 * more at the end if rows are left in it.
 *
 * Precondition: a current commit row exists; it is dereferenced immediately.
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
  int32_t     code = 0;
  SBlockData *pBlockData = &pCommitter->dWriter.bData;
  SRowInfo   *pRowInfo = tsdbGetCommitRow(pCommitter);
  TABLEID     id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};

  tBlockDataClear(pBlockData);
  while (pRowInfo) {
    ASSERT(pRowInfo->row.type == 0);
    code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
    if (code) goto _err;

    code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
    if (code) goto _err;

    code = tsdbNextCommitRow(pCommitter);
    if (code) goto _err;

    // stop at a table boundary or once the row reaches the on-disk block's range
    pRowInfo = tsdbGetCommitRow(pCommitter);
    if (pRowInfo) {
      if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) {
        pRowInfo = NULL;
      } else {
        TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
        if (tsdbKeyCmprFn(&tKey, &pDataBlk->minKey) >= 0) pRowInfo = NULL;
      }
    }

    if (pBlockData->nRow >= pCommitter->maxRow) {
      code = tsdbCommitDataBlock(pCommitter);
      if (code) goto _err;
    }
  }

  if (pBlockData->nRow) {
    code = tsdbCommitDataBlock(pCommitter);
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d, tsdb commit ahead block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
1144
/*
 * Merge an existing on-disk data block with the commit rows that overlap it.
 *
 * The block is read into the reader's block data, then its rows and the
 * current table's commit rows are interleaved in tsdbRowCmprFn() order into
 * the writer's block data. Commit rows are consumed until the row belongs to
 * another table, its key exceeds pDataBlk->maxKey, or none remain; remaining
 * on-disk rows are then copied. The writer buffer is flushed with
 * tsdbCommitDataBlock() whenever it reaches pCommitter->maxRow rows, and once
 * more at the end if rows are left in it.
 *
 * Precondition: a current commit row exists; it is dereferenced immediately.
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
  int32_t     code = 0;
  SRowInfo   *pRowInfo = tsdbGetCommitRow(pCommitter);
  TABLEID     id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
  SBlockData *pBDataR = &pCommitter->dReader.bData;
  SBlockData *pBDataW = &pCommitter->dWriter.bData;

  code = tsdbReadDataBlock(pCommitter->dReader.pReader, pDataBlk, pBDataR);
  if (code) goto _err;

  tBlockDataClear(pBDataW);
  int32_t  iRow = 0;
  TSDBROW  row = tsdbRowFromBlockData(pBDataR, 0);
  // pRow always points at `row`; it is set to NULL once the block is exhausted
  TSDBROW *pRow = &row;

  while (pRow && pRowInfo) {
    int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row);
    if (c < 0) {
      // on-disk row goes first
      code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid);
      if (code) goto _err;

      iRow++;
      if (iRow < pBDataR->nRow) {
        row = tsdbRowFromBlockData(pBDataR, iRow);
      } else {
        pRow = NULL;
      }
    } else if (c > 0) {
      // commit row goes first
      ASSERT(pRowInfo->row.type == 0);
      code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
      if (code) goto _err;

      code = tBlockDataAppendRow(pBDataW, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
      if (code) goto _err;

      code = tsdbNextCommitRow(pCommitter);
      if (code) goto _err;

      pRowInfo = tsdbGetCommitRow(pCommitter);
      if (pRowInfo) {
        if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) {
          pRowInfo = NULL;
        } else {
          TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
          if (tsdbKeyCmprFn(&tKey, &pDataBlk->maxKey) > 0) pRowInfo = NULL;
        }
      }
    } else {
      // NOTE(review): equal rows are treated as impossible. Neither side is
      // advanced here, so if ASSERT compiles to a no-op this loop would not
      // terminate -- confirm ASSERT is always active or add explicit handling.
      ASSERT(0);
    }

    if (pBDataW->nRow >= pCommitter->maxRow) {
      code = tsdbCommitDataBlock(pCommitter);
      if (code) goto _err;
    }
  }

  // copy whatever is left of the on-disk block
  while (pRow) {
    code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid);
    if (code) goto _err;

    iRow++;
    if (iRow < pBDataR->nRow) {
      row = tsdbRowFromBlockData(pBDataR, iRow);
    } else {
      pRow = NULL;
    }

    if (pBDataW->nRow >= pCommitter->maxRow) {
      code = tsdbCommitDataBlock(pCommitter);
      if (code) goto _err;
    }
  }

  if (pBDataW->nRow) {
    code = tsdbCommitDataBlock(pCommitter);
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d, tsdb commit merge block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
1230 1231 1232 1233 1234 1235 1236
/*
 * Merge the commit rows of table `id` with that table's existing data blocks.
 *
 * Nothing is done unless the reader's current block index refers to exactly
 * this table. Otherwise the table's data blocks (pCommitter->dReader.mBlock)
 * are walked alongside the commit rows; each step compares the block with a
 * one-row block built from the current commit row (tDataBlkCmprFn):
 *   c < 0  the block entry is copied unchanged to the writer's block map;
 *   c > 0  commit rows are written ahead of it (tsdbCommitAheadBlock);
 *   c == 0 block and rows are merged (tsdbCommitMergeBlock).
 * Blocks left over once the commit rows run out are copied unchanged, and the
 * reader is moved on with tsdbCommitterNextTableData().
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
  int32_t    code = 0;
  SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx;

  // the reader must not be positioned on a table that sorts before `id`
  ASSERT(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0);
  if (pBlockIdx && pBlockIdx->suid == id.suid && pBlockIdx->uid == id.uid) {
    int32_t   iBlock = 0;
    SDataBlk  block;
    // pDataBlk always points at `block`; NULL marks the end of the block map
    SDataBlk *pDataBlk = &block;
    SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);

    ASSERT(pRowInfo->suid == id.suid && pRowInfo->uid == id.uid);

    tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
    while (pDataBlk && pRowInfo) {
      SDataBlk tBlock = {.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)};
      int32_t  c = tDataBlkCmprFn(pDataBlk, &tBlock);

      if (c < 0) {
        code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
        if (code) goto _err;

        iBlock++;
        if (iBlock < pCommitter->dReader.mBlock.nItem) {
          tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
        } else {
          pDataBlk = NULL;
        }
      } else if (c > 0) {
        code = tsdbCommitAheadBlock(pCommitter, pDataBlk);
        if (code) goto _err;

        pRowInfo = tsdbGetCommitRow(pCommitter);
        if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL;
      } else {
        code = tsdbCommitMergeBlock(pCommitter, pDataBlk);
        if (code) goto _err;

        iBlock++;
        if (iBlock < pCommitter->dReader.mBlock.nItem) {
          tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
        } else {
          pDataBlk = NULL;
        }
        pRowInfo = tsdbGetCommitRow(pCommitter);
        if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL;
      }
    }

    // commit rows for this table are used up: keep the remaining blocks as they are
    while (pDataBlk) {
      code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
      if (code) goto _err;

      iBlock++;
      if (iBlock < pCommitter->dReader.mBlock.nItem) {
        tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
      } else {
        pDataBlk = NULL;
      }
    }

    code = tsdbCommitterNextTableData(pCommitter);
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb merge table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
1303
/*
 * Make sure the writer's "last" block buffer (dWriter.bDatal) can accept rows
 * of table `id`.
 *
 * If the buffer is already bound (non-zero suid or uid) and either its suid
 * differs from id.suid or id.suid is 0, any buffered rows are flushed with
 * tsdbCommitLastBlock() and the buffer is reset. An unbound buffer is then
 * initialised with the cached table schema: tables with a non-zero suid share
 * a buffer keyed by suid (uid 0), others get a buffer keyed by their own uid.
 *
 * Returns 0 on success or the error code of the flush / init call.
 */
static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) {
  int32_t code = 0;

  SBlockData *pBDatal = &pCommitter->dWriter.bDatal;
  if (pBDatal->suid || pBDatal->uid) {
    if ((pBDatal->suid != id.suid) || (id.suid == 0)) {
      if (pBDatal->nRow) {
        code = tsdbCommitLastBlock(pCommitter);
        if (code) goto _exit;
      }
      tBlockDataReset(pBDatal);
    }
  }

  if (!pBDatal->suid && !pBDatal->uid) {
    // the cached table schema must already be the one for `id`
    ASSERT(pCommitter->skmTable.suid == id.suid);
    ASSERT(pCommitter->skmTable.uid == id.uid);
    code = tBlockDataInit(pBDatal, id.suid, id.suid ? 0 : id.uid, pCommitter->skmTable.pTSchema);
    if (code) goto _exit;
  }

_exit:
  return code;
}

/*
 * Move the rows buffered in the writer's data block (dWriter.bData) into the
 * "last" block buffer (dWriter.bDatal).
 *
 * The last block buffer is first prepared for the data block's table, then
 * each row is appended; the last block is flushed with tsdbCommitLastBlock()
 * whenever it reaches pCommitter->maxRow rows. The source buffer is not
 * cleared here.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
  int32_t code = 0;

  SBlockData *pBData = &pCommitter->dWriter.bData;
  SBlockData *pBDatal = &pCommitter->dWriter.bDatal;

  TABLEID id = {.suid = pBData->suid, .uid = pBData->uid};
  code = tsdbInitLastBlockIfNeed(pCommitter, id);
  if (code) goto _exit;

  for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) {
    TSDBROW row = tsdbRowFromBlockData(pBData, iRow);
    code = tBlockDataAppendRow(pBDatal, &row, NULL, pBData->uid);
    if (code) goto _exit;

    if (pBDatal->nRow >= pCommitter->maxRow) {
      code = tsdbCommitLastBlock(pCommitter);
      if (code) goto _exit;
    }
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
1355
/*
 * Write the commit rows of table `id` that remain after merging with existing
 * data blocks.
 *
 * Returns immediately when the current commit row is absent or belongs to
 * another table. Rows go to the "last" block buffer when
 * pCommitter->toLastOnly is set, otherwise to the data block buffer (which
 * must be empty on entry). The buffer in use is flushed whenever it reaches
 * pCommitter->maxRow rows. When writing data blocks, rows left at the end are
 * written as a data block if there are more than pCommitter->minRow of them,
 * and are otherwise moved into the last block.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
  int32_t code = 0;

  SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
  if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
    pRowInfo = NULL;
  }

  if (pRowInfo == NULL) goto _exit;

  SBlockData *pBData;
  if (pCommitter->toLastOnly) {
    pBData = &pCommitter->dWriter.bDatal;
    code = tsdbInitLastBlockIfNeed(pCommitter, id);
    if (code) goto _err;
  } else {
    pBData = &pCommitter->dWriter.bData;
    ASSERT(pBData->nRow == 0);
  }

  while (pRowInfo) {
    // only rows with type 0 get a row schema; others are appended with NULL
    STSchema *pTSchema = NULL;
    if (pRowInfo->row.type == 0) {
      code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
      if (code) goto _err;
      pTSchema = pCommitter->skmRow.pTSchema;
    }

    code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid);
    if (code) goto _err;

    code = tsdbNextCommitRow(pCommitter);
    if (code) goto _err;

    pRowInfo = tsdbGetCommitRow(pCommitter);
    if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
      pRowInfo = NULL;
    }

    if (pBData->nRow >= pCommitter->maxRow) {
      if (pCommitter->toLastOnly) {
        code = tsdbCommitLastBlock(pCommitter);
        if (code) goto _err;
      } else {
        code = tsdbCommitDataBlock(pCommitter);
        if (code) goto _err;
      }
    }
  }

  // leftover rows in the data block buffer: large enough -> data block,
  // otherwise fold them into the last block
  if (!pCommitter->toLastOnly && pBData->nRow) {
    if (pBData->nRow > pCommitter->minRow) {
      code = tsdbCommitDataBlock(pCommitter);
      if (code) goto _err;
    } else {
      code = tsdbAppendLastBlock(pCommitter);
      if (code) goto _err;
    }
  }

_exit:
  return code;

_err:
  tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
1423 1424 1425
/*
 * Commit every table that has rows pending for the file currently being
 * written.
 *
 * For each table, identified by the current commit row:
 *   1. tsdbMoveCommitData() is called with the table id;
 *   2. the writer's block map is reset, the table schema refreshed, and the
 *      reader/writer block data re-initialised for this table;
 *   3. commit rows are merged with the table's existing data blocks
 *      (tsdbMergeTableData) and the remainder written (tsdbCommitTableData);
 *   4. if any blocks were produced, the block map is written and its index
 *      entry appended to the writer's block-index array.
 * After the last table, tsdbMoveCommitData() is called once more with the
 * maximum possible id, and any rows left in the last block buffer are flushed.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the index entry cannot be
 * appended, or the first error code returned by a helper.
 */
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
  int32_t code = 0;

  SRowInfo *pRowInfo;
  TABLEID   id = {0};
  while ((pRowInfo = tsdbGetCommitRow(pCommitter)) != NULL) {
    // each iteration must start on a table different from the previous one
    ASSERT(pRowInfo->suid != id.suid || pRowInfo->uid != id.uid);
    id.suid = pRowInfo->suid;
    id.uid = pRowInfo->uid;

    code = tsdbMoveCommitData(pCommitter, id);
    if (code) goto _err;

    // start
    tMapDataReset(&pCommitter->dWriter.mBlock);

    // impl
    code = tsdbCommitterUpdateTableSchema(pCommitter, id.suid, id.uid);
    if (code) goto _err;
    code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema);
    if (code) goto _err;
    code = tBlockDataInit(&pCommitter->dWriter.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema);
    if (code) goto _err;

    /* merge with data in .data file */
    code = tsdbMergeTableData(pCommitter, id);
    if (code) goto _err;

    /* handle remain table data */
    code = tsdbCommitTableData(pCommitter, id);
    if (code) goto _err;

    // end
    if (pCommitter->dWriter.mBlock.nItem > 0) {
      SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid};
      code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx);
      if (code) goto _err;

      if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
    }
  }

  // final pass with the largest possible table id
  id.suid = INT64_MAX;
  id.uid = INT64_MAX;
  code = tsdbMoveCommitData(pCommitter, id);
  if (code) goto _err;

  if (pCommitter->dWriter.bDatal.nRow > 0) {
    code = tsdbCommitLastBlock(pCommitter);
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb commit file data impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}