tsdbCommit.c 42.0 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
/* Kind of row source a SDataIter walks during commit. */
typedef enum {
  MEMORY_DATA_ITER = 0, /* rows come from the in-memory table data */
  STT_DATA_ITER,        /* rows come from an stt file of the file set */
} EDataIterT;
H
Hongze Cheng 已提交
19

H
Hongze Cheng 已提交
20 21 22
typedef struct {
  SRBTreeNode n;
  SRowInfo    r;
H
Hongze Cheng 已提交
23
  EDataIterT  type;
H
Hongze Cheng 已提交
24 25 26 27 28 29
  union {
    struct {
      int32_t     iTbDataP;
      STbDataIter iter;
    };  // memory data iter
    struct {
H
Hongze Cheng 已提交
30 31 32
      int32_t    iStt;
      SArray    *aSttBlk;
      int32_t    iSttBlk;
H
Hongze Cheng 已提交
33 34
      SBlockData bData;
      int32_t    iRow;
H
Hongze Cheng 已提交
35
    };  // stt file data iter
H
Hongze Cheng 已提交
36 37 38
  };
} SDataIter;

H
Hongze Cheng 已提交
39
typedef struct {
H
Hongze Cheng 已提交
40
  STsdb *pTsdb;
H
Hongze Cheng 已提交
41
  /* commit data */
H
Hongze Cheng 已提交
42
  int64_t commitID;
H
Hongze Cheng 已提交
43 44
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
45 46
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
47
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
48
  int8_t  sttTrigger;
H
Hongze Cheng 已提交
49 50
  SArray *aTbDataP;  // memory
  STsdbFS fs;        // disk
H
Hongze Cheng 已提交
51
  // --------------
H
Hongze Cheng 已提交
52
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
53 54 55
  int32_t commitFid;
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
56
  // commit file data
H
Hongze Cheng 已提交
57 58
  struct {
    SDataFReader *pReader;
H
Hongze Cheng 已提交
59 60 61
    SArray       *aBlockIdx;  // SArray<SBlockIdx>
    int32_t       iBlockIdx;
    SBlockIdx    *pBlockIdx;
H
Hongze Cheng 已提交
62
    SMapData      mBlock;  // SMapData<SDataBlk>
H
Hongze Cheng 已提交
63
    SBlockData    bData;
H
Hongze Cheng 已提交
64
  } dReader;
H
Hongze Cheng 已提交
65 66 67
  struct {
    SDataIter *pIter;
    SRBTree    rbt;
H
Hongze Cheng 已提交
68
    SDataIter  dataIter;
H
Hongze Cheng 已提交
69
    SDataIter  aDataIter[TSDB_MAX_STT_TRIGGER];
H
Hongze Cheng 已提交
70
    int8_t     toLastOnly;
H
Hongze Cheng 已提交
71
  };
H
Hongze Cheng 已提交
72 73 74
  struct {
    SDataFWriter *pWriter;
    SArray       *aBlockIdx;  // SArray<SBlockIdx>
H
Hongze Cheng 已提交
75
    SArray       *aSttBlk;    // SArray<SSttBlk>
H
Hongze Cheng 已提交
76
    SMapData      mBlock;     // SMapData<SDataBlk>
H
Hongze Cheng 已提交
77
    SBlockData    bData;
H
Hongze Cheng 已提交
78
    SBlockData    bDatal;
H
Hongze Cheng 已提交
79 80 81
  } dWriter;
  SSkmInfo skmTable;
  SSkmInfo skmRow;
H
Hongze Cheng 已提交
82
  /* commit del */
H
Hongze Cheng 已提交
83 84
  SDelFReader *pDelFReader;
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
85 86 87
  SArray      *aDelIdx;   // SArray<SDelIdx>
  SArray      *aDelIdxN;  // SArray<SDelIdx>
  SArray      *aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
88
} SCommitter;
H
refact  
Hongze Cheng 已提交
89

H
Hongze Cheng 已提交
90 91 92 93 94
/*
 * Forward declarations of file-local commit helpers. tsdbCommit() calls
 * tsdbStartCommit(), tsdbCommitData(), tsdbCommitDel() and tsdbEndCommit()
 * before their definitions appear.
 */
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
Hongze Cheng 已提交
95 96
/* Forward declaration: tsdbOpenCommitIter() calls this before its definition appears. */
static int32_t tsdbNextCommitRow(SCommitter *pCommitter);

H
Hongze Cheng 已提交
97
int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
H
Hongze Cheng 已提交
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
  SRowInfo *pInfo1 = (SRowInfo *)p1;
  SRowInfo *pInfo2 = (SRowInfo *)p2;

  if (pInfo1->suid < pInfo2->suid) {
    return -1;
  } else if (pInfo1->suid > pInfo2->suid) {
    return 1;
  }

  if (pInfo1->uid < pInfo2->uid) {
    return -1;
  } else if (pInfo1->uid > pInfo2->uid) {
    return 1;
  }

  return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row);
}
H
refact  
Hongze Cheng 已提交
115

H
refact  
Hongze Cheng 已提交
116
/*
 * Create a fresh memtable and install it as pTsdb->mem.
 *
 * A NULL pTsdb is a no-op that returns 0. The pointer swap is done while
 * holding the write side of pTsdb->rwLock; a non-zero result from the lock
 * or unlock call is converted with TAOS_SYSTEM_ERROR() and returned.
 *
 * Returns 0 on success, otherwise the error code (also logged).
 */
int32_t tsdbBegin(STsdb *pTsdb) {
  int32_t    code = 0;
  SMemTable *pNewMem;

  if (pTsdb == NULL) {
    return code;
  }

  code = tsdbMemTableCreate(pTsdb, &pNewMem);
  if (code != 0) {
    goto _err;
  }

  /* publish the new memtable under the write lock */
  code = taosThreadRwlockWrlock(&pTsdb->rwLock);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  pTsdb->mem = pNewMem;

  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d, tsdb begin failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
more  
Hongze Cheng 已提交
148
/*
 * Commit the current memtable of pTsdb.
 *
 * - NULL pTsdb: nothing to do, returns 0.
 * - Memtable with no rows and no deletes: it is detached from pTsdb under the
 *   write lock, unreferenced, and 0 is returned.
 * - Otherwise the phases run in order: start, data, del, end. If any phase
 *   fails, tsdbEndCommit() is invoked with the error code, the failure is
 *   logged and the code is returned.
 */
int32_t tsdbCommit(STsdb *pTsdb) {
  if (pTsdb == NULL) {
    return 0;
  }

  int32_t    code = 0;
  SCommitter committer;
  SMemTable *pMem = pTsdb->mem;

  /* nothing buffered: just drop the memtable */
  if (pMem->nRow == 0 && pMem->nDel == 0) {
    taosThreadRwlockWrlock(&pTsdb->rwLock);
    pTsdb->mem = NULL;
    taosThreadRwlockUnlock(&pTsdb->rwLock);

    tsdbUnrefMemTable(pMem, NULL);
    return code;
  }

  /* start commit */
  if ((code = tsdbStartCommit(pTsdb, &committer)) != 0) {
    goto _err;
  }

  /* commit impl */
  if ((code = tsdbCommitData(&committer)) != 0) {
    goto _err;
  }
  if ((code = tsdbCommitDel(&committer)) != 0) {
    goto _err;
  }

  /* end commit */
  if ((code = tsdbEndCommit(&committer, 0)) != 0) {
    goto _err;
  }

  return code;

_err:
  tsdbEndCommit(&committer, code);
  tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
189
/*
 * Prepare the delete-data part of a commit.
 *
 * Allocates the three working arrays (aDelIdx, aDelData, aDelIdxN), opens a
 * reader on the existing del file when pCommitter->fs.pDelFile is set and
 * loads its index into aDelIdx, then opens a writer for a new del file
 * stamped with the current commitID.
 *
 * Returns 0 on success; TSDB_CODE_OUT_OF_MEMORY when an array cannot be
 * allocated, or the code of the failing reader/writer call.
 */
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
  int32_t code = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;

  /* short-circuit keeps the original allocation order: stop at the first failure */
  if ((pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL ||
      (pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL ||
      (pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  /* existing del file, if any */
  if (pCommitter->fs.pDelFile != NULL) {
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pCommitter->fs.pDelFile, pTsdb);
    if (code != 0) {
      goto _err;
    }

    code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx);
    if (code != 0) {
      goto _err;
    }
  }

  /* prepare new */
  SDelFile newDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0};
  code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &newDelFile, pTsdb);
  if (code != 0) {
    goto _err;
  }

  tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode));
  return code;

_err:
  tsdbError("vgId:%d, commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
235
/*
 * Merge and write the delete records of one table.
 *
 * pTbData - in-memory table data, may be NULL; treated as absent when its
 *           pHead list is empty.
 * pDelIdx - index entry of the table in the existing del file, may be NULL.
 *
 * aDelData is first filled from the del file (or cleared when pDelIdx is
 * NULL), then the in-memory records are appended. The combined array is
 * written with tsdbWriteDelData() and the resulting index entry is pushed to
 * aDelIdxN. When neither source has anything the function returns 0 without
 * writing.
 */
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
  int32_t  code = 0;
  tb_uid_t suid = 0;
  tb_uid_t uid = 0;

  if (pTbData != NULL) {
    suid = pTbData->suid;
    uid = pTbData->uid;

    /* no in-memory delete records for this table */
    if (pTbData->pHead == NULL) {
      pTbData = NULL;
    }
  }

  if (pDelIdx == NULL) {
    taosArrayClear(pCommitter->aDelData);
  } else {
    suid = pDelIdx->suid;
    uid = pDelIdx->uid;

    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData);
    if (code != 0) {
      goto _err;
    }
  }

  if (pTbData == NULL && pDelIdx == NULL) {
    return code;
  }

  SDelIdx newIdx = {.suid = suid, .uid = uid};

  /* memory */
  if (pTbData != NULL) {
    for (SDelData *pDel = pTbData->pHead; pDel != NULL; pDel = pDel->pNext) {
      if (taosArrayPush(pCommitter->aDelData, pDel) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
    }
  }

  /* write */
  code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, &newIdx);
  if (code != 0) {
    goto _err;
  }

  /* put delIdx */
  if (taosArrayPush(pCommitter->aDelIdxN, &newIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d, commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
291 292
/*
 * Finish the delete-data part of a commit.
 *
 * Writes the new del index, updates the del file header, registers the new
 * del file in the committer's file-system copy, closes the writer (second
 * argument 1), closes the reader when one was opened, and releases the
 * three working arrays.
 *
 * Returns 0 on success or the code of the first failing step; the arrays are
 * only destroyed on the success path.
 */
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t code = 0;

  if ((code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN)) != 0) {
    goto _err;
  }

  if ((code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter)) != 0) {
    goto _err;
  }

  if ((code = tsdbFSUpsertDelFile(&pCommitter->fs, &pCommitter->pDelFWriter->fDel)) != 0) {
    goto _err;
  }

  if ((code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1)) != 0) {
    goto _err;
  }

  if (pCommitter->pDelFReader != NULL) {
    if ((code = tsdbDelFReaderClose(&pCommitter->pDelFReader)) != 0) {
      goto _err;
    }
  }

  taosArrayDestroy(pCommitter->aDelIdx);
  taosArrayDestroy(pCommitter->aDelData);
  taosArrayDestroy(pCommitter->aDelIdxN);

  return code;

_err:
  tsdbError("vgId:%d, commit del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
323
/*
 * Make pSkmInfo hold the latest schema (version -1) for table (suid, uid).
 *
 * The cached schema is reused when:
 *   - suid != 0 and the cached suid matches (only the cached uid is updated), or
 *   - suid == 0 and the cached uid matches.
 * A cache hit additionally requires pSkmInfo->pTSchema to be non-NULL, the
 * same precondition tsdbCommitterUpdateRowSchema() applies. Without it, a
 * previous failed fetch (which already overwrote suid/uid) would make the
 * next call report success while no valid schema is cached.
 *
 * On a miss the old schema is destroyed and a new one is fetched with
 * metaGetTbTSchemaEx().
 *
 * Returns 0 on success or the code returned by metaGetTbTSchemaEx().
 */
int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo) {
  int32_t code = 0;

  if (pSkmInfo->pTSchema != NULL) {
    if (suid) {
      if (pSkmInfo->suid == suid) {
        pSkmInfo->uid = uid;
        goto _exit;
      }
    } else {
      if (pSkmInfo->uid == uid) goto _exit;
    }
  }

  pSkmInfo->suid = suid;
  pSkmInfo->uid = uid;
  tTSchemaDestroy(pSkmInfo->pTSchema);
  // the pointer was passed by value; clear it so a failed fetch below cannot
  // leave a dangling schema behind
  pSkmInfo->pTSchema = NULL;
  code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema);
  if (code) goto _exit;

_exit:
  return code;
}

/*
 * Make pCommitter->skmRow hold schema version `sver` of table (suid, uid).
 *
 * The cached schema is kept when it exists, its suid matches and its version
 * equals sver; for suid == 0 the cached uid must match as well. Otherwise
 * the old schema is destroyed and the requested one is fetched with
 * metaGetTbTSchemaEx().
 *
 * Returns 0 on success or the code returned by metaGetTbTSchemaEx().
 */
static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  int32_t   code = 0;
  SSkmInfo *pSkm = &pCommitter->skmRow;

  if (pSkm->pTSchema && pSkm->suid == suid && sver == pSkm->pTSchema->version) {
    if (suid != 0 || pSkm->uid == uid) goto _exit;
  }

  pSkm->suid = suid;
  pSkm->uid = uid;
  tTSchemaDestroy(pSkm->pTSchema);
  // tTSchemaDestroy() received the pointer by value; clear it so that a failed
  // fetch cannot leave a dangling pointer that the version check above would
  // dereference on the next call
  pSkm->pTSchema = NULL;
  code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pSkm->pTSchema);
  if (code) {
    goto _exit;
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
370 371 372 373 374 375 376 377 378 379
/*
 * Advance the reader to the next SBlockIdx of the file being committed.
 *
 * Requires a current entry (asserted). When another entry exists its data
 * block map is loaded into dReader.mBlock; when the index is exhausted
 * dReader.pBlockIdx becomes NULL.
 *
 * Returns 0 on success or the code from tsdbReadDataBlk().
 */
static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) {
  int32_t code = 0;

  ASSERT(pCommitter->dReader.pBlockIdx);

  pCommitter->dReader.iBlockIdx++;

  /* ran off the end of the index */
  if (pCommitter->dReader.iBlockIdx >= taosArrayGetSize(pCommitter->dReader.aBlockIdx)) {
    pCommitter->dReader.pBlockIdx = NULL;
    return code;
  }

  pCommitter->dReader.pBlockIdx =
      (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);

  code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
  if (code != 0) {
    return code;
  }

  ASSERT(pCommitter->dReader.mBlock.nItem > 0);
  return code;
}

H
Hongze Cheng 已提交
392 393 394 395
/*
 * Set up the merge iterators for the file set currently being committed.
 *
 * Memory side: scans aTbDataP for the first table whose first row at or
 * after minKey is not beyond maxKey and positions dataIter on it. Rows beyond
 * maxKey only lower nextKey. At least one such table must exist (asserted).
 *
 * Disk side (only when a reader is open):
 *   - nSttF >= sttTrigger: every non-empty stt file gets an entry in
 *     aDataIter, positioned on row 0 of its first block, and is added to the
 *     tree;
 *   - otherwise: toLastOnly is set when any stt file has size > offset.
 *
 * Finally tsdbNextCommitRow() is called and its result is returned.
 */
static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
  int32_t code = 0;

  pCommitter->pIter = NULL;
  tRBTreeCreate(&pCommitter->rbt, tRowInfoCmprFn);

  /* ---- memory ---- */
  TSDBKEY    fromKey = {.ts = pCommitter->minKey, .version = VERSION_MIN};
  SDataIter *pMemIter = &pCommitter->dataIter;

  pMemIter->type = MEMORY_DATA_ITER;
  pMemIter->iTbDataP = 0;
  while (pMemIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP)) {
    STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pMemIter->iTbDataP);

    tsdbTbDataIterOpen(pTbData, &fromKey, 0, &pMemIter->iter);
    TSDBROW *pRow = tsdbTbDataIterGet(&pMemIter->iter);

    if (pRow != NULL) {
      if (TSDBROW_TS(pRow) > pCommitter->maxKey) {
        /* belongs to a later file set: only remember where to continue */
        pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
      } else {
        pMemIter->r.suid = pTbData->suid;
        pMemIter->r.uid = pTbData->uid;
        pMemIter->r.row = *pRow;
        break;
      }
    }

    pMemIter->iTbDataP++;
  }
  ASSERT(pMemIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP));
  tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pMemIter);

  /* ---- disk ---- */
  pCommitter->toLastOnly = 0;
  SDataFReader *pReader = pCommitter->dReader.pReader;

  if (pReader != NULL && pReader->pSet->nSttF >= pCommitter->sttTrigger) {
    int8_t nUsed = 0; /* slots of aDataIter handed to the tree so far */

    for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
      SDataIter *pSttIter = &pCommitter->aDataIter[nUsed];

      pSttIter->type = STT_DATA_ITER;
      pSttIter->iStt = iStt;

      code = tsdbReadSttBlk(pReader, iStt, pSttIter->aSttBlk);
      if (code != 0) {
        return code;
      }

      /* empty stt file: the slot is reused by the next file */
      if (taosArrayGetSize(pSttIter->aSttBlk) == 0) {
        continue;
      }

      pSttIter->iSttBlk = 0;
      SSttBlk *pFirstBlk = (SSttBlk *)taosArrayGet(pSttIter->aSttBlk, 0);
      code = tsdbReadSttBlock(pReader, iStt, pFirstBlk, &pSttIter->bData);
      if (code != 0) {
        return code;
      }

      pSttIter->iRow = 0;
      pSttIter->r.suid = pSttIter->bData.suid;
      pSttIter->r.uid = pSttIter->bData.uid ? pSttIter->bData.uid : pSttIter->bData.aUid[0];
      pSttIter->r.row = tsdbRowFromBlockData(&pSttIter->bData, 0);

      tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pSttIter);
      nUsed++;
    }
  } else if (pReader != NULL) {
    for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
      SSttFile *pSttFile = pReader->pSet->aSttF[iStt];

      if (pSttFile->size > pSttFile->offset) {
        pCommitter->toLastOnly = 1;
        break;
      }
    }
  }

  return tsdbNextCommitRow(pCommitter);
}

H
Hongze Cheng 已提交
471 472 473 474
/*
 * Prepare committing one file set.
 *
 * 1. Derives commitFid and its [minKey, maxKey] range from nextKey, then
 *    resets nextKey to TSKEY_MAX.
 * 2. If the file set already exists, opens a reader on it, loads the block
 *    index and the data block map of its first entry.
 * 3. Builds the writer's file set description: an existing set keeps its data
 *    and sma files and disk id, and either appends one stt file
 *    (nSttF < sttTrigger) or starts over with a single stt file; a new set
 *    gets a freshly allocated disk and one stt file.
 * 4. Opens the writer, resets the writer-side buffers and opens the merge
 *    iterators.
 *
 * Returns 0 on success or the code of the first failing step (also logged).
 */
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SDFileSet *pOldSet = NULL;

  /* memory */
  pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
  tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                  &pCommitter->maxKey);
  pCommitter->nextKey = TSKEY_MAX;

  /* Reader */
  SDFileSet searchKey = {.fid = pCommitter->commitFid};
  pOldSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &searchKey, tDFileSetCmprFn, TD_EQ);
  if (pOldSet == NULL) {
    pCommitter->dReader.pBlockIdx = NULL;
  } else {
    code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pOldSet);
    if (code != 0) {
      goto _err;
    }

    /* data */
    code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx);
    if (code != 0) {
      goto _err;
    }

    pCommitter->dReader.iBlockIdx = 0;
    if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) {
      pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0);
      code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
      if (code != 0) {
        goto _err;
      }
    } else {
      pCommitter->dReader.pBlockIdx = NULL;
    }
    tBlockDataReset(&pCommitter->dReader.bData);
  }

  /* Writer */
  SHeadFile fHead = {.commitID = pCommitter->commitID};
  SDataFile fData = {.commitID = pCommitter->commitID};
  SSmaFile  fSma = {.commitID = pCommitter->commitID};
  SSttFile  fStt = {.commitID = pCommitter->commitID};
  SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};

  if (pOldSet == NULL) {
    SDiskID did = {0};

    /* NOTE(review): any status these two calls return is not inspected here - confirm that is intended */
    tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
    tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
    wSet.diskId = did;
    wSet.nSttF = 1;
  } else {
    ASSERT(pOldSet->nSttF <= pCommitter->sttTrigger);

    fData = *pOldSet->pDataF;
    fSma = *pOldSet->pSmaF;
    wSet.diskId = pOldSet->diskId;

    if (pOldSet->nSttF < pCommitter->sttTrigger) {
      /* keep the existing stt files and append one more */
      for (int32_t iStt = 0; iStt < pOldSet->nSttF; iStt++) {
        wSet.aSttF[iStt] = pOldSet->aSttF[iStt];
      }
      wSet.nSttF = pOldSet->nSttF + 1;
    } else {
      wSet.nSttF = 1;
    }
  }
  wSet.aSttF[wSet.nSttF - 1] = &fStt;

  code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
  if (code != 0) {
    goto _err;
  }

  taosArrayClear(pCommitter->dWriter.aBlockIdx);
  taosArrayClear(pCommitter->dWriter.aSttBlk);
  tMapDataReset(&pCommitter->dWriter.mBlock);
  tBlockDataReset(&pCommitter->dWriter.bData);
  tBlockDataReset(&pCommitter->dWriter.bDatal);

  /* open iter */
  code = tsdbOpenCommitIter(pCommitter);
  if (code != 0) {
    goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d, commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
554 555
/*
 * Write the rows buffered in pBlockData as one data block.
 *
 * An empty pBlockData is a no-op returning 0. Otherwise a SDataBlk is built
 * (row count, min/max key taken from the first/last row, min/max version,
 * duplicate-timestamp flag), the rows are written with tsdbWriteBlockData()
 * (sma info is requested only for a single sub-block without duplicate
 * timestamps), the SDataBlk is appended to mDataBlk and pBlockData is
 * cleared.
 *
 * Returns 0 on success or the code of the failing call (also logged).
 */
int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg) {
  int32_t  code = 0;
  SDataBlk blk;

  if (pBlockData->nRow == 0) {
    return code;
  }

  tDataBlkReset(&blk);

  /* info */
  blk.nRow += pBlockData->nRow;
  for (int32_t i = 0; i < pBlockData->nRow; i++) {
    TSDBKEY key = {.ts = pBlockData->aTSKEY[i], .version = pBlockData->aVersion[i]};

    if (i > 0) {
      /* same timestamp as the previous row */
      if (pBlockData->aTSKEY[i] == pBlockData->aTSKEY[i - 1]) {
        blk.hasDup = 1;
      }
    } else if (tsdbKeyCmprFn(&blk.minKey, &key) > 0) {
      blk.minKey = key;
    }

    if (i == pBlockData->nRow - 1 && tsdbKeyCmprFn(&blk.maxKey, &key) < 0) {
      blk.maxKey = key;
    }

    blk.minVer = TMIN(blk.minVer, key.version);
    blk.maxVer = TMAX(blk.maxVer, key.version);
  }

  /* write */
  blk.nSubBlock++;
  code = tsdbWriteBlockData(pWriter, pBlockData, &blk.aSubBlock[blk.nSubBlock - 1],
                            ((blk.nSubBlock == 1) && !blk.hasDup) ? &blk.smaInfo : NULL, cmprAlg, 0);
  if (code != 0) {
    goto _err;
  }

  /* put SDataBlk */
  code = tMapDataPutItem(mDataBlk, &blk, tPutDataBlk);
  if (code != 0) {
    goto _err;
  }

  /* clear */
  tBlockDataClear(pBlockData);
  return code;

_err:
  tsdbError("vgId:%d tsdb commit data block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
605 606 607
/*
 * Write the rows buffered in pBlockData as one stt block.
 *
 * An empty pBlockData is a no-op returning 0. Otherwise a SSttBlk is filled
 * with the suid, row count, key/version ranges over all rows and the uid
 * range (the block's uid when set, else the first and last entries of aUid),
 * the rows are written with tsdbWriteBlockData(), the SSttBlk is pushed to
 * aSttBlk and pBlockData is cleared.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the push fails, or the
 * code from tsdbWriteBlockData() (failures are logged).
 */
int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg) {
  int32_t code = 0;
  SSttBlk blk;

  if (pBlockData->nRow == 0) {
    return code;
  }

  /* info */
  blk.suid = pBlockData->suid;
  blk.nRow = pBlockData->nRow;
  blk.minKey = TSKEY_MAX;
  blk.maxKey = TSKEY_MIN;
  blk.minVer = VERSION_MAX;
  blk.maxVer = VERSION_MIN;
  for (int32_t i = 0; i < pBlockData->nRow; i++) {
    blk.minKey = TMIN(blk.minKey, pBlockData->aTSKEY[i]);
    blk.maxKey = TMAX(blk.maxKey, pBlockData->aTSKEY[i]);
    blk.minVer = TMIN(blk.minVer, pBlockData->aVersion[i]);
    blk.maxVer = TMAX(blk.maxVer, pBlockData->aVersion[i]);
  }

  if (pBlockData->uid) {
    blk.minUid = pBlockData->uid;
    blk.maxUid = pBlockData->uid;
  } else {
    blk.minUid = pBlockData->aUid[0];
    blk.maxUid = pBlockData->aUid[pBlockData->nRow - 1];
  }

  /* write */
  code = tsdbWriteBlockData(pWriter, pBlockData, &blk.bInfo, NULL, cmprAlg, 1);
  if (code != 0) {
    goto _err;
  }

  /* push SSttBlk */
  if (taosArrayPush(aSttBlk, &blk) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  /* clear */
  tBlockDataClear(pBlockData);
  return code;

_err:
  tsdbError("vgId:%d tsdb commit last block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
647 648 649
/*
 * Finish committing one file set.
 *
 * Writes the block index and the stt block list, updates the file set
 * header, registers the written set in the committer's file-system copy,
 * closes the writer (second argument 1) and closes the reader when one is
 * open.
 *
 * Returns 0 on success or the code of the first failing step (also logged).
 */
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code = 0;

  /* write aBlockIdx */
  if ((code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx)) != 0) {
    goto _err;
  }

  /* write aSttBlk */
  if ((code = tsdbWriteSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSttBlk)) != 0) {
    goto _err;
  }

  /* update file header */
  if ((code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter)) != 0) {
    goto _err;
  }

  /* upsert SDFileSet */
  if ((code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->dWriter.pWriter->wSet)) != 0) {
    goto _err;
  }

  /* close and sync */
  if ((code = tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 1)) != 0) {
    goto _err;
  }

  if (pCommitter->dReader.pReader != NULL) {
    if ((code = tsdbDataFReaderClose(&pCommitter->dReader.pReader)) != 0) {
      goto _err;
    }
  }

  return code;

_err:
  tsdbError("vgId:%d, commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
683 684 685
/*
 * Carry over, unchanged, every table of the old file that sorts before
 * `toTable`.
 *
 * For each such reader entry the loaded data block map is written to the new
 * file with tsdbWriteDataBlk(), the resulting SBlockIdx is appended to the
 * writer's index, and the reader advances to the next table.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the index push fails,
 * or the code of the failing read/write call (failures are logged).
 */
static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
  int32_t code = 0;

  for (;;) {
    SBlockIdx *pCur = pCommitter->dReader.pBlockIdx;

    /* stop at end of index or at the first table not before toTable */
    if (pCur == NULL || tTABLEIDCmprFn(pCur, &toTable) >= 0) {
      break;
    }

    SBlockIdx blockIdx = *pCur;
    code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx);
    if (code != 0) {
      goto _err;
    }

    if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    code = tsdbCommitterNextTableData(pCommitter);
    if (code != 0) {
      goto _err;
    }
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb move commit data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
707
/* Forward declaration: tsdbCommitFileData() calls this before its definition appears. */
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter);
H
Hongze Cheng 已提交
708
/*
 * Commit the data of one file set: start, impl, end.
 *
 * On failure the error is logged and whatever reader/writer is still open is
 * closed (the writer with second argument 0). The closes are guarded by a
 * NULL check, matching the guard tsdbCommitFileDataEnd() uses for the
 * reader, so a failure that happens before either handle was opened does not
 * pass a NULL handle to the close routines.
 *
 * Returns 0 on success or the code of the failing phase.
 */
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
  int32_t code = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;

  // commit file data start
  code = tsdbCommitFileDataStart(pCommitter);
  if (code) goto _err;

  // impl
  code = tsdbCommitFileDataImpl(pCommitter);
  if (code) goto _err;

  // commit file data end
  code = tsdbCommitFileDataEnd(pCommitter);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d, commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  // best-effort cleanup: results of the close calls are intentionally not
  // allowed to replace the original error code
  if (pCommitter->dReader.pReader) {
    tsdbDataFReaderClose(&pCommitter->dReader.pReader);
  }
  if (pCommitter->dWriter.pWriter) {
    tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0);
  }
  return code;
}

H
Hongze Cheng 已提交
734 735
// ----------------------------------------------------------------------------
/*
 * Initialise *pCommitter for a new commit of pTsdb.
 *
 * The committer is zeroed, then - under the write lock - the active memtable
 * is moved from pTsdb->mem to pTsdb->imem (mem must be set and imem must be
 * NULL, asserted). Configuration values are copied from the tsdb/vnode, the
 * table data array of the immutable memtable is obtained and the
 * file-system state is copied into pCommitter->fs.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the table data array
 * cannot be obtained, or the code from tsdbFSCopy() (failures are logged).
 */
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
  int32_t code = 0;

  memset(pCommitter, 0, sizeof(*pCommitter));
  ASSERT(pTsdb->mem && pTsdb->imem == NULL);

  /* freeze the memtable: mem -> imem */
  taosThreadRwlockWrlock(&pTsdb->rwLock);
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
  taosThreadRwlockUnlock(&pTsdb->rwLock);

  /* configuration snapshot */
  pCommitter->pTsdb = pTsdb;
  pCommitter->commitID = pTsdb->pVnode->state.commitID;
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;
  pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
  pCommitter->sttTrigger = pTsdb->pVnode->config.sttTrigger;

  pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
  if (pCommitter->aTbDataP == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  if ((code = tsdbFSCopy(pTsdb, &pCommitter->fs)) != 0) {
    goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d, tsdb start commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
769 770 771
/*
 * Allocate the reader, merger and writer containers used while committing
 * row data.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when an array cannot be
 * created, or the code returned by tBlockDataCreate().  Nothing is released
 * here on failure.
 */
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
  int32_t code = 0;

  // reader
  pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->dReader.aBlockIdx == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  code = tBlockDataCreate(&pCommitter->dReader.bData);
  if (code) return code;

  // merger: one stt-block array and one block-data buffer per iterator slot
  for (SDataIter *pIter = pCommitter->aDataIter; pIter < pCommitter->aDataIter + TSDB_MAX_STT_TRIGGER; pIter++) {
    pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
    if (pIter->aSttBlk == NULL) return TSDB_CODE_OUT_OF_MEMORY;

    code = tBlockDataCreate(&pIter->bData);
    if (code) return code;
  }

  // writer
  pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->dWriter.aBlockIdx == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  pCommitter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
  if (pCommitter->dWriter.aSttBlk == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  code = tBlockDataCreate(&pCommitter->dWriter.bData);
  if (code) return code;

  code = tBlockDataCreate(&pCommitter->dWriter.bDatal);
  return code;
}

/*
 * Release the reader, merger and writer containers set up by
 * tsdbCommitDataStart(), plus the two cached schemas held by the committer.
 */
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
  // reader
  taosArrayDestroy(pCommitter->dReader.aBlockIdx);
  tMapDataClear(&pCommitter->dReader.mBlock);
  tBlockDataDestroy(&pCommitter->dReader.bData, 1);

  // merger
  for (SDataIter *pIter = pCommitter->aDataIter; pIter < pCommitter->aDataIter + TSDB_MAX_STT_TRIGGER; pIter++) {
    taosArrayDestroy(pIter->aSttBlk);
    tBlockDataDestroy(&pIter->bData, 1);
  }

  // writer
  taosArrayDestroy(pCommitter->dWriter.aBlockIdx);
  taosArrayDestroy(pCommitter->dWriter.aSttBlk);
  tMapDataClear(&pCommitter->dWriter.mBlock);
  tBlockDataDestroy(&pCommitter->dWriter.bData, 1);
  tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1);

  // cached schemas
  tTSchemaDestroy(pCommitter->skmTable.pTSchema);
  tTSchemaDestroy(pCommitter->skmRow.pTSchema);
}

H
Hongze Cheng 已提交
841 842 843 844
/*
 * Commit the row data held in the immutable memtable.
 *
 * When the memtable has no rows, nothing is set up and only the "done"
 * message is logged.  Otherwise tsdbCommitFileData() is called repeatedly
 * until pCommitter->nextKey reaches TSKEY_MAX or a call fails.  The
 * containers are torn down on both the success and the failure path.
 *
 * Returns 0 on success or the first non-zero code encountered.
 */
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (pMemTable->nRow != 0) {
    // start
    code = tsdbCommitDataStart(pCommitter);

    // impl
    if (code == 0) {
      pCommitter->nextKey = pMemTable->minKey;
      while (code == 0 && pCommitter->nextKey < TSKEY_MAX) {
        code = tsdbCommitFileData(pCommitter);
      }
    }

    // end (reached after both success and failure)
    tsdbCommitDataEnd(pCommitter);

    if (code != 0) {
      tsdbError("vgId:%d, commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
      return code;
    }
  }

  tsdbInfo("vgId:%d, commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow);
  return code;
}
H
Hongze Cheng 已提交
872

H
Hongze Cheng 已提交
873 874 875 876
/*
 * Commit delete records.
 *
 * Walks two sequences in parallel: the in-memory table data
 * (pCommitter->aTbDataP) and the on-disk delete index (pCommitter->aDelIdx).
 * Entries are compared with tTABLEIDCmprFn(); each step commits the smaller
 * entry alone, or both together when they compare equal.
 *
 * Returns 0 on success or the first non-zero code from the start, per-table
 * or end step.  Does nothing except log when the memtable has no deletes.
 */
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (pMemTable->nDel == 0) goto _exit;

  // start
  code = tsdbCommitDelStart(pCommitter);
  if (code) goto _err;

  // impl
  int32_t  iDelIdx = 0;
  int32_t  nDelIdx = taosArrayGetSize(pCommitter->aDelIdx);
  int32_t  iTbData = 0;
  int32_t  nTbData = taosArrayGetSize(pCommitter->aTbDataP);
  STbData *pTbData;
  SDelIdx *pDelIdx;

  ASSERT(nTbData > 0);

  pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
  pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;

  while (pTbData != NULL || pDelIdx != NULL) {
    bool fromMem;   // consume the current in-memory entry this round
    bool fromDisk;  // consume the current on-disk entry this round

    if (pTbData && pDelIdx) {
      int32_t c = tTABLEIDCmprFn(pTbData, pDelIdx);
      fromMem = (c <= 0);
      fromDisk = (c >= 0);
    } else {
      fromMem = (pTbData != NULL);
      fromDisk = !fromMem;
    }

    code = tsdbCommitTableDel(pCommitter, fromMem ? pTbData : NULL, fromDisk ? pDelIdx : NULL);
    if (code) goto _err;

    if (fromMem) {
      iTbData++;
      pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
    }
    if (fromDisk) {
      iDelIdx++;
      pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    }
  }

  // end
  code = tsdbCommitDelEnd(pCommitter);
  if (code) goto _err;

_exit:
  tsdbDebug("vgId:%d, commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
  return code;

_err:
  tsdbError("vgId:%d, commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
961
/*
 * Finish a commit: run the two file-system commit steps, clear the
 * immutable memtable slot, then release the memtable reference, the copied
 * file-system state and the table-data array.
 *
 * `eno` must be 0 (asserted).  Returns 0 on success or the code from
 * tsdbFSCommit1()/tsdbFSCommit2(); on failure none of the releases run.
 */
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  ASSERT(eno == 0);

  code = tsdbFSCommit1(pTsdb, &pCommitter->fs);
  if (code == 0) {
    // The second step and the imem reset happen under the write lock.
    taosThreadRwlockWrlock(&pTsdb->rwLock);

    // commit or rollback
    code = tsdbFSCommit2(pTsdb, &pCommitter->fs);
    if (code == 0) {
      pTsdb->imem = NULL;
    }

    taosThreadRwlockUnlock(&pTsdb->rwLock);
  }

  if (code != 0) {
    tsdbError("vgId:%d, tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
    return code;
  }

  tsdbUnrefMemTable(pMemTable, NULL);
  tsdbFSDestroy(&pCommitter->fs);
  taosArrayDestroy(pCommitter->aTbDataP);

  // NOTE: a follow-up merge step (tsdbMerge() when pCommitter->toMerge is set)
  // is currently disabled here.

  tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode));
  return code;
}
H
Hongze Cheng 已提交
1002

H
Hongze Cheng 已提交
1003
// ================================================================================
H
Hongze Cheng 已提交
1004

H
Hongze Cheng 已提交
1005 1006
/*
 * Return the row info of the committer's current iterator, or NULL when
 * there is no current iterator.
 */
static FORCE_INLINE SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) {
  if (pCommitter->pIter == NULL) {
    return NULL;
  }
  return &pCommitter->pIter->r;
}

H
Hongze Cheng 已提交
1009
/*
 * Advance the committer to its next row.
 *
 * The current iterator (pCommitter->pIter) is stepped first:
 *   - memory iterator: moves to the next row, switching to the following
 *     table in aTbDataP when the current one has no more rows.  A row whose
 *     timestamp exceeds pCommitter->maxKey is not taken; it only lowers
 *     pCommitter->nextKey.
 *   - stt iterator: moves to the next row of the loaded block, reading the
 *     next stt block when the current one is used up.
 * An exhausted iterator clears pCommitter->pIter.
 *
 * The stepped iterator is then compared with the minimum of the RB-tree.
 * If it now sorts after that minimum it is put back into the tree.
 * Whenever there is no current iterator, the tree minimum (if any) is
 * removed from the tree and becomes current.
 *
 * Returns 0 on success or the code from tsdbReadSttBlock().
 */
static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
  int32_t code = 0;

  if (pCommitter->pIter) {
    SDataIter *pCur = pCommitter->pIter;

    switch (pCur->type) {
      case MEMORY_DATA_ITER: {  // memory
        tsdbTbDataIterNext(&pCur->iter);
        TSDBROW *pRow = tsdbTbDataIterGet(&pCur->iter);

        for (;;) {
          // Rows past maxKey are left for a later round; remember the earliest.
          if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
            pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
            pRow = NULL;
          }

          if (pRow) {
            pCur->r.suid = pCur->iter.pTbData->suid;
            pCur->r.uid = pCur->iter.pTbData->uid;
            pCur->r.row = *pRow;
            break;
          }

          // No usable row left in this table: try the next one.
          pCur->iTbDataP++;
          if (pCur->iTbDataP >= taosArrayGetSize(pCommitter->aTbDataP)) {
            pCommitter->pIter = NULL;
            break;
          }

          STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pCur->iTbDataP);
          TSDBKEY  keyFrom = {.ts = pCommitter->minKey, .version = VERSION_MIN};
          tsdbTbDataIterOpen(pTbData, &keyFrom, 0, &pCur->iter);
          pRow = tsdbTbDataIterGet(&pCur->iter);
        }
        break;
      }

      case STT_DATA_ITER: {  // last file
        pCur->iRow++;
        if (pCur->iRow < pCur->bData.nRow) {
          pCur->r.uid = pCur->bData.uid ? pCur->bData.uid : pCur->bData.aUid[pCur->iRow];
          pCur->r.row = tsdbRowFromBlockData(&pCur->bData, pCur->iRow);
          break;
        }

        // Loaded block is used up: move to the next stt block, if any.
        pCur->iSttBlk++;
        if (pCur->iSttBlk >= taosArrayGetSize(pCur->aSttBlk)) {
          pCommitter->pIter = NULL;
          break;
        }

        SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pCur->aSttBlk, pCur->iSttBlk);

        code = tsdbReadSttBlock(pCommitter->dReader.pReader, pCur->iStt, pSttBlk, &pCur->bData);
        if (code) goto _exit;

        pCur->iRow = 0;
        pCur->r.suid = pCur->bData.suid;
        pCur->r.uid = pCur->bData.uid ? pCur->bData.uid : pCur->bData.aUid[0];
        pCur->r.row = tsdbRowFromBlockData(&pCur->bData, 0);
        break;
      }

      default:
        ASSERT(0);
        break;
    }

    // compare with min in RB Tree
    SDataIter *pMin = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
    if (pCommitter->pIter && pMin) {
      int32_t c = tRowInfoCmprFn(&pCommitter->pIter->r, &pMin->r);
      if (c > 0) {
        tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
        pCommitter->pIter = NULL;
      } else {
        ASSERT(c);
      }
    }
  }

  if (pCommitter->pIter == NULL) {
    pCommitter->pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
    if (pCommitter->pIter) {
      tRBTreeDrop(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
    }
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
1091
/*
 * Write the commit rows of the current table whose keys sort before
 * pDataBlk->minKey.
 *
 * Rows are appended to the writer's data block and flushed with
 * tsdbWriteDataBlock() whenever the block reaches maxRow, plus once more
 * after the loop.  The loop stops at the first row that belongs to another
 * table or whose key is >= pDataBlk->minKey.
 *
 * The caller must ensure a current commit row exists; it is dereferenced
 * unconditionally to obtain the table id.
 *
 * Returns 0 on success or the first non-zero code encountered.
 */
static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
  int32_t     code = 0;
  SBlockData *pBDataW = &pCommitter->dWriter.bData;
  SRowInfo   *pRowInfo = tsdbGetCommitRow(pCommitter);
  TABLEID     id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};

  tBlockDataClear(pBDataW);
  while (pRowInfo) {
    ASSERT(pRowInfo->row.type == 0);
    code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
    if (code) goto _err;

    code = tBlockDataAppendRow(pBDataW, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
    if (code) goto _err;

    code = tsdbNextCommitRow(pCommitter);
    if (code) goto _err;

    // Keep the next row only if it is the same table and still ahead of the block.
    pRowInfo = tsdbGetCommitRow(pCommitter);
    if (pRowInfo != NULL) {
      if (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) {
        TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
        if (tsdbKeyCmprFn(&tKey, &pDataBlk->minKey) >= 0) {
          pRowInfo = NULL;
        }
      } else {
        pRowInfo = NULL;
      }
    }

    if (pBDataW->nRow >= pCommitter->maxRow) {
      code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock,
                                pCommitter->cmprAlg);
      if (code) goto _err;
    }
  }

  code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d, tsdb commit ahead block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
1136
/*
 * Merge one on-disk data block with the commit rows that overlap it.
 *
 * The block is read into the reader buffer and walked row by row next to
 * the commit rows of the same table.  Whichever row sorts first is appended
 * to the writer buffer; equal rows are not expected (asserted).  Commit rows
 * are taken only while their key is <= pDataBlk->maxKey.  Once the commit
 * side is done, the rest of the on-disk block is copied through.  The writer
 * buffer is flushed whenever it reaches maxRow and once more at the end.
 *
 * The caller must ensure a current commit row exists; it is dereferenced
 * unconditionally to obtain the table id.
 *
 * Returns 0 on success or the first non-zero code encountered.
 */
static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
  int32_t     code = 0;
  SRowInfo   *pRowInfo = tsdbGetCommitRow(pCommitter);
  TABLEID     id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
  SBlockData *pBDataR = &pCommitter->dReader.bData;
  SBlockData *pBDataW = &pCommitter->dWriter.bData;

  code = tsdbReadDataBlock(pCommitter->dReader.pReader, pDataBlk, pBDataR);
  if (code) goto _err;

  tBlockDataClear(pBDataW);

  int32_t  iRowR = 0;
  TSDBROW  rowR = tsdbRowFromBlockData(pBDataR, 0);
  TSDBROW *pRowR = &rowR;  // NULL once the on-disk block is exhausted

  // Phase 1: both sides still have rows.
  while (pRowR && pRowInfo) {
    int32_t c = tsdbRowCmprFn(pRowR, &pRowInfo->row);

    if (c < 0) {
      // on-disk row goes first
      code = tBlockDataAppendRow(pBDataW, pRowR, NULL, id.uid);
      if (code) goto _err;

      if (++iRowR < pBDataR->nRow) {
        rowR = tsdbRowFromBlockData(pBDataR, iRowR);
      } else {
        pRowR = NULL;
      }
    } else if (c > 0) {
      // commit row goes first
      ASSERT(pRowInfo->row.type == 0);
      code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
      if (code) goto _err;

      code = tBlockDataAppendRow(pBDataW, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
      if (code) goto _err;

      code = tsdbNextCommitRow(pCommitter);
      if (code) goto _err;

      // Keep the next row only if it is the same table and within this block's range.
      pRowInfo = tsdbGetCommitRow(pCommitter);
      if (pRowInfo != NULL) {
        if (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) {
          TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
          if (tsdbKeyCmprFn(&tKey, &pDataBlk->maxKey) > 0) {
            pRowInfo = NULL;
          }
        } else {
          pRowInfo = NULL;
        }
      }
    } else {
      ASSERT(0);
    }

    if (pBDataW->nRow >= pCommitter->maxRow) {
      code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
      if (code) goto _err;
    }
  }

  // Phase 2: copy whatever is left of the on-disk block.
  while (pRowR) {
    code = tBlockDataAppendRow(pBDataW, pRowR, NULL, id.uid);
    if (code) goto _err;

    if (++iRowR < pBDataR->nRow) {
      rowR = tsdbRowFromBlockData(pBDataR, iRowR);
    } else {
      pRowR = NULL;
    }

    if (pBDataW->nRow >= pCommitter->maxRow) {
      code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
      if (code) goto _err;
    }
  }

  code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d, tsdb commit merge block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
1220 1221 1222 1223 1224 1225 1226
/*
 * Merge the commit rows of table `id` with that table's existing blocks in
 * the data file.
 *
 * Does nothing unless the reader's current block index refers to exactly
 * this table.  Otherwise each existing block is compared with the current
 * commit row using tDataBlkCmprFn():
 *   - block compares lower  : the block is carried over unchanged;
 *   - block compares higher : commit rows ahead of it are written first;
 *   - otherwise             : the block is merged with the commit rows.
 * Blocks left once the commit rows run out are carried over unchanged, and
 * the reader is then moved to the next table.
 *
 * Returns 0 on success or the first non-zero code encountered.
 */
static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
  int32_t    code = 0;
  SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx;

  ASSERT(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0);

  // Nothing on disk for this table.
  if (pBlockIdx == NULL || pBlockIdx->suid != id.suid || pBlockIdx->uid != id.uid) {
    return code;
  }

  int32_t   iBlock = 0;
  SDataBlk  block;
  SDataBlk *pDataBlk = &block;  // NULL once every block has been consumed
  SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);

  ASSERT(pRowInfo->suid == id.suid && pRowInfo->uid == id.uid);

  tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
  while (pDataBlk && pRowInfo) {
    SDataBlk tBlock = {.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)};
    int32_t  c = tDataBlkCmprFn(pDataBlk, &tBlock);
    bool     nextBlock;   // move on to the following on-disk block
    bool     refreshRow;  // re-read the current commit row

    if (c < 0) {
      code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
      nextBlock = true;
      refreshRow = false;
    } else if (c > 0) {
      code = tsdbCommitAheadBlock(pCommitter, pDataBlk);
      nextBlock = false;
      refreshRow = true;
    } else {
      code = tsdbCommitMergeBlock(pCommitter, pDataBlk);
      nextBlock = true;
      refreshRow = true;
    }
    if (code) goto _err;

    if (nextBlock) {
      iBlock++;
      if (iBlock < pCommitter->dReader.mBlock.nItem) {
        tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
      } else {
        pDataBlk = NULL;
      }
    }

    if (refreshRow) {
      pRowInfo = tsdbGetCommitRow(pCommitter);
      if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL;
    }
  }

  // Carry over the remaining blocks untouched.
  while (pDataBlk) {
    code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
    if (code) goto _err;

    iBlock++;
    if (iBlock < pCommitter->dReader.mBlock.nItem) {
      tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
    } else {
      pDataBlk = NULL;
    }
  }

  code = tsdbCommitterNextTableData(pCommitter);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb merge table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
1293
/*
 * Make sure the writer's stt ("last") block buffer can take rows of table
 * `id`.
 *
 * If the buffer is already bound (non-zero suid or uid) and either its suid
 * differs from id.suid or id.suid is 0, the buffer is written out with
 * tsdbWriteSttBlock() and reset.  If the buffer is unbound afterwards, it is
 * initialised from the committer's table schema: keyed by suid when id.suid
 * is non-zero, otherwise by uid.
 *
 * Returns 0 on success or the code from the write/init call.
 */
static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) {
  int32_t     code = 0;
  SBlockData *pBDatal = &pCommitter->dWriter.bDatal;

  // Flush a buffer that cannot be shared with this table.
  if ((pBDatal->suid || pBDatal->uid) && ((pBDatal->suid != id.suid) || (id.suid == 0))) {
    code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
    if (code) return code;
    tBlockDataReset(pBDatal);
  }

  // Bind an unbound buffer.
  if (pBDatal->suid == 0 && pBDatal->uid == 0) {
    ASSERT(pCommitter->skmTable.suid == id.suid);
    ASSERT(pCommitter->skmTable.uid == id.uid);
    code = tBlockDataInit(pBDatal, id.suid, id.suid ? 0 : id.uid, pCommitter->skmTable.pTSchema);
  }

  return code;
}

/*
 * Move every row of the writer's data block buffer into the writer's stt
 * ("last") block buffer, writing the stt buffer out whenever it reaches
 * maxRow.
 *
 * Returns 0 on success or the first non-zero code encountered.
 */
static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
  int32_t     code = 0;
  SBlockData *pSrc = &pCommitter->dWriter.bData;
  SBlockData *pDst = &pCommitter->dWriter.bDatal;
  TABLEID     id = {.suid = pSrc->suid, .uid = pSrc->uid};

  code = tsdbInitLastBlockIfNeed(pCommitter, id);
  if (code) return code;

  int32_t iRow = 0;
  while (iRow < pSrc->nRow) {
    TSDBROW row = tsdbRowFromBlockData(pSrc, iRow);

    code = tBlockDataAppendRow(pDst, &row, NULL, pSrc->uid);
    if (code) return code;

    if (pDst->nRow >= pCommitter->maxRow) {
      code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pDst, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
      if (code) return code;
    }

    iRow++;
  }

  return code;
}

H
Hongze Cheng 已提交
1343
/*
 * Write the remaining commit rows of table `id`.
 *
 * Returns immediately when the current commit row is absent or belongs to
 * another table.  In toLastOnly mode rows go straight into the stt ("last")
 * block buffer; otherwise they go into the data block buffer.  The target
 * buffer is written out whenever it reaches maxRow.  In data-block mode a
 * non-empty remainder is written as a data block when it holds more than
 * minRow rows, otherwise it is appended to the stt buffer.
 *
 * Returns 0 on success or the first non-zero code encountered.
 */
static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
  int32_t   code = 0;
  SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);

  if (pRowInfo == NULL || pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) {
    return code;
  }

  SBlockData *pBData;
  if (!pCommitter->toLastOnly) {
    pBData = &pCommitter->dWriter.bData;
    ASSERT(pBData->nRow == 0);
  } else {
    pBData = &pCommitter->dWriter.bDatal;
    code = tsdbInitLastBlockIfNeed(pCommitter, id);
    if (code) goto _err;
  }

  // pRowInfo is known to be non-NULL on entry.
  do {
    STSchema *pTSchema = NULL;
    if (pRowInfo->row.type == 0) {
      code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
      if (code) goto _err;
      pTSchema = pCommitter->skmRow.pTSchema;
    }

    code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid);
    if (code) goto _err;

    code = tsdbNextCommitRow(pCommitter);
    if (code) goto _err;

    pRowInfo = tsdbGetCommitRow(pCommitter);
    if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
      pRowInfo = NULL;
    }

    // Flush a full buffer to the matching destination.
    if (pBData->nRow >= pCommitter->maxRow) {
      if (pCommitter->toLastOnly) {
        code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
      } else {
        code =
            tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
      }
      if (code) goto _err;
    }
  } while (pRowInfo);

  // Data-block mode: decide where the leftover rows go.
  if (!pCommitter->toLastOnly && pBData->nRow) {
    if (pBData->nRow > pCommitter->minRow) {
      code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
    } else {
      code = tsdbAppendLastBlock(pCommitter);
    }
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
1412 1413 1414
/*
 * Commit all rows destined for the current file set, one table at a time.
 *
 * For each table at the head of the commit-row stream:
 *   1. tsdbMoveCommitData() is called with that table's id;
 *   2. the writer's block map is reset and the table schema refreshed;
 *   3. the reader and writer block buffers are initialised for the table;
 *   4. the commit rows are merged with existing data, then the rest written;
 *   5. if any blocks were produced, the block map is written and its index
 *      entry recorded.
 * After the last table, tsdbMoveCommitData() is called once more with an id
 * of (INT64_MAX, INT64_MAX) and the stt ("last") block buffer is written.
 *
 * Returns 0 on success or the first non-zero code encountered
 * (TSDB_CODE_OUT_OF_MEMORY when the index entry cannot be pushed).
 */
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
  int32_t   code = 0;
  SRowInfo *pRowInfo;
  TABLEID   id = {0};

  for (pRowInfo = tsdbGetCommitRow(pCommitter); pRowInfo != NULL; pRowInfo = tsdbGetCommitRow(pCommitter)) {
    // Each round must start on a table different from the previous one.
    ASSERT(pRowInfo->suid != id.suid || pRowInfo->uid != id.uid);
    id.suid = pRowInfo->suid;
    id.uid = pRowInfo->uid;

    code = tsdbMoveCommitData(pCommitter, id);
    if (code) goto _err;

    // start
    tMapDataReset(&pCommitter->dWriter.mBlock);

    // impl
    code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable);
    if (code) goto _err;

    code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema);
    if (code) goto _err;

    code = tBlockDataInit(&pCommitter->dWriter.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema);
    if (code) goto _err;

    /* merge with data in .data file */
    code = tsdbMergeTableData(pCommitter, id);
    if (code) goto _err;

    /* handle remain table data */
    code = tsdbCommitTableData(pCommitter, id);
    if (code) goto _err;

    // end: skip the index entry when the table produced no blocks
    if (pCommitter->dWriter.mBlock.nItem <= 0) continue;

    SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid};
    code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx);
    if (code) goto _err;

    if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
  }

  // Final move with the largest possible table id.
  id.suid = INT64_MAX;
  id.uid = INT64_MAX;
  code = tsdbMoveCommitData(pCommitter, id);
  if (code) goto _err;

  code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
                           pCommitter->cmprAlg);
  if (code) goto _err;

  return code;

_err:
  tsdbError("vgId:%d tsdb commit file data impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}