tsdbCommit.c 41.4 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef enum { MEMORY_DATA_ITER = 0, STT_DATA_ITER } EDataIterT;
H
Hongze Cheng 已提交
19

H
Hongze Cheng 已提交
20 21 22
typedef struct {
  SRBTreeNode n;
  SRowInfo    r;
H
Hongze Cheng 已提交
23
  EDataIterT  type;
H
Hongze Cheng 已提交
24 25 26 27 28 29
  union {
    struct {
      int32_t     iTbDataP;
      STbDataIter iter;
    };  // memory data iter
    struct {
H
Hongze Cheng 已提交
30 31 32
      int32_t    iStt;
      SArray    *aSttBlk;
      int32_t    iSttBlk;
H
Hongze Cheng 已提交
33 34
      SBlockData bData;
      int32_t    iRow;
H
Hongze Cheng 已提交
35
    };  // stt file data iter
H
Hongze Cheng 已提交
36 37 38
  };
} SDataIter;

H
Hongze Cheng 已提交
39
typedef struct {
H
Hongze Cheng 已提交
40
  STsdb *pTsdb;
H
Hongze Cheng 已提交
41
  /* commit data */
H
Hongze Cheng 已提交
42
  int64_t commitID;
H
Hongze Cheng 已提交
43 44
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
45 46
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
47
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
48
  int8_t  maxLast;
H
Hongze Cheng 已提交
49 50
  SArray *aTbDataP;  // memory
  STsdbFS fs;        // disk
H
Hongze Cheng 已提交
51
  // --------------
H
Hongze Cheng 已提交
52
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
53 54 55
  int32_t commitFid;
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
56
  // commit file data
H
Hongze Cheng 已提交
57 58
  struct {
    SDataFReader *pReader;
H
Hongze Cheng 已提交
59 60 61
    SArray       *aBlockIdx;  // SArray<SBlockIdx>
    int32_t       iBlockIdx;
    SBlockIdx    *pBlockIdx;
H
Hongze Cheng 已提交
62
    SMapData      mBlock;  // SMapData<SDataBlk>
H
Hongze Cheng 已提交
63
    SBlockData    bData;
H
Hongze Cheng 已提交
64
  } dReader;
H
Hongze Cheng 已提交
65 66 67
  struct {
    SDataIter *pIter;
    SRBTree    rbt;
H
Hongze Cheng 已提交
68
    SDataIter  dataIter;
H
Hongze Cheng 已提交
69
    SDataIter  aDataIter[TSDB_MAX_STT_FILE];
H
Hongze Cheng 已提交
70
    int8_t     toLastOnly;
H
Hongze Cheng 已提交
71
  };
H
Hongze Cheng 已提交
72 73 74
  struct {
    SDataFWriter *pWriter;
    SArray       *aBlockIdx;  // SArray<SBlockIdx>
H
Hongze Cheng 已提交
75
    SArray       *aSttBlk;    // SArray<SSttBlk>
H
Hongze Cheng 已提交
76
    SMapData      mBlock;     // SMapData<SDataBlk>
H
Hongze Cheng 已提交
77
    SBlockData    bData;
H
Hongze Cheng 已提交
78
    SBlockData    bDatal;
H
Hongze Cheng 已提交
79 80 81
  } dWriter;
  SSkmInfo skmTable;
  SSkmInfo skmRow;
H
Hongze Cheng 已提交
82
  /* commit del */
H
Hongze Cheng 已提交
83 84
  SDelFReader *pDelFReader;
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
85 86 87
  SArray      *aDelIdx;   // SArray<SDelIdx>
  SArray      *aDelIdxN;  // SArray<SDelIdx>
  SArray      *aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
88
} SCommitter;
H
refact  
Hongze Cheng 已提交
89

H
Hongze Cheng 已提交
90 91 92 93 94
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
Hongze Cheng 已提交
95 96
static int32_t tsdbNextCommitRow(SCommitter *pCommitter);

H
Hongze Cheng 已提交
97
int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
H
Hongze Cheng 已提交
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
  SRowInfo *pInfo1 = (SRowInfo *)p1;
  SRowInfo *pInfo2 = (SRowInfo *)p2;

  if (pInfo1->suid < pInfo2->suid) {
    return -1;
  } else if (pInfo1->suid > pInfo2->suid) {
    return 1;
  }

  if (pInfo1->uid < pInfo2->uid) {
    return -1;
  } else if (pInfo1->uid > pInfo2->uid) {
    return 1;
  }

  return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row);
}
H
refact  
Hongze Cheng 已提交
115

H
refact  
Hongze Cheng 已提交
116
int32_t tsdbBegin(STsdb *pTsdb) {
H
Hongze Cheng 已提交
117
  int32_t code = 0;
H
Hongze Cheng 已提交
118

119 120
  if (!pTsdb) return code;

H
Hongze Cheng 已提交
121 122
  SMemTable *pMemTable;
  code = tsdbMemTableCreate(pTsdb, &pMemTable);
H
Hongze Cheng 已提交
123
  if (code) goto _err;
H
Hongze Cheng 已提交
124

H
Hongze Cheng 已提交
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
  // lock
  code = taosThreadRwlockWrlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  pTsdb->mem = pMemTable;

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

H
Hongze Cheng 已提交
141 142 143
  return code;

_err:
S
Shengliang Guan 已提交
144
  tsdbError("vgId:%d, tsdb begin failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
145
  return code;
H
Hongze Cheng 已提交
146 147
}

H
more  
Hongze Cheng 已提交
148
int32_t tsdbCommit(STsdb *pTsdb) {
149
  if (!pTsdb) return 0;
H
Hongze Cheng 已提交
150

H
more  
Hongze Cheng 已提交
151
  int32_t    code = 0;
H
Hongze Cheng 已提交
152 153 154 155
  SCommitter commith;
  SMemTable *pMemTable = pTsdb->mem;

  // check
H
Hongze Cheng 已提交
156
  if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
H
Hongze Cheng 已提交
157
    taosThreadRwlockWrlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
158
    pTsdb->mem = NULL;
H
Hongze Cheng 已提交
159 160 161
    taosThreadRwlockUnlock(&pTsdb->rwLock);

    tsdbUnrefMemTable(pMemTable);
H
Hongze Cheng 已提交
162 163
    goto _exit;
  }
H
refact  
Hongze Cheng 已提交
164

H
more  
Hongze Cheng 已提交
165
  // start commit
H
more  
Hongze Cheng 已提交
166
  code = tsdbStartCommit(pTsdb, &commith);
H
Hongze Cheng 已提交
167
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
168

H
refact  
Hongze Cheng 已提交
169 170
  // commit impl
  code = tsdbCommitData(&commith);
H
Hongze Cheng 已提交
171
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
172 173

  code = tsdbCommitDel(&commith);
H
Hongze Cheng 已提交
174
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
175 176

  // end commit
H
more  
Hongze Cheng 已提交
177
  code = tsdbEndCommit(&commith, 0);
H
Hongze Cheng 已提交
178
  if (code) goto _err;
H
refact  
Hongze Cheng 已提交
179

H
Hongze Cheng 已提交
180
_exit:
H
refact  
Hongze Cheng 已提交
181 182 183
  return code;

_err:
H
Hongze Cheng 已提交
184
  tsdbEndCommit(&commith, code);
C
Cary Xu 已提交
185
  tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
refact  
Hongze Cheng 已提交
186 187 188
  return code;
}

H
Hongze Cheng 已提交
189
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
190 191 192 193
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

H
Hongze Cheng 已提交
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
  pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
  if (pCommitter->aDelIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData));
  if (pCommitter->aDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx));
  if (pCommitter->aDelIdxN == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
H
Hongze Cheng 已提交
211

H
Hongze Cheng 已提交
212
  SDelFile *pDelFileR = pCommitter->fs.pDelFile;
H
Hongze Cheng 已提交
213
  if (pDelFileR) {
H
Hongze Cheng 已提交
214
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb);
H
Hongze Cheng 已提交
215
    if (code) goto _err;
H
Hongze Cheng 已提交
216

H
Hongze Cheng 已提交
217
    code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx);
H
Hongze Cheng 已提交
218
    if (code) goto _err;
H
Hongze Cheng 已提交
219 220
  }

H
Hongze Cheng 已提交
221
  // prepare new
H
Hongze Cheng 已提交
222 223
  SDelFile wDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0};
  code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &wDelFile, pTsdb);
H
Hongze Cheng 已提交
224
  if (code) goto _err;
H
Hongze Cheng 已提交
225 226

_exit:
S
Shengliang Guan 已提交
227
  tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode));
H
Hongze Cheng 已提交
228 229 230
  return code;

_err:
S
Shengliang Guan 已提交
231
  tsdbError("vgId:%d, commit del start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
232 233 234
  return code;
}

H
Hongze Cheng 已提交
235
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
H
Hongze Cheng 已提交
236
  int32_t   code = 0;
H
Hongze Cheng 已提交
237
  SDelData *pDelData;
H
Hongze Cheng 已提交
238 239
  tb_uid_t  suid;
  tb_uid_t  uid;
H
Hongze Cheng 已提交
240 241

  if (pTbData) {
H
Hongze Cheng 已提交
242 243
    suid = pTbData->suid;
    uid = pTbData->uid;
H
Hongze Cheng 已提交
244

H
Hongze Cheng 已提交
245 246 247 248
    if (pTbData->pHead == NULL) {
      pTbData = NULL;
    }
  }
H
Hongze Cheng 已提交
249 250

  if (pDelIdx) {
H
Hongze Cheng 已提交
251 252 253
    suid = pDelIdx->suid;
    uid = pDelIdx->uid;

H
Hongze Cheng 已提交
254
    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData);
H
Hongze Cheng 已提交
255
    if (code) goto _err;
256 257
  } else {
    taosArrayClear(pCommitter->aDelData);
H
Hongze Cheng 已提交
258 259
  }

H
Hongze Cheng 已提交
260
  if (pTbData == NULL && pDelIdx == NULL) goto _exit;
H
Hongze Cheng 已提交
261

H
Hongze Cheng 已提交
262
  SDelIdx delIdx = {.suid = suid, .uid = uid};
H
Hongze Cheng 已提交
263 264

  // memory
H
Hongze Cheng 已提交
265 266
  pDelData = pTbData ? pTbData->pHead : NULL;
  for (; pDelData; pDelData = pDelData->pNext) {
H
Hongze Cheng 已提交
267 268 269 270
    if (taosArrayPush(pCommitter->aDelData, pDelData) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
H
Hongze Cheng 已提交
271 272 273
  }

  // write
H
Hongze Cheng 已提交
274
  code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, &delIdx);
H
Hongze Cheng 已提交
275 276 277
  if (code) goto _err;

  // put delIdx
278
  if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) {
H
Hongze Cheng 已提交
279 280 281
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
H
Hongze Cheng 已提交
282 283 284 285 286

_exit:
  return code;

_err:
S
Shengliang Guan 已提交
287
  tsdbError("vgId:%d, commit table del failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
288 289 290
  return code;
}

H
Hongze Cheng 已提交
291 292
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
293
  STsdb  *pTsdb = pCommitter->pTsdb;
H
Hongze Cheng 已提交
294

H
Hongze Cheng 已提交
295
  code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN);
H
Hongze Cheng 已提交
296
  if (code) goto _err;
H
Hongze Cheng 已提交
297

H
Hongze Cheng 已提交
298 299 300
  code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
  if (code) goto _err;

H
Hongze Cheng 已提交
301
  code = tsdbFSUpsertDelFile(&pCommitter->fs, &pCommitter->pDelFWriter->fDel);
H
Hongze Cheng 已提交
302
  if (code) goto _err;
H
Hongze Cheng 已提交
303

H
Hongze Cheng 已提交
304
  code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1);
H
Hongze Cheng 已提交
305
  if (code) goto _err;
H
Hongze Cheng 已提交
306 307

  if (pCommitter->pDelFReader) {
H
Hongze Cheng 已提交
308
    code = tsdbDelFReaderClose(&pCommitter->pDelFReader);
H
Hongze Cheng 已提交
309 310 311
    if (code) goto _err;
  }

H
Hongze Cheng 已提交
312 313 314 315
  taosArrayDestroy(pCommitter->aDelIdx);
  taosArrayDestroy(pCommitter->aDelData);
  taosArrayDestroy(pCommitter->aDelIdxN);

H
Hongze Cheng 已提交
316 317 318
  return code;

_err:
S
Shengliang Guan 已提交
319
  tsdbError("vgId:%d, commit del end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
320 321 322
  return code;
}

H
Hongze Cheng 已提交
323
int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo) {
H
Hongze Cheng 已提交
324 325
  int32_t code = 0;

H
Hongze Cheng 已提交
326
  if (suid) {
H
Hongze Cheng 已提交
327 328
    if (pSkmInfo->suid == suid) {
      pSkmInfo->uid = uid;
H
Hongze Cheng 已提交
329 330
      goto _exit;
    }
H
Hongze Cheng 已提交
331
  } else {
H
Hongze Cheng 已提交
332
    if (pSkmInfo->uid == uid) goto _exit;
H
Hongze Cheng 已提交
333 334
  }

H
Hongze Cheng 已提交
335 336 337 338
  pSkmInfo->suid = suid;
  pSkmInfo->uid = uid;
  tTSchemaDestroy(pSkmInfo->pTSchema);
  code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema);
H
Hongze Cheng 已提交
339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369
  if (code) goto _exit;

_exit:
  return code;
}

static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  int32_t code = 0;

  if (pCommitter->skmRow.pTSchema) {
    if (pCommitter->skmRow.suid == suid) {
      if (suid == 0) {
        if (pCommitter->skmRow.uid == uid && sver == pCommitter->skmRow.pTSchema->version) goto _exit;
      } else {
        if (sver == pCommitter->skmRow.pTSchema->version) goto _exit;
      }
    }
  }

  pCommitter->skmRow.suid = suid;
  pCommitter->skmRow.uid = uid;
  tTSchemaDestroy(pCommitter->skmRow.pTSchema);
  code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema);
  if (code) {
    goto _exit;
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
370 371 372 373 374 375 376 377 378 379
static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) {
  int32_t code = 0;

  ASSERT(pCommitter->dReader.pBlockIdx);

  pCommitter->dReader.iBlockIdx++;
  if (pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx)) {
    pCommitter->dReader.pBlockIdx =
        (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);

H
Hongze Cheng 已提交
380
    code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
H
Hongze Cheng 已提交
381 382 383 384 385 386 387 388 389 390 391
    if (code) goto _exit;

    ASSERT(pCommitter->dReader.mBlock.nItem > 0);
  } else {
    pCommitter->dReader.pBlockIdx = NULL;
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
392 393 394 395
static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
  int32_t code = 0;

  pCommitter->pIter = NULL;
H
Hongze Cheng 已提交
396
  tRBTreeCreate(&pCommitter->rbt, tRowInfoCmprFn);
H
Hongze Cheng 已提交
397 398

  // memory
H
Hongze Cheng 已提交
399
  TSDBKEY    tKey = {.ts = pCommitter->minKey, .version = VERSION_MIN};
H
Hongze Cheng 已提交
400 401
  SDataIter *pIter = &pCommitter->dataIter;
  pIter->type = MEMORY_DATA_ITER;
H
Hongze Cheng 已提交
402 403 404 405 406 407 408
  pIter->iTbDataP = 0;
  for (; pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP); pIter->iTbDataP++) {
    STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP);
    tsdbTbDataIterOpen(pTbData, &tKey, 0, &pIter->iter);
    TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter);
    if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
      pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
H
Hongze Cheng 已提交
409
      pRow = NULL;
H
Hongze Cheng 已提交
410 411
    }

H
Hongze Cheng 已提交
412 413
    if (pRow == NULL) continue;

H
Hongze Cheng 已提交
414 415 416 417 418
    pIter->r.suid = pTbData->suid;
    pIter->r.uid = pTbData->uid;
    pIter->r.row = *pRow;
    break;
  }
H
Hongze Cheng 已提交
419
  ASSERT(pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP));
H
Hongze Cheng 已提交
420 421 422
  tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter);

  // disk
H
Hongze Cheng 已提交
423
  pCommitter->toLastOnly = 0;
H
Hongze Cheng 已提交
424
  SDataFReader *pReader = pCommitter->dReader.pReader;
H
Hongze Cheng 已提交
425
  if (pReader) {
H
Hongze Cheng 已提交
426
    if (pReader->pSet->nSttF >= pCommitter->maxLast) {
H
Hongze Cheng 已提交
427
      int8_t iIter = 0;
H
Hongze Cheng 已提交
428
      for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
H
Hongze Cheng 已提交
429
        pIter = &pCommitter->aDataIter[iIter];
H
Hongze Cheng 已提交
430
        pIter->type = STT_DATA_ITER;
H
Hongze Cheng 已提交
431
        pIter->iStt = iStt;
H
Hongze Cheng 已提交
432

H
Hongze Cheng 已提交
433
        code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk);
H
Hongze Cheng 已提交
434 435
        if (code) goto _err;

H
Hongze Cheng 已提交
436
        if (taosArrayGetSize(pIter->aSttBlk) == 0) continue;
H
Hongze Cheng 已提交
437

H
Hongze Cheng 已提交
438 439 440
        pIter->iSttBlk = 0;
        SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, 0);
        code = tsdbReadSttBlock(pCommitter->dReader.pReader, iStt, pSttBlk, &pIter->bData);
H
Hongze Cheng 已提交
441 442 443 444 445 446 447 448 449 450
        if (code) goto _err;

        pIter->iRow = 0;
        pIter->r.suid = pIter->bData.suid;
        pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
        pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0);

        tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter);
        iIter++;
      }
H
Hongze Cheng 已提交
451
    } else {
H
Hongze Cheng 已提交
452 453 454
      for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
        SSttFile *pSttFile = pReader->pSet->aSttF[iStt];
        if (pSttFile->size > pSttFile->offset) {
H
Hongze Cheng 已提交
455 456 457 458
          pCommitter->toLastOnly = 1;
          break;
        }
      }
H
Hongze Cheng 已提交
459
    }
H
Hongze Cheng 已提交
460 461 462 463 464 465 466 467 468 469 470
  }

  code = tsdbNextCommitRow(pCommitter);
  if (code) goto _err;

  return code;

_err:
  return code;
}

H
Hongze Cheng 已提交
471 472 473 474
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SDFileSet *pRSet = NULL;
H
Hongze Cheng 已提交
475

H
Hongze Cheng 已提交
476
  // memory
H
Hongze Cheng 已提交
477 478 479
  pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
  tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                  &pCommitter->maxKey);
H
Hongze Cheng 已提交
480
  pCommitter->nextKey = TSKEY_MAX;
H
Hongze Cheng 已提交
481

H
Hongze Cheng 已提交
482
  // Reader
H
Hongze Cheng 已提交
483 484
  SDFileSet tDFileSet = {.fid = pCommitter->commitFid};
  pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &tDFileSet, tDFileSetCmprFn, TD_EQ);
H
Hongze Cheng 已提交
485
  if (pRSet) {
H
Hongze Cheng 已提交
486
    code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet);
H
Hongze Cheng 已提交
487 488
    if (code) goto _err;

H
Hongze Cheng 已提交
489
    // data
H
Hongze Cheng 已提交
490
    code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx);
H
Hongze Cheng 已提交
491
    if (code) goto _err;
H
Hongze Cheng 已提交
492

H
Hongze Cheng 已提交
493
    pCommitter->dReader.iBlockIdx = 0;
H
Hongze Cheng 已提交
494 495
    if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) {
      pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0);
H
Hongze Cheng 已提交
496
      code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
H
Hongze Cheng 已提交
497 498 499 500
      if (code) goto _err;
    } else {
      pCommitter->dReader.pBlockIdx = NULL;
    }
H
Hongze Cheng 已提交
501
    tBlockDataReset(&pCommitter->dReader.bData);
H
Hongze Cheng 已提交
502
  } else {
H
Hongze Cheng 已提交
503
    pCommitter->dReader.pBlockIdx = NULL;
H
Hongze Cheng 已提交
504
  }
H
Hongze Cheng 已提交
505

H
Hongze Cheng 已提交
506
  // Writer
H
Hongze Cheng 已提交
507 508 509
  SHeadFile fHead = {.commitID = pCommitter->commitID};
  SDataFile fData = {.commitID = pCommitter->commitID};
  SSmaFile  fSma = {.commitID = pCommitter->commitID};
H
Hongze Cheng 已提交
510
  SSttFile  fStt = {.commitID = pCommitter->commitID};
H
Hongze Cheng 已提交
511
  SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};
H
Hongze Cheng 已提交
512
  if (pRSet) {
H
Hongze Cheng 已提交
513
    ASSERT(pRSet->nSttF <= pCommitter->maxLast);
H
Hongze Cheng 已提交
514 515
    fData = *pRSet->pDataF;
    fSma = *pRSet->pSmaF;
H
Hongze Cheng 已提交
516
    wSet.diskId = pRSet->diskId;
H
Hongze Cheng 已提交
517 518 519
    if (pRSet->nSttF < pCommitter->maxLast) {
      for (int32_t iStt = 0; iStt < pRSet->nSttF; iStt++) {
        wSet.aSttF[iStt] = pRSet->aSttF[iStt];
H
Hongze Cheng 已提交
520
      }
H
Hongze Cheng 已提交
521
      wSet.nSttF = pRSet->nSttF + 1;
H
Hongze Cheng 已提交
522
    } else {
H
Hongze Cheng 已提交
523
      wSet.nSttF = 1;
H
Hongze Cheng 已提交
524
    }
H
Hongze Cheng 已提交
525
  } else {
H
Hongze Cheng 已提交
526
    SDiskID did = {0};
527
    tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
H
Hongze Cheng 已提交
528
    tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
529
    wSet.diskId = did;
H
Hongze Cheng 已提交
530
    wSet.nSttF = 1;
H
Hongze Cheng 已提交
531
  }
H
Hongze Cheng 已提交
532
  wSet.aSttF[wSet.nSttF - 1] = &fStt;
H
Hongze Cheng 已提交
533
  code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
H
Hongze Cheng 已提交
534
  if (code) goto _err;
H
Hongze Cheng 已提交
535

H
Hongze Cheng 已提交
536
  taosArrayClear(pCommitter->dWriter.aBlockIdx);
H
Hongze Cheng 已提交
537
  taosArrayClear(pCommitter->dWriter.aSttBlk);
H
Hongze Cheng 已提交
538 539 540 541
  tMapDataReset(&pCommitter->dWriter.mBlock);
  tBlockDataReset(&pCommitter->dWriter.bData);
  tBlockDataReset(&pCommitter->dWriter.bDatal);

H
Hongze Cheng 已提交
542 543 544 545
  // open iter
  code = tsdbOpenCommitIter(pCommitter);
  if (code) goto _err;

H
Hongze Cheng 已提交
546
_exit:
H
Hongze Cheng 已提交
547 548 549
  return code;

_err:
S
Shengliang Guan 已提交
550
  tsdbError("vgId:%d, commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
551
  return code;
H
Hongze Cheng 已提交
552 553
}

H
Hongze Cheng 已提交
554
static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
555 556
  int32_t     code = 0;
  SBlockData *pBlockData = &pCommitter->dWriter.bData;
H
Hongze Cheng 已提交
557
  SDataBlk    dataBlk;
H
Hongze Cheng 已提交
558

H
Hongze Cheng 已提交
559
  ASSERT(pBlockData->nRow > 0);
H
Hongze Cheng 已提交
560

H
Hongze Cheng 已提交
561
  tDataBlkReset(&dataBlk);
H
Hongze Cheng 已提交
562

H
Hongze Cheng 已提交
563
  // info
H
Hongze Cheng 已提交
564
  dataBlk.nRow += pBlockData->nRow;
H
Hongze Cheng 已提交
565 566
  for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
    TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]};
H
Hongze Cheng 已提交
567

H
Hongze Cheng 已提交
568
    if (iRow == 0) {
H
Hongze Cheng 已提交
569 570
      if (tsdbKeyCmprFn(&dataBlk.minKey, &key) > 0) {
        dataBlk.minKey = key;
H
Hongze Cheng 已提交
571 572 573
      }
    } else {
      if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) {
H
Hongze Cheng 已提交
574
        dataBlk.hasDup = 1;
H
Hongze Cheng 已提交
575 576 577
      }
    }

H
Hongze Cheng 已提交
578 579
    if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&dataBlk.maxKey, &key) < 0) {
      dataBlk.maxKey = key;
H
Hongze Cheng 已提交
580 581
    }

H
Hongze Cheng 已提交
582 583
    dataBlk.minVer = TMIN(dataBlk.minVer, key.version);
    dataBlk.maxVer = TMAX(dataBlk.maxVer, key.version);
H
Hongze Cheng 已提交
584 585 586
  }

  // write
H
Hongze Cheng 已提交
587 588 589 590
  dataBlk.nSubBlock++;
  code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1],
                            ((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL,
                            pCommitter->cmprAlg, 0);
H
Hongze Cheng 已提交
591 592
  if (code) goto _err;

H
Hongze Cheng 已提交
593
  // put SDataBlk
H
Hongze Cheng 已提交
594
  code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &dataBlk, tPutDataBlk);
H
Hongze Cheng 已提交
595
  if (code) goto _err;
H
Hongze Cheng 已提交
596

H
Hongze Cheng 已提交
597
  // clear
H
Hongze Cheng 已提交
598
  tBlockDataClear(pBlockData);
H
Hongze Cheng 已提交
599

H
Hongze Cheng 已提交
600 601 602 603
  return code;

_err:
  tsdbError("vgId:%d tsdb commit data block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
604 605 606 607
  return code;
}

static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
608
  int32_t     code = 0;
H
Hongze Cheng 已提交
609
  SSttBlk     sstBlk;
H
Hongze Cheng 已提交
610 611 612 613
  SBlockData *pBlockData = &pCommitter->dWriter.bDatal;

  ASSERT(pBlockData->nRow > 0);

H
Hongze Cheng 已提交
614
  // info
H
Hongze Cheng 已提交
615 616 617 618 619 620
  sstBlk.suid = pBlockData->suid;
  sstBlk.nRow = pBlockData->nRow;
  sstBlk.minKey = TSKEY_MAX;
  sstBlk.maxKey = TSKEY_MIN;
  sstBlk.minVer = VERSION_MAX;
  sstBlk.maxVer = VERSION_MIN;
H
Hongze Cheng 已提交
621
  for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
H
Hongze Cheng 已提交
622 623 624 625
    sstBlk.minKey = TMIN(sstBlk.minKey, pBlockData->aTSKEY[iRow]);
    sstBlk.maxKey = TMAX(sstBlk.maxKey, pBlockData->aTSKEY[iRow]);
    sstBlk.minVer = TMIN(sstBlk.minVer, pBlockData->aVersion[iRow]);
    sstBlk.maxVer = TMAX(sstBlk.maxVer, pBlockData->aVersion[iRow]);
H
Hongze Cheng 已提交
626
  }
H
Hongze Cheng 已提交
627 628
  sstBlk.minUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[0];
  sstBlk.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1];
H
Hongze Cheng 已提交
629

H
Hongze Cheng 已提交
630
  // write
H
Hongze Cheng 已提交
631
  code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &sstBlk.bInfo, NULL, pCommitter->cmprAlg, 1);
H
Hongze Cheng 已提交
632
  if (code) goto _err;
H
Hongze Cheng 已提交
633

H
Hongze Cheng 已提交
634
  // push SSttBlk
H
Hongze Cheng 已提交
635
  if (taosArrayPush(pCommitter->dWriter.aSttBlk, &sstBlk) == NULL) {
H
Hongze Cheng 已提交
636
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
637
    goto _err;
H
Hongze Cheng 已提交
638 639
  }

H
Hongze Cheng 已提交
640
  // clear
H
Hongze Cheng 已提交
641
  tBlockDataClear(pBlockData);
H
Hongze Cheng 已提交
642

H
Hongze Cheng 已提交
643 644 645 646
  return code;

_err:
  tsdbError("vgId:%d tsdb commit last block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
647 648 649
  return code;
}

H
Hongze Cheng 已提交
650 651 652
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code = 0;

H
Hongze Cheng 已提交
653
  // write aBlockIdx
H
Hongze Cheng 已提交
654
  code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx);
H
Hongze Cheng 已提交
655 656
  if (code) goto _err;

H
Hongze Cheng 已提交
657 658
  // write aSttBlk
  code = tsdbWriteSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSttBlk);
H
Hongze Cheng 已提交
659 660
  if (code) goto _err;

H
Hongze Cheng 已提交
661
  // update file header
H
Hongze Cheng 已提交
662
  code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter);
H
Hongze Cheng 已提交
663 664 665
  if (code) goto _err;

  // upsert SDFileSet
H
Hongze Cheng 已提交
666
  code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->dWriter.pWriter->wSet);
H
Hongze Cheng 已提交
667 668 669
  if (code) goto _err;

  // close and sync
H
Hongze Cheng 已提交
670
  code = tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 1);
H
Hongze Cheng 已提交
671 672
  if (code) goto _err;

H
Hongze Cheng 已提交
673 674
  if (pCommitter->dReader.pReader) {
    code = tsdbDataFReaderClose(&pCommitter->dReader.pReader);
H
Hongze Cheng 已提交
675
    if (code) goto _err;
H
Hongze Cheng 已提交
676 677 678 679 680 681
  }

_exit:
  return code;

_err:
S
Shengliang Guan 已提交
682
  tsdbError("vgId:%d, commit file data end failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
683 684 685
  return code;
}

H
Hongze Cheng 已提交
686 687 688
static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
  int32_t code = 0;

H
Hongze Cheng 已提交
689
  while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) {
H
Hongze Cheng 已提交
690
    SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx;
H
Hongze Cheng 已提交
691
    code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx);
H
Hongze Cheng 已提交
692 693 694 695 696 697 698
    if (code) goto _err;

    if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

H
Hongze Cheng 已提交
699 700
    code = tsdbCommitterNextTableData(pCommitter);
    if (code) goto _err;
H
Hongze Cheng 已提交
701 702 703 704 705 706 707 708 709
  }

  return code;

_err:
  tsdbError("vgId:%d tsdb move commit data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
710
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter);
H
Hongze Cheng 已提交
711
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
712 713 714
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
715 716 717 718 719

  // commit file data start
  code = tsdbCommitFileDataStart(pCommitter);
  if (code) goto _err;

H
Hongze Cheng 已提交
720 721 722
  // impl
  code = tsdbCommitFileDataImpl(pCommitter);
  if (code) goto _err;
H
Hongze Cheng 已提交
723

H
Hongze Cheng 已提交
724 725
  // commit file data end
  code = tsdbCommitFileDataEnd(pCommitter);
H
Hongze Cheng 已提交
726
  if (code) goto _err;
H
Hongze Cheng 已提交
727 728 729 730

  return code;

_err:
S
Shengliang Guan 已提交
731
  tsdbError("vgId:%d, commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
732 733
  tsdbDataFReaderClose(&pCommitter->dReader.pReader);
  tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0);
H
Hongze Cheng 已提交
734 735 736
  return code;
}

H
Hongze Cheng 已提交
737 738
// ----------------------------------------------------------------------------
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
H
Hongze Cheng 已提交
739
  int32_t code = 0;
H
Hongze Cheng 已提交
740

H
Hongze Cheng 已提交
741 742
  memset(pCommitter, 0, sizeof(*pCommitter));
  ASSERT(pTsdb->mem && pTsdb->imem == NULL);
H
Hongze Cheng 已提交
743

H
more  
Hongze Cheng 已提交
744
  taosThreadRwlockWrlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
745 746
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
H
more  
Hongze Cheng 已提交
747
  taosThreadRwlockUnlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
748

H
Hongze Cheng 已提交
749
  pCommitter->pTsdb = pTsdb;
H
Hongze Cheng 已提交
750
  pCommitter->commitID = pTsdb->pVnode->state.commitID;
H
Hongze Cheng 已提交
751 752 753 754
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;
  pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
H
Hongze Cheng 已提交
755
  pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
H
Hongze Cheng 已提交
756
  pCommitter->maxLast = TSDB_DEFAULT_STT_FILE;  // TODO: make it as a config
H
Hongze Cheng 已提交
757 758 759 760 761
  pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
  if (pCommitter->aTbDataP == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }
H
Hongze Cheng 已提交
762
  code = tsdbFSCopy(pTsdb, &pCommitter->fs);
H
Hongze Cheng 已提交
763 764 765 766 767
  if (code) goto _err;

  return code;

_err:
S
Shengliang Guan 已提交
768
  tsdbError("vgId:%d, tsdb start commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
769 770 771
  return code;
}

H
Hongze Cheng 已提交
772 773 774
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
  int32_t code = 0;

H
Hongze Cheng 已提交
775
  // reader
H
Hongze Cheng 已提交
776 777
  pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->dReader.aBlockIdx == NULL) {
H
Hongze Cheng 已提交
778 779 780 781
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
782
  code = tBlockDataCreate(&pCommitter->dReader.bData);
H
Hongze Cheng 已提交
783 784
  if (code) goto _exit;

H
Hongze Cheng 已提交
785
  // merger
H
Hongze Cheng 已提交
786 787 788 789
  for (int32_t iStt = 0; iStt < TSDB_MAX_STT_FILE; iStt++) {
    SDataIter *pIter = &pCommitter->aDataIter[iStt];
    pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
    if (pIter->aSttBlk == NULL) {
H
Hongze Cheng 已提交
790 791 792 793 794 795 796 797 798
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }

    code = tBlockDataCreate(&pIter->bData);
    if (code) goto _exit;
  }

  // writer
H
Hongze Cheng 已提交
799 800 801 802 803 804
  pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->dWriter.aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
805 806
  pCommitter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
  if (pCommitter->dWriter.aSttBlk == NULL) {
H
Hongze Cheng 已提交
807 808 809 810
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
811
  code = tBlockDataCreate(&pCommitter->dWriter.bData);
H
Hongze Cheng 已提交
812
  if (code) goto _exit;
H
Hongze Cheng 已提交
813

H
Hongze Cheng 已提交
814
  code = tBlockDataCreate(&pCommitter->dWriter.bDatal);
H
Hongze Cheng 已提交
815 816
  if (code) goto _exit;

H
Hongze Cheng 已提交
817 818 819 820 821
_exit:
  return code;
}

static void tsdbCommitDataEnd(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
822
  // reader
H
Hongze Cheng 已提交
823 824
  taosArrayDestroy(pCommitter->dReader.aBlockIdx);
  tMapDataClear(&pCommitter->dReader.mBlock);
H
Hongze Cheng 已提交
825
  tBlockDataDestroy(&pCommitter->dReader.bData, 1);
H
Hongze Cheng 已提交
826

H
Hongze Cheng 已提交
827
  // merger
H
Hongze Cheng 已提交
828 829 830
  for (int32_t iStt = 0; iStt < TSDB_MAX_STT_FILE; iStt++) {
    SDataIter *pIter = &pCommitter->aDataIter[iStt];
    taosArrayDestroy(pIter->aSttBlk);
H
Hongze Cheng 已提交
831 832 833 834
    tBlockDataDestroy(&pIter->bData, 1);
  }

  // writer
H
Hongze Cheng 已提交
835
  taosArrayDestroy(pCommitter->dWriter.aBlockIdx);
H
Hongze Cheng 已提交
836
  taosArrayDestroy(pCommitter->dWriter.aSttBlk);
H
Hongze Cheng 已提交
837
  tMapDataClear(&pCommitter->dWriter.mBlock);
H
Hongze Cheng 已提交
838 839
  tBlockDataDestroy(&pCommitter->dWriter.bData, 1);
  tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1);
H
Hongze Cheng 已提交
840 841
  tTSchemaDestroy(pCommitter->skmTable.pTSchema);
  tTSchemaDestroy(pCommitter->skmRow.pTSchema);
H
Hongze Cheng 已提交
842 843
}

H
Hongze Cheng 已提交
844 845 846 847
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
848

H
Hongze Cheng 已提交
849
  // check
H
Hongze Cheng 已提交
850
  if (pMemTable->nRow == 0) goto _exit;
H
Hongze Cheng 已提交
851

H
Hongze Cheng 已提交
852 853
  // start ====================
  code = tsdbCommitDataStart(pCommitter);
H
Hongze Cheng 已提交
854
  if (code) goto _err;
H
Hongze Cheng 已提交
855 856 857

  // impl ====================
  pCommitter->nextKey = pMemTable->minKey;
H
Hongze Cheng 已提交
858 859
  while (pCommitter->nextKey < TSKEY_MAX) {
    code = tsdbCommitFileData(pCommitter);
H
Hongze Cheng 已提交
860
    if (code) goto _err;
H
Hongze Cheng 已提交
861
  }
H
Hongze Cheng 已提交
862

H
Hongze Cheng 已提交
863 864 865
  // end ====================
  tsdbCommitDataEnd(pCommitter);

H
Hongze Cheng 已提交
866
_exit:
H
Hongze Cheng 已提交
867
  tsdbInfo("vgId:%d, commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow);
H
Hongze Cheng 已提交
868
  return code;
H
Hongze Cheng 已提交
869

H
Hongze Cheng 已提交
870
_err:
H
Hongze Cheng 已提交
871
  tsdbCommitDataEnd(pCommitter);
S
Shengliang Guan 已提交
872
  tsdbError("vgId:%d, commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
873 874
  return code;
}
H
Hongze Cheng 已提交
875

H
Hongze Cheng 已提交
876 877 878 879
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
880

H
Hongze Cheng 已提交
881 882
  if (pMemTable->nDel == 0) {
    goto _exit;
H
Hongze Cheng 已提交
883
  }
H
Hongze Cheng 已提交
884

H
Hongze Cheng 已提交
885 886 887 888 889
  // start
  code = tsdbCommitDelStart(pCommitter);
  if (code) {
    goto _err;
  }
H
Hongze Cheng 已提交
890

H
Hongze Cheng 已提交
891
  // impl
H
Hongze Cheng 已提交
892 893 894
  int32_t  iDelIdx = 0;
  int32_t  nDelIdx = taosArrayGetSize(pCommitter->aDelIdx);
  int32_t  iTbData = 0;
H
Hongze Cheng 已提交
895
  int32_t  nTbData = taosArrayGetSize(pCommitter->aTbDataP);
H
Hongze Cheng 已提交
896 897 898 899 900
  STbData *pTbData;
  SDelIdx *pDelIdx;

  ASSERT(nTbData > 0);

H
Hongze Cheng 已提交
901
  pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
H
Hongze Cheng 已提交
902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926
  pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
  while (true) {
    if (pTbData == NULL && pDelIdx == NULL) break;

    if (pTbData && pDelIdx) {
      int32_t c = tTABLEIDCmprFn(pTbData, pDelIdx);

      if (c == 0) {
        goto _commit_mem_and_disk_del;
      } else if (c < 0) {
        goto _commit_mem_del;
      } else {
        goto _commit_disk_del;
      }
    } else if (pTbData) {
      goto _commit_mem_del;
    } else {
      goto _commit_disk_del;
    }

  _commit_mem_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, NULL);
    if (code) goto _err;

    iTbData++;
H
Hongze Cheng 已提交
927
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
H
Hongze Cheng 已提交
928 929 930 931 932 933 934 935 936 937 938 939 940 941 942
    continue;

  _commit_disk_del:
    code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx);
    if (code) goto _err;

    iDelIdx++;
    pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    continue;

  _commit_mem_and_disk_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx);
    if (code) goto _err;

    iTbData++;
H
Hongze Cheng 已提交
943
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
H
Hongze Cheng 已提交
944 945 946
    iDelIdx++;
    pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    continue;
H
Hongze Cheng 已提交
947
  }
H
Hongze Cheng 已提交
948

H
Hongze Cheng 已提交
949 950 951 952 953
  // end
  code = tsdbCommitDelEnd(pCommitter);
  if (code) {
    goto _err;
  }
H
Hongze Cheng 已提交
954

H
Hongze Cheng 已提交
955
_exit:
S
Shengliang Guan 已提交
956
  tsdbDebug("vgId:%d, commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
H
Hongze Cheng 已提交
957 958 959
  return code;

_err:
S
Shengliang Guan 已提交
960
  tsdbError("vgId:%d, commit del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
961
  return code;
H
Hongze Cheng 已提交
962 963
}

H
Hongze Cheng 已提交
964
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
H
Hongze Cheng 已提交
965 966 967 968
  int32_t    code = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

H
Hongze Cheng 已提交
969
  ASSERT(eno == 0);
H
Hongze Cheng 已提交
970 971 972

  code = tsdbFSCommit1(pTsdb, &pCommitter->fs);
  if (code) goto _err;
H
Hongze Cheng 已提交
973

H
Hongze Cheng 已提交
974
  // lock
H
Hongze Cheng 已提交
975
  taosThreadRwlockWrlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
976 977 978 979 980 981 982 983

  // commit or rollback
  code = tsdbFSCommit2(pTsdb, &pCommitter->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    goto _err;
  }

H
Hongze Cheng 已提交
984
  pTsdb->imem = NULL;
H
Hongze Cheng 已提交
985 986

  // unlock
H
Hongze Cheng 已提交
987 988 989
  taosThreadRwlockUnlock(&pTsdb->rwLock);

  tsdbUnrefMemTable(pMemTable);
H
Hongze Cheng 已提交
990
  tsdbFSDestroy(&pCommitter->fs);
H
Hongze Cheng 已提交
991
  taosArrayDestroy(pCommitter->aTbDataP);
H
Hongze Cheng 已提交
992

H
Hongze Cheng 已提交
993 994 995 996
  // if (pCommitter->toMerge) {
  //   code = tsdbMerge(pTsdb);
  //   if (code) goto _err;
  // }
H
Hongze Cheng 已提交
997

S
Shengliang Guan 已提交
998
  tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode));
H
Hongze Cheng 已提交
999 1000 1001
  return code;

_err:
S
Shengliang Guan 已提交
1002
  tsdbError("vgId:%d, tsdb end commit failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
1003 1004
  return code;
}
H
Hongze Cheng 已提交
1005

H
Hongze Cheng 已提交
1006
// ================================================================================
H
Hongze Cheng 已提交
1007

H
Hongze Cheng 已提交
1008 1009
static FORCE_INLINE SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) {
  return (pCommitter->pIter) ? &pCommitter->pIter->r : NULL;
H
Hongze Cheng 已提交
1010 1011
}

H
Hongze Cheng 已提交
1012
static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
1013 1014 1015 1016
  int32_t code = 0;

  if (pCommitter->pIter) {
    SDataIter *pIter = pCommitter->pIter;
H
Hongze Cheng 已提交
1017
    if (pCommitter->pIter->type == MEMORY_DATA_ITER) {  // memory
H
Hongze Cheng 已提交
1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033
      tsdbTbDataIterNext(&pIter->iter);
      TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter);
      while (true) {
        if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
          pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
          pRow = NULL;
        }

        if (pRow) {
          pIter->r.suid = pIter->iter.pTbData->suid;
          pIter->r.uid = pIter->iter.pTbData->uid;
          pIter->r.row = *pRow;
          break;
        }

        pIter->iTbDataP++;
H
Hongze Cheng 已提交
1034 1035
        if (pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP)) {
          STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP);
H
Hongze Cheng 已提交
1036 1037 1038 1039 1040 1041 1042 1043 1044
          TSDBKEY  keyFrom = {.ts = pCommitter->minKey, .version = VERSION_MIN};
          tsdbTbDataIterOpen(pTbData, &keyFrom, 0, &pIter->iter);
          pRow = tsdbTbDataIterGet(&pIter->iter);
          continue;
        } else {
          pCommitter->pIter = NULL;
          break;
        }
      }
H
Hongze Cheng 已提交
1045
    } else if (pCommitter->pIter->type == STT_DATA_ITER) {  // last file
H
Hongze Cheng 已提交
1046 1047 1048 1049 1050
      pIter->iRow++;
      if (pIter->iRow < pIter->bData.nRow) {
        pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
        pIter->r.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
      } else {
H
Hongze Cheng 已提交
1051 1052 1053
        pIter->iSttBlk++;
        if (pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk)) {
          SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
H
Hongze Cheng 已提交
1054

H
Hongze Cheng 已提交
1055
          code = tsdbReadSttBlock(pCommitter->dReader.pReader, pIter->iStt, pSttBlk, &pIter->bData);
H
Hongze Cheng 已提交
1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093
          if (code) goto _exit;

          pIter->iRow = 0;
          pIter->r.suid = pIter->bData.suid;
          pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
          pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0);
        } else {
          pCommitter->pIter = NULL;
        }
      }
    } else {
      ASSERT(0);
    }

    // compare with min in RB Tree
    pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
    if (pCommitter->pIter && pIter) {
      int32_t c = tRowInfoCmprFn(&pCommitter->pIter->r, &pIter->r);
      if (c > 0) {
        tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
        pCommitter->pIter = NULL;
      } else {
        ASSERT(c);
      }
    }
  }

  if (pCommitter->pIter == NULL) {
    pCommitter->pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
    if (pCommitter->pIter) {
      tRBTreeDrop(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
    }
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
1094
static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
H
Hongze Cheng 已提交
1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117
  int32_t     code = 0;
  SBlockData *pBlockData = &pCommitter->dWriter.bData;
  SRowInfo   *pRowInfo = tsdbGetCommitRow(pCommitter);
  TABLEID     id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};

  tBlockDataClear(pBlockData);
  while (pRowInfo) {
    ASSERT(pRowInfo->row.type == 0);
    code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
    if (code) goto _err;

    code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
    if (code) goto _err;

    code = tsdbNextCommitRow(pCommitter);
    if (code) goto _err;

    pRowInfo = tsdbGetCommitRow(pCommitter);
    if (pRowInfo) {
      if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) {
        pRowInfo = NULL;
      } else {
        TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
H
Hongze Cheng 已提交
1118
        if (tsdbKeyCmprFn(&tKey, &pDataBlk->minKey) >= 0) pRowInfo = NULL;
H
Hongze Cheng 已提交
1119 1120 1121
      }
    }

H
Hongze Cheng 已提交
1122 1123
    if (pBlockData->nRow >= pCommitter->maxRow) {
      code = tsdbCommitDataBlock(pCommitter);
H
Hongze Cheng 已提交
1124 1125 1126 1127 1128
      if (code) goto _err;
    }
  }

  if (pBlockData->nRow) {
H
Hongze Cheng 已提交
1129
    code = tsdbCommitDataBlock(pCommitter);
H
Hongze Cheng 已提交
1130 1131 1132 1133 1134 1135 1136
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d, tsdb commit ahead block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
1137 1138 1139
  return code;
}

H
Hongze Cheng 已提交
1140
static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
H
Hongze Cheng 已提交
1141 1142 1143 1144 1145 1146
  int32_t     code = 0;
  SRowInfo   *pRowInfo = tsdbGetCommitRow(pCommitter);
  TABLEID     id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
  SBlockData *pBDataR = &pCommitter->dReader.bData;
  SBlockData *pBDataW = &pCommitter->dWriter.bData;

H
Hongze Cheng 已提交
1147
  code = tsdbReadDataBlock(pCommitter->dReader.pReader, pDataBlk, pBDataR);
H
Hongze Cheng 已提交
1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183
  if (code) goto _err;

  tBlockDataClear(pBDataW);
  int32_t  iRow = 0;
  TSDBROW  row = tsdbRowFromBlockData(pBDataR, 0);
  TSDBROW *pRow = &row;

  while (pRow && pRowInfo) {
    int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row);
    if (c < 0) {
      code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid);
      if (code) goto _err;

      iRow++;
      if (iRow < pBDataR->nRow) {
        row = tsdbRowFromBlockData(pBDataR, iRow);
      } else {
        pRow = NULL;
      }
    } else if (c > 0) {
      ASSERT(pRowInfo->row.type == 0);
      code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
      if (code) goto _err;

      code = tBlockDataAppendRow(pBDataW, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
      if (code) goto _err;

      code = tsdbNextCommitRow(pCommitter);
      if (code) goto _err;

      pRowInfo = tsdbGetCommitRow(pCommitter);
      if (pRowInfo) {
        if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) {
          pRowInfo = NULL;
        } else {
          TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
H
Hongze Cheng 已提交
1184
          if (tsdbKeyCmprFn(&tKey, &pDataBlk->maxKey) > 0) pRowInfo = NULL;
H
Hongze Cheng 已提交
1185 1186 1187 1188 1189 1190
        }
      }
    } else {
      ASSERT(0);
    }

H
Hongze Cheng 已提交
1191 1192
    if (pBDataW->nRow >= pCommitter->maxRow) {
      code = tsdbCommitDataBlock(pCommitter);
H
Hongze Cheng 已提交
1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207
      if (code) goto _err;
    }
  }

  while (pRow) {
    code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid);
    if (code) goto _err;

    iRow++;
    if (iRow < pBDataR->nRow) {
      row = tsdbRowFromBlockData(pBDataR, iRow);
    } else {
      pRow = NULL;
    }

H
Hongze Cheng 已提交
1208 1209
    if (pBDataW->nRow >= pCommitter->maxRow) {
      code = tsdbCommitDataBlock(pCommitter);
H
Hongze Cheng 已提交
1210 1211 1212 1213 1214
      if (code) goto _err;
    }
  }

  if (pBDataW->nRow) {
H
Hongze Cheng 已提交
1215
    code = tsdbCommitDataBlock(pCommitter);
H
Hongze Cheng 已提交
1216 1217 1218 1219 1220 1221 1222
    if (code) goto _err;
  }

  return code;

_err:
  tsdbError("vgId:%d, tsdb commit merge block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
1223 1224 1225
  return code;
}

H
Hongze Cheng 已提交
1226 1227 1228 1229 1230 1231 1232
static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
  int32_t    code = 0;
  SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx;

  ASSERT(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0);
  if (pBlockIdx && pBlockIdx->suid == id.suid && pBlockIdx->uid == id.uid) {
    int32_t   iBlock = 0;
H
Hongze Cheng 已提交
1233 1234
    SDataBlk  block;
    SDataBlk *pDataBlk = &block;
H
Hongze Cheng 已提交
1235 1236 1237 1238
    SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);

    ASSERT(pRowInfo->suid == id.suid && pRowInfo->uid == id.uid);

H
Hongze Cheng 已提交
1239 1240 1241
    tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
    while (pDataBlk && pRowInfo) {
      SDataBlk tBlock = {.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)};
H
Hongze Cheng 已提交
1242
      int32_t  c = tDataBlkCmprFn(pDataBlk, &tBlock);
H
Hongze Cheng 已提交
1243 1244

      if (c < 0) {
H
Hongze Cheng 已提交
1245
        code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
H
Hongze Cheng 已提交
1246 1247 1248 1249
        if (code) goto _err;

        iBlock++;
        if (iBlock < pCommitter->dReader.mBlock.nItem) {
H
Hongze Cheng 已提交
1250
          tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
H
Hongze Cheng 已提交
1251
        } else {
H
Hongze Cheng 已提交
1252
          pDataBlk = NULL;
H
Hongze Cheng 已提交
1253 1254
        }
      } else if (c > 0) {
H
Hongze Cheng 已提交
1255
        code = tsdbCommitAheadBlock(pCommitter, pDataBlk);
H
Hongze Cheng 已提交
1256 1257 1258 1259
        if (code) goto _err;

        pRowInfo = tsdbGetCommitRow(pCommitter);
        if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL;
H
Hongze Cheng 已提交
1260
      } else {
H
Hongze Cheng 已提交
1261
        code = tsdbCommitMergeBlock(pCommitter, pDataBlk);
H
Hongze Cheng 已提交
1262 1263 1264 1265
        if (code) goto _err;

        iBlock++;
        if (iBlock < pCommitter->dReader.mBlock.nItem) {
H
Hongze Cheng 已提交
1266
          tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
H
Hongze Cheng 已提交
1267
        } else {
H
Hongze Cheng 已提交
1268
          pDataBlk = NULL;
H
Hongze Cheng 已提交
1269 1270 1271
        }
        pRowInfo = tsdbGetCommitRow(pCommitter);
        if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL;
H
Hongze Cheng 已提交
1272 1273 1274
      }
    }

H
Hongze Cheng 已提交
1275 1276
    while (pDataBlk) {
      code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
H
Hongze Cheng 已提交
1277 1278 1279 1280
      if (code) goto _err;

      iBlock++;
      if (iBlock < pCommitter->dReader.mBlock.nItem) {
H
Hongze Cheng 已提交
1281
        tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
H
Hongze Cheng 已提交
1282
      } else {
H
Hongze Cheng 已提交
1283
        pDataBlk = NULL;
H
Hongze Cheng 已提交
1284 1285
      }
    }
H
Hongze Cheng 已提交
1286 1287 1288

    code = tsdbCommitterNextTableData(pCommitter);
    if (code) goto _err;
H
Hongze Cheng 已提交
1289 1290 1291 1292 1293 1294 1295 1296 1297 1298
  }

_exit:
  return code;

_err:
  tsdbError("vgId:%d tsdb merge table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}

H
Hongze Cheng 已提交
1299
static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) {
H
Hongze Cheng 已提交
1300 1301 1302 1303
  int32_t code = 0;

  SBlockData *pBDatal = &pCommitter->dWriter.bDatal;
  if (pBDatal->suid || pBDatal->uid) {
H
Hongze Cheng 已提交
1304
    if ((pBDatal->suid != id.suid) || (id.suid == 0)) {
H
Hongze Cheng 已提交
1305 1306
      if (pBDatal->nRow) {
        code = tsdbCommitLastBlock(pCommitter);
H
Hongze Cheng 已提交
1307
        if (code) goto _exit;
H
Hongze Cheng 已提交
1308 1309 1310 1311 1312 1313
      }
      tBlockDataReset(pBDatal);
    }
  }

  if (!pBDatal->suid && !pBDatal->uid) {
H
Hongze Cheng 已提交
1314 1315
    ASSERT(pCommitter->skmTable.suid == id.suid);
    ASSERT(pCommitter->skmTable.uid == id.uid);
H
Hongze Cheng 已提交
1316
    code = tBlockDataInit(pBDatal, id.suid, id.suid ? 0 : id.uid, pCommitter->skmTable.pTSchema);
H
Hongze Cheng 已提交
1317
    if (code) goto _exit;
H
Hongze Cheng 已提交
1318 1319
  }

H
Hongze Cheng 已提交
1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333
_exit:
  return code;
}

static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
  int32_t code = 0;

  SBlockData *pBData = &pCommitter->dWriter.bData;
  SBlockData *pBDatal = &pCommitter->dWriter.bDatal;

  TABLEID id = {.suid = pBData->suid, .uid = pBData->uid};
  code = tsdbInitLastBlockIfNeed(pCommitter, id);
  if (code) goto _err;

H
Hongze Cheng 已提交
1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350
  for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) {
    TSDBROW row = tsdbRowFromBlockData(pBData, iRow);
    code = tBlockDataAppendRow(pBDatal, &row, NULL, pBData->uid);
    if (code) goto _err;

    if (pBDatal->nRow >= pCommitter->maxRow) {
      code = tsdbCommitLastBlock(pCommitter);
      if (code) goto _err;
    }
  }

  return code;

_err:
  return code;
}

H
Hongze Cheng 已提交
1351
static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
H
Hongze Cheng 已提交
1352
  int32_t code = 0;
H
Hongze Cheng 已提交
1353

H
Hongze Cheng 已提交
1354
  SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
H
Hongze Cheng 已提交
1355 1356 1357 1358
  if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
    pRowInfo = NULL;
  }

H
Hongze Cheng 已提交
1359
  if (pRowInfo == NULL) goto _exit;
H
Hongze Cheng 已提交
1360

H
Hongze Cheng 已提交
1361
  SBlockData *pBData;
H
Hongze Cheng 已提交
1362
  if (pCommitter->toLastOnly) {
H
Hongze Cheng 已提交
1363
    pBData = &pCommitter->dWriter.bDatal;
H
Hongze Cheng 已提交
1364 1365
    code = tsdbInitLastBlockIfNeed(pCommitter, id);
    if (code) goto _err;
H
Hongze Cheng 已提交
1366
  } else {
H
Hongze Cheng 已提交
1367
    pBData = &pCommitter->dWriter.bData;
H
Hongze Cheng 已提交
1368
    ASSERT(pBData->nRow == 0);
H
Hongze Cheng 已提交
1369
  }
H
Hongze Cheng 已提交
1370

H
Hongze Cheng 已提交
1371 1372 1373 1374
  while (pRowInfo) {
    STSchema *pTSchema = NULL;
    if (pRowInfo->row.type == 0) {
      code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
H
Hongze Cheng 已提交
1375
      if (code) goto _err;
H
Hongze Cheng 已提交
1376 1377
      pTSchema = pCommitter->skmRow.pTSchema;
    }
H
Hongze Cheng 已提交
1378

H
Hongze Cheng 已提交
1379 1380
    code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid);
    if (code) goto _err;
H
Hongze Cheng 已提交
1381

H
Hongze Cheng 已提交
1382 1383
    code = tsdbNextCommitRow(pCommitter);
    if (code) goto _err;
H
Hongze Cheng 已提交
1384

H
Hongze Cheng 已提交
1385 1386 1387
    pRowInfo = tsdbGetCommitRow(pCommitter);
    if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
      pRowInfo = NULL;
H
Hongze Cheng 已提交
1388 1389
    }

H
Hongze Cheng 已提交
1390 1391 1392
    if (pBData->nRow >= pCommitter->maxRow) {
      if (pCommitter->toLastOnly) {
        code = tsdbCommitLastBlock(pCommitter);
H
Hongze Cheng 已提交
1393 1394
        if (code) goto _err;
      } else {
H
Hongze Cheng 已提交
1395
        code = tsdbCommitDataBlock(pCommitter);
H
Hongze Cheng 已提交
1396 1397
        if (code) goto _err;
      }
H
Hongze Cheng 已提交
1398 1399 1400
    }
  }

H
Hongze Cheng 已提交
1401 1402 1403 1404 1405 1406 1407 1408 1409 1410
  if (!pCommitter->toLastOnly && pBData->nRow) {
    if (pBData->nRow > pCommitter->minRow) {
      code = tsdbCommitDataBlock(pCommitter);
      if (code) goto _err;
    } else {
      code = tsdbAppendLastBlock(pCommitter);
      if (code) goto _err;
    }
  }

H
Hongze Cheng 已提交
1411
_exit:
H
Hongze Cheng 已提交
1412 1413 1414
  return code;

_err:
H
Hongze Cheng 已提交
1415
  tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
H
Hongze Cheng 已提交
1416 1417 1418
  return code;
}

H
Hongze Cheng 已提交
1419 1420 1421
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
  int32_t code = 0;

H
Hongze Cheng 已提交
1422
  SRowInfo *pRowInfo;
H
Hongze Cheng 已提交
1423
  TABLEID   id = {0};
H
Hongze Cheng 已提交
1424
  while ((pRowInfo = tsdbGetCommitRow(pCommitter)) != NULL) {
H
Hongze Cheng 已提交
1425 1426 1427
    ASSERT(pRowInfo->suid != id.suid || pRowInfo->uid != id.uid);
    id.suid = pRowInfo->suid;
    id.uid = pRowInfo->uid;
H
Hongze Cheng 已提交
1428

H
Hongze Cheng 已提交
1429 1430
    code = tsdbMoveCommitData(pCommitter, id);
    if (code) goto _err;
H
Hongze Cheng 已提交
1431 1432

    // start
H
Hongze Cheng 已提交
1433
    tMapDataReset(&pCommitter->dWriter.mBlock);
H
Hongze Cheng 已提交
1434 1435

    // impl
H
Hongze Cheng 已提交
1436
    code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable);
H
Hongze Cheng 已提交
1437
    if (code) goto _err;
H
Hongze Cheng 已提交
1438
    code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema);
H
Hongze Cheng 已提交
1439
    if (code) goto _err;
H
Hongze Cheng 已提交
1440
    code = tBlockDataInit(&pCommitter->dWriter.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema);
H
Hongze Cheng 已提交
1441
    if (code) goto _err;
H
Hongze Cheng 已提交
1442

H
Hongze Cheng 已提交
1443 1444
    /* merge with data in .data file */
    code = tsdbMergeTableData(pCommitter, id);
H
Hongze Cheng 已提交
1445 1446
    if (code) goto _err;

H
Hongze Cheng 已提交
1447
    /* handle remain table data */
H
Hongze Cheng 已提交
1448
    code = tsdbCommitTableData(pCommitter, id);
H
Hongze Cheng 已提交
1449
    if (code) goto _err;
H
Hongze Cheng 已提交
1450

H
Hongze Cheng 已提交
1451
    // end
H
Hongze Cheng 已提交
1452 1453 1454 1455 1456 1457 1458 1459 1460 1461
    if (pCommitter->dWriter.mBlock.nItem > 0) {
      SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid};
      code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx);
      if (code) goto _err;

      if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _err;
      }
    }
H
Hongze Cheng 已提交
1462 1463
  }

H
Hongze Cheng 已提交
1464 1465 1466 1467 1468 1469 1470 1471 1472 1473
  id.suid = INT64_MAX;
  id.uid = INT64_MAX;
  code = tsdbMoveCommitData(pCommitter, id);
  if (code) goto _err;

  if (pCommitter->dWriter.bDatal.nRow > 0) {
    code = tsdbCommitLastBlock(pCommitter);
    if (code) goto _err;
  }

H
Hongze Cheng 已提交
1474 1475 1476 1477 1478 1479
  return code;

_err:
  tsdbError("vgId:%d tsdb commit file data impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
  return code;
}