tsdbCommit.c 50.8 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
/* Kind of source an SDataIter walks. */
typedef enum {
  MEMORY_DATA_ITER = 0,  // memory data iter
  STT_DATA_ITER          // stt file data iter
} EDataIterT;

/*
 * Compile-time switch for the stt writer buffer: non-zero selects the
 * SDiskDataBuilder member/path, zero selects the plain SBlockData one.
 */
#define USE_STREAM_COMPRESSION 0
H
compare  
Hongze Cheng 已提交
21

H
Hongze Cheng 已提交
22 23 24
typedef struct {
  SRBTreeNode n;
  SRowInfo    r;
H
Hongze Cheng 已提交
25
  EDataIterT  type;
H
Hongze Cheng 已提交
26 27 28 29 30 31
  union {
    struct {
      int32_t     iTbDataP;
      STbDataIter iter;
    };  // memory data iter
    struct {
H
Hongze Cheng 已提交
32 33 34
      int32_t    iStt;
      SArray    *aSttBlk;
      int32_t    iSttBlk;
H
Hongze Cheng 已提交
35 36
      SBlockData bData;
      int32_t    iRow;
H
Hongze Cheng 已提交
37
    };  // stt file data iter
H
Hongze Cheng 已提交
38 39 40
  };
} SDataIter;

H
Hongze Cheng 已提交
41
typedef struct {
H
Hongze Cheng 已提交
42
  STsdb *pTsdb;
H
Hongze Cheng 已提交
43
  /* commit data */
H
Hongze Cheng 已提交
44
  int64_t commitID;
H
Hongze Cheng 已提交
45 46
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
47 48
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
49
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
50
  int8_t  sttTrigger;
H
Hongze Cheng 已提交
51 52
  SArray *aTbDataP;  // memory
  STsdbFS fs;        // disk
H
Hongze Cheng 已提交
53
  // --------------
H
Hongze Cheng 已提交
54
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
55 56 57
  int32_t commitFid;
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
58
  // commit file data
H
Hongze Cheng 已提交
59 60
  struct {
    SDataFReader *pReader;
H
Hongze Cheng 已提交
61 62 63
    SArray       *aBlockIdx;  // SArray<SBlockIdx>
    int32_t       iBlockIdx;
    SBlockIdx    *pBlockIdx;
H
Hongze Cheng 已提交
64
    SMapData      mBlock;  // SMapData<SDataBlk>
H
Hongze Cheng 已提交
65
    SBlockData    bData;
H
Hongze Cheng 已提交
66
  } dReader;
H
Hongze Cheng 已提交
67 68 69
  struct {
    SDataIter *pIter;
    SRBTree    rbt;
H
Hongze Cheng 已提交
70
    SDataIter  dataIter;
H
Hongze Cheng 已提交
71
    SDataIter  aDataIter[TSDB_MAX_STT_TRIGGER];
H
Hongze Cheng 已提交
72
    int8_t     toLastOnly;
H
Hongze Cheng 已提交
73
  };
H
Hongze Cheng 已提交
74
  struct {
H
compare  
Hongze Cheng 已提交
75 76 77 78 79 80
    SDataFWriter *pWriter;
    SArray       *aBlockIdx;  // SArray<SBlockIdx>
    SArray       *aSttBlk;    // SArray<SSttBlk>
    SMapData      mBlock;     // SMapData<SDataBlk>
    SBlockData    bData;
#if USE_STREAM_COMPRESSION
H
Hongze Cheng 已提交
81
    SDiskDataBuilder *pBuilder;
H
compare  
Hongze Cheng 已提交
82 83 84
#else
    SBlockData bDatal;
#endif
H
Hongze Cheng 已提交
85 86 87
  } dWriter;
  SSkmInfo skmTable;
  SSkmInfo skmRow;
H
Hongze Cheng 已提交
88
  /* commit del */
H
Hongze Cheng 已提交
89 90
  SDelFReader *pDelFReader;
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
91 92 93
  SArray      *aDelIdx;   // SArray<SDelIdx>
  SArray      *aDelIdxN;  // SArray<SDelIdx>
  SArray      *aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
94
} SCommitter;
H
refact  
Hongze Cheng 已提交
95

H
Hongze Cheng 已提交
96
/* Forward declarations of the commit stages defined later in this file. */
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter, SCommitInfo *pInfo);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
static int32_t tsdbNextCommitRow(SCommitter *pCommitter);

H
Hongze Cheng 已提交
103
int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
H
Hongze Cheng 已提交
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
  SRowInfo *pInfo1 = (SRowInfo *)p1;
  SRowInfo *pInfo2 = (SRowInfo *)p2;

  if (pInfo1->suid < pInfo2->suid) {
    return -1;
  } else if (pInfo1->suid > pInfo2->suid) {
    return 1;
  }

  if (pInfo1->uid < pInfo2->uid) {
    return -1;
  } else if (pInfo1->uid > pInfo2->uid) {
    return 1;
  }

  return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row);
}
H
refact  
Hongze Cheng 已提交
121

H
refact  
Hongze Cheng 已提交
122
/*
 * Create a fresh memtable and install it as pTsdb->mem under the write lock.
 * A NULL pTsdb is accepted and treated as success.
 *
 * NOTE(review): if taking the lock fails, the memtable created just before is
 * not released on this path - confirm whether that matters.
 */
int32_t tsdbBegin(STsdb *pTsdb) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SMemTable *pNewMem;

  if (pTsdb == NULL) return code;

  code = tsdbMemTableCreate(pTsdb, &pNewMem);
  TSDB_CHECK_CODE(code, lino, _exit);

  // lock
  code = taosThreadRwlockWrlock(&pTsdb->rwLock);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(code);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pTsdb->mem = pNewMem;

  // unlock
  code = taosThreadRwlockUnlock(&pTsdb->rwLock);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(code);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
153 154
/*
 * Hand the active memtable over to the commit: under the write lock, `mem`
 * becomes `imem` and `mem` is cleared. Asserts that no earlier `imem` is still
 * pending. Always returns 0; the lock/unlock results are not checked.
 */
int32_t tsdbPrepareCommit(STsdb *pTsdb) {
  taosThreadRwlockWrlock(&pTsdb->rwLock);
  tAssert(pTsdb->imem == NULL);

  SMemTable *pFrozen = pTsdb->mem;
  pTsdb->mem = NULL;
  pTsdb->imem = pFrozen;

  taosThreadRwlockUnlock(&pTsdb->rwLock);
  return 0;
}

H
Hongze Cheng 已提交
163
/*
 * Commit the immutable memtable (pTsdb->imem) to disk.
 *
 * A NULL pTsdb is accepted and treated as success. When the memtable holds
 * neither rows nor deletes it is simply detached and unreferenced. Otherwise
 * the stages run in order: start, data, del, end. On any failure
 * tsdbEndCommit is called with the error code before returning it.
 *
 * @return 0 on success, otherwise the error code of the failing stage.
 */
int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) {
  if (!pTsdb) return 0;

  int32_t code = 0;
  int32_t lino = 0;
  // Zero-initialised: if tsdbStartCommit fails, the error path below still
  // passes this object to tsdbEndCommit, which must never see indeterminate
  // stack content.
  SCommitter commith = {0};
  SMemTable *pMemTable = pTsdb->imem;

  // check
  if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
    taosThreadRwlockWrlock(&pTsdb->rwLock);
    pTsdb->imem = NULL;
    taosThreadRwlockUnlock(&pTsdb->rwLock);

    tsdbUnrefMemTable(pMemTable);
    goto _exit;
  }

  // start commit
  code = tsdbStartCommit(pTsdb, &commith, pInfo);
  TSDB_CHECK_CODE(code, lino, _exit);

  // commit impl
  code = tsdbCommitData(&commith);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbCommitDel(&commith);
  TSDB_CHECK_CODE(code, lino, _exit);

  // end commit
  code = tsdbEndCommit(&commith, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    // NOTE(review): when the tsdbEndCommit(&commith, 0) call above is what
    // failed, tsdbEndCommit runs a second time here - confirm it tolerates that.
    tsdbEndCommit(&commith, code);
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
204
/*
 * Set up the delete-commit state: allocate the three work arrays, open the
 * existing del file (if the file system has one) and load its index, then open
 * the writer for the new del file tagged with this commit's id.
 */
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;

  pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
  if (pCommitter->aDelIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData));
  if (pCommitter->aDelData == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx));
  if (pCommitter->aDelIdxN == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // existing del file, if any
  SDelFile *pOldDelFile = pCommitter->fs.pDelFile;
  if (pOldDelFile != NULL) {
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pOldDelFile, pTsdb);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // prepare new
  SDelFile newDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0};
  code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &newDelFile, pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode));
  } else {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
248
/*
 * Write the delete records of one table to the new del file.
 *
 * pTbData (may be NULL) supplies the in-memory delete list, pDelIdx (may be
 * NULL) the table's entry in the old del file. On-disk records are loaded
 * first, the in-memory ones are appended, the result is written out and the
 * new index entry is appended to aDelIdxN. Nothing is written when neither
 * source has delete data.
 */
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SDelData *pDelData;
  SDelIdx   delIdx;
  tb_uid_t  suid;
  tb_uid_t  uid;

  if (pTbData != NULL) {
    suid = pTbData->suid;
    uid = pTbData->uid;

    // a table without in-memory deletes counts as "no memory source"
    if (pTbData->pHead == NULL) pTbData = NULL;
  }

  if (pDelIdx == NULL) {
    taosArrayClear(pCommitter->aDelData);
  } else {
    suid = pDelIdx->suid;
    uid = pDelIdx->uid;

    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (pTbData == NULL && pDelIdx == NULL) goto _exit;

  delIdx = (SDelIdx){.suid = suid, .uid = uid};

  // memory
  if (pTbData != NULL) {
    for (pDelData = pTbData->pHead; pDelData != NULL; pDelData = pDelData->pNext) {
      if (taosArrayPush(pCommitter->aDelData, pDelData) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

  // write
  code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, &delIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  // put delIdx
  if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
305 306
/*
 * Finish the delete commit: write the new del index and file header, register
 * the new del file in the committer's file system copy, close writer (with
 * sync) and reader, and free the work arrays.
 *
 * NOTE(review): on an error return the arrays are not destroyed and the
 * reader may stay open on this path - confirm the caller cleans up.
 */
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbFSUpsertDelFile(&pCommitter->fs, &pCommitter->pDelFWriter->fDel);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pCommitter->pDelFReader != NULL) {
    code = tsdbDelFReaderClose(&pCommitter->pDelFReader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  taosArrayDestroy(pCommitter->aDelIdx);
  taosArrayDestroy(pCommitter->aDelData);
  taosArrayDestroy(pCommitter->aDelIdxN);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
339
/*
 * Make pSkmInfo hold the schema of table (suid, uid).
 *
 * The cached schema is kept when it already matches: for a non-zero suid a
 * match on suid is enough (only the cached uid is refreshed); for suid == 0
 * the uid has to match. Otherwise the old schema is destroyed and a new one is
 * fetched from pMeta with version -1.
 *
 * @return 0 on success, otherwise the error from metaGetTbTSchemaEx. After a
 *         failed reload pSkmInfo->pTSchema no longer refers to the destroyed
 *         schema.
 */
int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  if (suid) {
    if (pSkmInfo->suid == suid) {
      pSkmInfo->uid = uid;
      goto _exit;
    }
  } else {
    if (pSkmInfo->uid == uid) goto _exit;
  }

  pSkmInfo->suid = suid;
  pSkmInfo->uid = uid;
  tDestroyTSchema(pSkmInfo->pTSchema);
  // Drop the stale pointer right away: if the reload below fails, a later
  // tDestroyTSchema on this SSkmInfo must not free the old schema again.
  pSkmInfo->pTSchema = NULL;
  code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  return code;
}

/*
 * Make pCommitter->skmRow hold schema version `sver` of table (suid, uid).
 *
 * The cached schema is reused when its suid and version match (and, for
 * suid == 0, the uid as well). Otherwise it is destroyed and reloaded from the
 * vnode's meta.
 *
 * @return 0 on success, otherwise the error from metaGetTbTSchemaEx. After a
 *         failed reload skmRow.pTSchema no longer refers to the destroyed
 *         schema.
 */
static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pCommitter->skmRow.pTSchema) {
    if (pCommitter->skmRow.suid == suid) {
      if (suid == 0) {
        if (pCommitter->skmRow.uid == uid && sver == pCommitter->skmRow.pTSchema->version) goto _exit;
      } else {
        if (sver == pCommitter->skmRow.pTSchema->version) goto _exit;
      }
    }
  }

  pCommitter->skmRow.suid = suid;
  pCommitter->skmRow.uid = uid;
  tDestroyTSchema(pCommitter->skmRow.pTSchema);
  // Drop the stale pointer right away: if the reload below fails, the cache
  // test at the top of the next call must not read ->version through a freed
  // schema, and a later destroy must not free it twice.
  pCommitter->skmRow.pTSchema = NULL;
  code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  return code;
}

H
Hongze Cheng 已提交
386 387
/*
 * Advance the on-disk reader to the next SBlockIdx and load its data block
 * map. When the index is exhausted, dReader.pBlockIdx becomes NULL.
 */
static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  tAssert(pCommitter->dReader.pBlockIdx);

  pCommitter->dReader.iBlockIdx++;
  if (pCommitter->dReader.iBlockIdx >= taosArrayGetSize(pCommitter->dReader.aBlockIdx)) {
    // no more tables in the old file
    pCommitter->dReader.pBlockIdx = NULL;
    goto _exit;
  }

  pCommitter->dReader.pBlockIdx =
      (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);

  code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
  TSDB_CHECK_CODE(code, lino, _exit);

  tAssert(pCommitter->dReader.mBlock.nItem > 0);

_exit:
  return code;
}

H
Hongze Cheng 已提交
409 410 411 412 413 414 415
static int32_t tDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
  SDataIter *pIter1 = (SDataIter *)((uint8_t *)n1 - offsetof(SDataIter, n));
  SDataIter *pIter2 = (SDataIter *)((uint8_t *)n2 - offsetof(SDataIter, n));

  return tRowInfoCmprFn(&pIter1->r, &pIter2->r);
}

H
Hongze Cheng 已提交
416 417
/*
 * Build the set of merge iterators for the file currently being committed.
 *
 * The memory iterator is positioned on the first table having a row inside
 * [minKey, maxKey]; rows beyond maxKey only lower nextKey. If the old file set
 * has reached sttTrigger stt files, one iterator per non-empty stt file is
 * added as well; otherwise toLastOnly is set when any stt file has
 * size > offset. Finally the first row to commit is selected via
 * tsdbNextCommitRow.
 */
static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  pCommitter->pIter = NULL;
  tRBTreeCreate(&pCommitter->rbt, tDataIterCmprFn);

  // memory
  TSDBKEY    fromKey = {.ts = pCommitter->minKey, .version = VERSION_MIN};
  SDataIter *pIter = &pCommitter->dataIter;
  pIter->type = MEMORY_DATA_ITER;
  for (pIter->iTbDataP = 0; pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP); pIter->iTbDataP++) {
    STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP);
    tsdbTbDataIterOpen(pTbData, &fromKey, 0, &pIter->iter);

    TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter);
    if (pRow == NULL) continue;

    if (TSDBROW_TS(pRow) > pCommitter->maxKey) {
      // belongs to a later file: remember the earliest such key and move on
      pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
      continue;
    }

    pIter->r.suid = pTbData->suid;
    pIter->r.uid = pTbData->uid;
    pIter->r.row = *pRow;
    break;
  }
  tAssert(pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP));
  tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter);

  // disk
  pCommitter->toLastOnly = 0;
  SDataFReader *pReader = pCommitter->dReader.pReader;
  if (pReader != NULL) {
    if (pReader->pSet->nSttF < pCommitter->sttTrigger) {
      for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
        SSttFile *pSttFile = pReader->pSet->aSttF[iStt];
        if (pSttFile->size > pSttFile->offset) {
          pCommitter->toLastOnly = 1;
          break;
        }
      }
    } else {
      int8_t nUsed = 0;  // slots of aDataIter actually put into the tree
      for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
        pIter = &pCommitter->aDataIter[nUsed];
        pIter->type = STT_DATA_ITER;
        pIter->iStt = iStt;

        code = tsdbReadSttBlk(pReader, iStt, pIter->aSttBlk);
        TSDB_CHECK_CODE(code, lino, _exit);

        // an empty stt file contributes nothing; its slot is reused
        if (taosArrayGetSize(pIter->aSttBlk) == 0) continue;

        pIter->iSttBlk = 0;
        SSttBlk *pFirstBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, 0);
        code = tsdbReadSttBlockEx(pReader, iStt, pFirstBlk, &pIter->bData);
        TSDB_CHECK_CODE(code, lino, _exit);

        pIter->iRow = 0;
        pIter->r.suid = pIter->bData.suid;
        pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
        pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0);

        tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter);
        nUsed++;
      }
    }
  }

  code = tsdbNextCommitRow(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
498 499
/*
 * Prepare the commit of one data file set.
 *
 * Derives the file id and its key range from nextKey, resets nextKey, opens a
 * reader on the existing file set with that fid (if there is one), opens the
 * writer for the new file set, clears the writer-side buffers and opens the
 * merge iterators.
 */
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  int32_t    code = 0;
  int32_t    lino = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SDFileSet *pOldSet = NULL;

  // memory
  pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
  tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                  &pCommitter->maxKey);

  pCommitter->nextKey = TSKEY_MAX;

  // Reader
  SDFileSet probe = {.fid = pCommitter->commitFid};
  pOldSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &probe, tDFileSetCmprFn, TD_EQ);
  if (pOldSet == NULL) {
    pCommitter->dReader.pBlockIdx = NULL;
  } else {
    code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pOldSet);
    TSDB_CHECK_CODE(code, lino, _exit);

    // data
    code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx);
    TSDB_CHECK_CODE(code, lino, _exit);

    pCommitter->dReader.iBlockIdx = 0;
    if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) {
      pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0);
      code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      pCommitter->dReader.pBlockIdx = NULL;
    }
    tBlockDataReset(&pCommitter->dReader.bData);
  }

  // Writer: wSet points at the four local file descriptors below
  SHeadFile fHead = {.commitID = pCommitter->commitID};
  SDataFile fData = {.commitID = pCommitter->commitID};
  SSmaFile  fSma = {.commitID = pCommitter->commitID};
  SSttFile  fStt = {.commitID = pCommitter->commitID};
  SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};
  if (pOldSet != NULL) {
    tAssert(pOldSet->nSttF <= pCommitter->sttTrigger);

    // data and sma files are continued from the old set
    fData = *pOldSet->pDataF;
    fSma = *pOldSet->pSmaF;
    wSet.diskId = pOldSet->diskId;

    wSet.nSttF = 1;
    if (pOldSet->nSttF < pCommitter->sttTrigger) {
      // below the trigger: keep the old stt files and append one more
      for (int32_t iStt = 0; iStt < pOldSet->nSttF; iStt++) {
        wSet.aSttF[iStt] = pOldSet->aSttF[iStt];
      }
      wSet.nSttF = pOldSet->nSttF + 1;
    }
  } else {
    // NOTE(review): the results of tfsAllocDisk / tfsMkdirRecurAt are ignored
    // here - confirm whether a failure should abort the commit.
    SDiskID did = {0};
    tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
    tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
    wSet.diskId = did;
    wSet.nSttF = 1;
  }
  wSet.aSttF[wSet.nSttF - 1] = &fStt;
  code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  taosArrayClear(pCommitter->dWriter.aBlockIdx);
  taosArrayClear(pCommitter->dWriter.aSttBlk);
  tMapDataReset(&pCommitter->dWriter.mBlock);
  tBlockDataReset(&pCommitter->dWriter.bData);
#if USE_STREAM_COMPRESSION
  tDiskDataBuilderClear(pCommitter->dWriter.pBuilder);
#else
  tBlockDataReset(&pCommitter->dWriter.bDatal);
#endif

  // open iter
  code = tsdbOpenCommitIter(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
589 590
/*
 * Write pBlockData as one data block and record its SDataBlk in mDataBlk.
 *
 * Does nothing for an empty block. The min/max key come from the first/last
 * row, min/max version from all rows, and hasDup is set when two neighbouring
 * rows share a timestamp. SMA info is requested only for a block without
 * duplicates. The block data is cleared after a successful write.
 */
int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pBlockData->nRow == 0) return code;

  SDataBlk dataBlk;
  tDataBlkReset(&dataBlk);

  // info
  dataBlk.nRow += pBlockData->nRow;
  for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
    TSDBKEY rowKey = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]};

    if (iRow > 0) {
      if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) {
        dataBlk.hasDup = 1;
      }
    } else if (tsdbKeyCmprFn(&dataBlk.minKey, &rowKey) > 0) {
      dataBlk.minKey = rowKey;
    }

    if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&dataBlk.maxKey, &rowKey) < 0) {
      dataBlk.maxKey = rowKey;
    }

    dataBlk.minVer = TMIN(dataBlk.minVer, rowKey.version);
    dataBlk.maxVer = TMAX(dataBlk.maxVer, rowKey.version);
  }

  // write
  dataBlk.nSubBlock++;
  int32_t wantSma = (dataBlk.nSubBlock == 1) && !dataBlk.hasDup;
  code = tsdbWriteBlockData(pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1],
                            wantSma ? &dataBlk.smaInfo : NULL, cmprAlg, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

  // put SDataBlk
  code = tMapDataPutItem(mDataBlk, &dataBlk, tPutDataBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  // clear
  tBlockDataClear(pBlockData);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
642 643
/*
 * Write pBlockData as one stt block and append its SSttBlk to aSttBlk.
 *
 * Does nothing for an empty block. Key and version ranges are computed over
 * all rows; the uid range comes from the block's uid or, when that is 0, from
 * the first and last entries of aUid. The block data is cleared after a
 * successful write.
 */
int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pBlockData->nRow == 0) return code;

  // info
  SSttBlk sttBlk = {.suid = pBlockData->suid,
                    .nRow = pBlockData->nRow,
                    .minKey = TSKEY_MAX,
                    .maxKey = TSKEY_MIN,
                    .minVer = VERSION_MAX,
                    .maxVer = VERSION_MIN};
  for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
    sttBlk.minKey = TMIN(sttBlk.minKey, pBlockData->aTSKEY[iRow]);
    sttBlk.maxKey = TMAX(sttBlk.maxKey, pBlockData->aTSKEY[iRow]);
    sttBlk.minVer = TMIN(sttBlk.minVer, pBlockData->aVersion[iRow]);
    sttBlk.maxVer = TMAX(sttBlk.maxVer, pBlockData->aVersion[iRow]);
  }
  if (pBlockData->uid) {
    sttBlk.minUid = pBlockData->uid;
    sttBlk.maxUid = pBlockData->uid;
  } else {
    sttBlk.minUid = pBlockData->aUid[0];
    sttBlk.maxUid = pBlockData->aUid[pBlockData->nRow - 1];
  }

  // write
  code = tsdbWriteBlockData(pWriter, pBlockData, &sttBlk.bInfo, NULL, cmprAlg, 1);
  TSDB_CHECK_CODE(code, lino, _exit);

  // push SSttBlk
  if (taosArrayPush(aSttBlk, &sttBlk) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // clear
  tBlockDataClear(pBlockData);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
686 687 688 689 690 691 692
/*
 * Flush the rows collected in pBuilder as one stt block: generate the disk
 * data, write it, append the describing SSttBlk to aSttBlk and clear the
 * builder. Does nothing when the builder holds no rows.
 */
static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilder, SArray *aSttBlk) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pBuilder->nRow == 0) return code;

  // gnrt
  const SDiskData *pGenData;
  const SBlkInfo  *pGenInfo;
  code = tGnrtDiskData(pBuilder, &pGenData, &pGenInfo);
  TSDB_CHECK_CODE(code, lino, _exit);

  SSttBlk sttBlk = {.suid = pBuilder->suid,
                    .minUid = pGenInfo->minUid,
                    .maxUid = pGenInfo->maxUid,
                    .minKey = pGenInfo->minKey,
                    .maxKey = pGenInfo->maxKey,
                    .minVer = pGenInfo->minVer,
                    .maxVer = pGenInfo->maxVer,
                    .nRow = pBuilder->nRow};

  // write
  code = tsdbWriteDiskData(pWriter, pGenData, &sttBlk.bInfo, NULL);
  TSDB_CHECK_CODE(code, lino, _exit);

  // push
  if (taosArrayPush(aSttBlk, &sttBlk) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // clear
  tDiskDataBuilderClear(pBuilder);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
727 728
/*
 * Finish committing one data file set.
 *
 * In order: writes the writer's block index array, writes its STT block
 * array, updates the file set header, upserts the writer's file set (wSet)
 * into the committer's file system copy, closes the writer (flag 1, the
 * "close and sync" step) and, if a reader is open, closes it too.
 *
 * Stops at the first failing step; the writer/reader are NOT closed here in
 * that case (the caller, tsdbCommitFileData, closes them on error).
 *
 * Returns 0 on success, otherwise the code of the failing step.
 */
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  // write aBlockIdx
  code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  // write aSttBlk
  code = tsdbWriteSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSttBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  // update file header
  code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // upsert SDFileSet
  code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->dWriter.pWriter->wSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  // close and sync
  code = tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 1);
  TSDB_CHECK_CODE(code, lino, _exit);

  // the reader only exists when there was a file set to read from
  if (pCommitter->dReader.pReader) {
    code = tsdbDataFReaderClose(&pCommitter->dReader.pReader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
764 765
/*
 * Carry over, unchanged, the on-disk tables that sort before `toTable`.
 *
 * While the reader's current block index exists and compares lower than
 * `toTable`, the reader's current block map (dReader.mBlock) is written to the
 * new file with a copy of that block index, the copy is appended to the
 * writer's block index array, and the reader is advanced to its next table.
 *
 * Returns 0 on success, otherwise the code of the failing step
 * (TSDB_CODE_OUT_OF_MEMORY when the array push fails).
 */
static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
  int32_t code = 0;
  int32_t lino = 0;

  while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) {
    // work on a copy: tsdbWriteDataBlk receives it by pointer
    SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx;
    code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbCommitterNextTableData(pCommitter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
790
// Forward declaration: called by tsdbCommitFileData() below, between the
// file-data start and end steps; the definition appears later in this file.
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter);
H
Hongze Cheng 已提交
791
/*
 * Commit one data file set: start, implementation, end.
 *
 * Runs tsdbCommitFileDataStart(), tsdbCommitFileDataImpl() and
 * tsdbCommitFileDataEnd() in sequence. If any of them fails, the error is
 * logged and both the data file reader and writer are closed (the writer with
 * flag 0, whereas the success path in tsdbCommitFileDataEnd closes it with 1).
 *
 * Returns 0 on success, otherwise the code of the failing step.
 */
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;

  // commit file data start
  code = tsdbCommitFileDataStart(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // impl
  code = tsdbCommitFileDataImpl(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // commit file data end
  code = tsdbCommitFileDataEnd(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
    // best-effort cleanup; return values are intentionally ignored so the
    // original error code is the one reported
    tsdbDataFReaderClose(&pCommitter->dReader.pReader);
    tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0);
  }
  return code;
}

H
Hongze Cheng 已提交
818
// ----------------------------------------------------------------------------
H
Hongze Cheng 已提交
819
/*
 * Initialise a committer for one commit of pTsdb.
 *
 * Zeroes *pCommitter, asserts that pTsdb->imem is set, then copies the commit
 * parameters: the commit ID and row/compression/stt-trigger settings come from
 * pInfo, `minutes` and `precision` from pTsdb->keepCfg. It also obtains the
 * array of table data from the immutable memtable and a copy of the current
 * file system state.
 *
 * pTsdb      - tsdb instance being committed
 * pCommitter - committer to initialise (previous content is discarded)
 * pInfo      - commit info supplying the commit ID and configuration
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the table data array
 * cannot be obtained, or the code returned by tsdbFSCopy().
 */
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter, SCommitInfo *pInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  memset(pCommitter, 0, sizeof(*pCommitter));
  tAssert(pTsdb->imem && "last tsdb commit incomplete");

  pCommitter->pTsdb = pTsdb;
  pCommitter->commitID = pInfo->info.state.commitID;
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;
  pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows;
  pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows;
  pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression;
  pCommitter->sttTrigger = pInfo->info.config.sttTrigger;
  pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
  if (pCommitter->aTbDataP == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  code = tsdbFSCopy(pTsdb, &pCommitter->fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
849 850
/*
 * Allocate the working buffers used while committing time-series data.
 *
 * Creates, in order:
 *   - reader: the block index array and a block data buffer;
 *   - merger: for each of the TSDB_MAX_STT_TRIGGER data iterators, an SSttBlk
 *     array and a block data buffer;
 *   - writer: the block index array, the SSttBlk array, a block data buffer
 *     and, depending on USE_STREAM_COMPRESSION, either a disk-data builder
 *     or a second block data buffer (bDatal).
 *
 * tsdbCommitDataEnd() is the matching teardown. On failure this function
 * returns at the first failing allocation and does not release what it had
 * already created.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if an array cannot be
 * created, or the code returned by the failing create call.
 */
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  // reader
  pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->dReader.aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tBlockDataCreate(&pCommitter->dReader.bData);
  TSDB_CHECK_CODE(code, lino, _exit);

  // merger
  for (int32_t iStt = 0; iStt < TSDB_MAX_STT_TRIGGER; iStt++) {
    SDataIter *pIter = &pCommitter->aDataIter[iStt];
    pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
    if (pIter->aSttBlk == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tBlockDataCreate(&pIter->bData);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // writer
  pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->dWriter.aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pCommitter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
  if (pCommitter->dWriter.aSttBlk == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tBlockDataCreate(&pCommitter->dWriter.bData);
  TSDB_CHECK_CODE(code, lino, _exit);

#if USE_STREAM_COMPRESSION
  code = tDiskDataBuilderCreate(&pCommitter->dWriter.pBuilder);
#else
  code = tBlockDataCreate(&pCommitter->dWriter.bDatal);
#endif
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

/*
 * Release the working buffers created by tsdbCommitDataStart().
 *
 * Destroys the reader, merger and writer arrays and block data buffers,
 * clears the reader and writer block maps, destroys the stream builder or
 * bDatal (depending on USE_STREAM_COMPRESSION), and finally destroys the
 * table and row schemas cached on the committer.
 *
 * The freed pointers are not reset here, so this must not be called twice
 * for the same tsdbCommitDataStart().
 */
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
  // reader
  taosArrayDestroy(pCommitter->dReader.aBlockIdx);
  tMapDataClear(&pCommitter->dReader.mBlock);
  tBlockDataDestroy(&pCommitter->dReader.bData, 1);

  // merger
  for (int32_t iStt = 0; iStt < TSDB_MAX_STT_TRIGGER; iStt++) {
    SDataIter *pIter = &pCommitter->aDataIter[iStt];
    taosArrayDestroy(pIter->aSttBlk);
    tBlockDataDestroy(&pIter->bData, 1);
  }

  // writer
  taosArrayDestroy(pCommitter->dWriter.aBlockIdx);
  taosArrayDestroy(pCommitter->dWriter.aSttBlk);
  tMapDataClear(&pCommitter->dWriter.mBlock);
  tBlockDataDestroy(&pCommitter->dWriter.bData, 1);
#if USE_STREAM_COMPRESSION
  tDiskDataBuilderDestroy(pCommitter->dWriter.pBuilder);
#else
  tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1);
#endif
  // schemas cached during the commit
  tDestroyTSchema(pCommitter->skmTable.pTSchema);
  tDestroyTSchema(pCommitter->skmRow.pTSchema);
}

H
Hongze Cheng 已提交
934
/*
 * Commit the time-series rows of the immutable memtable to data files.
 *
 * Nothing is done when the memtable holds no rows. Otherwise the working
 * buffers are created with tsdbCommitDataStart(), then tsdbCommitFileData()
 * is called repeatedly, starting from the memtable's minimum key, until
 * pCommitter->nextKey reaches TSKEY_MAX.
 *
 * The buffers are released with tsdbCommitDataEnd() both on success and when
 * a file-set commit fails, so a failed commit does not leak them. When
 * tsdbCommitDataStart() itself fails, the teardown is not run because the
 * buffers are only partially created.
 *
 * Returns 0 on success, otherwise the code of the failing step.
 */
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  // check
  if (pMemTable->nRow == 0) goto _exit;

  // start ====================
  code = tsdbCommitDataStart(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // impl ====================
  pCommitter->nextKey = pMemTable->minKey;
  while (pCommitter->nextKey < TSKEY_MAX) {
    code = tsdbCommitFileData(pCommitter);
    // on failure still fall through the teardown below
    TSDB_CHECK_CODE(code, lino, _end);
  }

_end:
  // end ====================
  tsdbCommitDataEnd(pCommitter);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
964

H
Hongze Cheng 已提交
965
/*
 * Commit the delete records of the immutable memtable.
 *
 * Nothing is committed when the memtable has no delete records. Otherwise
 * this walks two sequences in parallel: the in-memory table data
 * (pCommitter->aTbDataP) and the existing delete index (pCommitter->aDelIdx).
 * At each step the two current entries are compared by table id with
 * tTABLEIDCmprFn() and tsdbCommitTableDel() is called for:
 *   - the memory entry only, when it sorts first or the index is exhausted;
 *   - the index entry only, when it sorts first or memory is exhausted;
 *   - both, when they refer to the same table.
 * The consumed side(s) are then advanced.
 *
 * Returns 0 on success, otherwise the code of the failing step.
 */
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (pMemTable->nDel == 0) {
    goto _exit;
  }

  // start
  code = tsdbCommitDelStart(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // impl
  int32_t  iDelIdx = 0;
  int32_t  nDelIdx = taosArrayGetSize(pCommitter->aDelIdx);
  int32_t  iTbData = 0;
  int32_t  nTbData = taosArrayGetSize(pCommitter->aTbDataP);
  STbData *pTbData;
  SDelIdx *pDelIdx;

  tAssert(nTbData > 0);

  pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
  pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
  while (true) {
    if (pTbData == NULL && pDelIdx == NULL) break;

    // pick which side(s) to commit for this step
    if (pTbData && pDelIdx) {
      int32_t c = tTABLEIDCmprFn(pTbData, pDelIdx);

      if (c == 0) {
        goto _commit_mem_and_disk_del;
      } else if (c < 0) {
        goto _commit_mem_del;
      } else {
        goto _commit_disk_del;
      }
    } else if (pTbData) {
      goto _commit_mem_del;
    } else {
      goto _commit_disk_del;
    }

  _commit_mem_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, NULL);
    TSDB_CHECK_CODE(code, lino, _exit);

    iTbData++;
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
    continue;

  _commit_disk_del:
    code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx);
    TSDB_CHECK_CODE(code, lino, _exit);

    iDelIdx++;
    pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    continue;

  _commit_mem_and_disk_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx);
    TSDB_CHECK_CODE(code, lino, _exit);

    iTbData++;
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
    iDelIdx++;
    pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    continue;
  }

  // end
  code = tsdbCommitDelEnd(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d, commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
  }
  return code;
}

H
Hongze Cheng 已提交
1053
/*
 * Conclude a commit started with tsdbStartCommit().
 *
 * When `eno` is non-zero it is taken as the result of the commit and no
 * further work is attempted; otherwise tsdbFSPrepareCommit() is called with
 * the committer's file system copy. In every case the file system copy and
 * the table data array held by the committer are destroyed afterwards.
 *
 * pCommitter - committer being concluded
 * eno        - error code of the preceding commit steps, 0 if they succeeded
 *
 * Returns `eno` when it is non-zero, otherwise the result of
 * tsdbFSPrepareCommit().
 */
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;

  if (eno) {
    code = eno;
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    code = tsdbFSPrepareCommit(pCommitter->pTsdb, &pCommitter->fs);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  tsdbFSDestroy(&pCommitter->fs);
  taosArrayDestroy(pCommitter->aTbDataP);
  pCommitter->aTbDataP = NULL;
  if (code || eno) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode));
  }
  return code;
}
H
Hongze Cheng 已提交
1077

H
Hongze Cheng 已提交
1078
// ================================================================================
H
Hongze Cheng 已提交
1079

H
Hongze Cheng 已提交
1080 1081
/*
 * Return the row the committer is currently positioned on: the row info of
 * the current data iterator, or NULL when there is no current iterator.
 */
static FORCE_INLINE SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) {
  return (pCommitter->pIter) ? &pCommitter->pIter->r : NULL;
}

H
Hongze Cheng 已提交
1084
/*
 * Advance the committer to the next row in merge order.
 *
 * The current iterator (pCommitter->pIter) is advanced first:
 *   - memory iterator: steps to the next row of the current table; a row with
 *     a timestamp beyond pCommitter->maxKey is not taken and only lowers
 *     pCommitter->nextKey. When a table has no more rows to take, the
 *     iterator moves on to the next entry of aTbDataP, reopened at
 *     pCommitter->minKey; when the tables run out the iterator is dropped.
 *   - stt iterator: steps to the next row of the loaded block, loading the
 *     next stt block when the current one is used up; when the blocks run out
 *     the iterator is dropped.
 * If the advanced iterator now sorts after the smallest iterator waiting in
 * the red-black tree, it is put back into the tree. Finally, when there is no
 * current iterator, the smallest one is taken out of the tree (if any) and
 * becomes current.
 *
 * Returns 0 on success, or the code returned by tsdbReadSttBlockEx().
 */
static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pCommitter->pIter) {
    SDataIter *pIter = pCommitter->pIter;
    if (pCommitter->pIter->type == MEMORY_DATA_ITER) {  // memory
      tsdbTbDataIterNext(&pIter->iter);
      TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter);
      while (true) {
        // rows beyond maxKey are not taken now; remember the earliest one
        if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
          pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
          pRow = NULL;
        }

        if (pRow) {
          pIter->r.suid = pIter->iter.pTbData->suid;
          pIter->r.uid = pIter->iter.pTbData->uid;
          pIter->r.row = *pRow;
          break;
        }

        // current table has nothing more to take: try the next table
        pIter->iTbDataP++;
        if (pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP)) {
          STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP);
          TSDBKEY  keyFrom = {.ts = pCommitter->minKey, .version = VERSION_MIN};
          tsdbTbDataIterOpen(pTbData, &keyFrom, 0, &pIter->iter);
          pRow = tsdbTbDataIterGet(&pIter->iter);
          continue;
        } else {
          pCommitter->pIter = NULL;
          break;
        }
      }
    } else if (pCommitter->pIter->type == STT_DATA_ITER) {  // last file
      pIter->iRow++;
      if (pIter->iRow < pIter->bData.nRow) {
        // bData.uid == 0 means the uid is taken per row from aUid
        pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
        pIter->r.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
      } else {
        pIter->iSttBlk++;
        if (pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk)) {
          SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);

          code = tsdbReadSttBlockEx(pCommitter->dReader.pReader, pIter->iStt, pSttBlk, &pIter->bData);
          // record the failing line so the log below does not report "line 0"
          TSDB_CHECK_CODE(code, lino, _exit);

          pIter->iRow = 0;
          pIter->r.suid = pIter->bData.suid;
          pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
          pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0);
        } else {
          pCommitter->pIter = NULL;
        }
      }
    } else {
      tAssert(0);
    }

    // compare with min in RB Tree
    pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
    if (pCommitter->pIter && pIter) {
      int32_t c = tRowInfoCmprFn(&pCommitter->pIter->r, &pIter->r);
      if (c > 0) {
        tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
        pCommitter->pIter = NULL;
      } else {
        // two iterators must never be positioned on equal rows
        tAssert(c);
      }
    }
  }

  if (pCommitter->pIter == NULL) {
    pCommitter->pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
    if (pCommitter->pIter) {
      tRBTreeDrop(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1171
/*
 * Write the commit rows of the current table that sort before an on-disk
 * block as new data blocks.
 *
 * Starting from the committer's current row (which must exist: it is
 * dereferenced before the loop), rows are appended to the writer's block
 * buffer until the next row belongs to another table or its key is not lower
 * than pDataBlk->minKey. Whenever the buffer reaches pCommitter->maxRow rows
 * it is written out; the buffer is passed to tsdbWriteDataBlock() once more
 * after the loop for the remaining rows.
 *
 * pCommitter - committer positioned on the first row to write
 * pDataBlk   - on-disk block the written rows must stay ahead of
 *
 * Returns 0 on success, otherwise the code of the failing step.
 */
static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
  int32_t code = 0;
  int32_t lino = 0;

  SBlockData *pBlockData = &pCommitter->dWriter.bData;
  SRowInfo   *pRowInfo = tsdbGetCommitRow(pCommitter);
  TABLEID     id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};

  tBlockDataClear(pBlockData);
  while (pRowInfo) {
    tAssert(pRowInfo->row.type == 0);
    code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbNextCommitRow(pCommitter);
    TSDB_CHECK_CODE(code, lino, _exit);

    // stop at a table change or once the on-disk block's range is reached
    pRowInfo = tsdbGetCommitRow(pCommitter);
    if (pRowInfo) {
      if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) {
        pRowInfo = NULL;
      } else {
        TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
        if (tsdbKeyCmprFn(&tKey, &pDataBlk->minKey) >= 0) pRowInfo = NULL;
      }
    }

    if (pBlockData->nRow >= pCommitter->maxRow) {
      code =
          tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1219
/*
 * Merge an on-disk data block with the commit rows that overlap it.
 *
 * The block is read into the reader buffer, then its rows and the committer's
 * rows of the same table are merged in row order into the writer buffer.
 * Commit rows are consumed until the next one belongs to another table or its
 * key is greater than pDataBlk->maxKey; the rest of the on-disk block is then
 * copied through. Equal rows on both sides are not expected (asserted).
 * Whenever the writer buffer reaches pCommitter->maxRow rows it is written
 * out; it is passed to tsdbWriteDataBlock() once more at the end.
 *
 * The committer must be positioned on a row when this is called: the current
 * row is dereferenced before any check.
 *
 * Returns 0 on success, otherwise the code of the failing step.
 */
static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
  int32_t code = 0;
  int32_t lino = 0;

  SRowInfo   *pRowInfo = tsdbGetCommitRow(pCommitter);
  TABLEID     id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
  SBlockData *pBDataR = &pCommitter->dReader.bData;
  SBlockData *pBDataW = &pCommitter->dWriter.bData;

  code = tsdbReadDataBlock(pCommitter->dReader.pReader, pDataBlk, pBDataR);
  TSDB_CHECK_CODE(code, lino, _exit);

  tBlockDataClear(pBDataW);
  int32_t  iRow = 0;
  TSDBROW  row = tsdbRowFromBlockData(pBDataR, 0);
  TSDBROW *pRow = &row;  // NULL once the on-disk block is exhausted

  while (pRow && pRowInfo) {
    int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row);
    if (c < 0) {
      // on-disk row comes first
      code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid);
      TSDB_CHECK_CODE(code, lino, _exit);

      iRow++;
      if (iRow < pBDataR->nRow) {
        row = tsdbRowFromBlockData(pBDataR, iRow);
      } else {
        pRow = NULL;
      }
    } else if (c > 0) {
      // commit row comes first
      tAssert(pRowInfo->row.type == 0);
      code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tBlockDataAppendRow(pBDataW, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tsdbNextCommitRow(pCommitter);
      TSDB_CHECK_CODE(code, lino, _exit);

      pRowInfo = tsdbGetCommitRow(pCommitter);
      if (pRowInfo) {
        if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) {
          pRowInfo = NULL;
        } else {
          TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
          if (tsdbKeyCmprFn(&tKey, &pDataBlk->maxKey) > 0) pRowInfo = NULL;
        }
      }
    } else {
      tAssert(0 && "dup rows not allowed");
    }

    if (pBDataW->nRow >= pCommitter->maxRow) {
      code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // copy whatever is left of the on-disk block
  while (pRow) {
    code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid);
    TSDB_CHECK_CODE(code, lino, _exit);

    iRow++;
    if (iRow < pBDataR->nRow) {
      row = tsdbRowFromBlockData(pBDataR, iRow);
    } else {
      pRow = NULL;
    }

    if (pBDataW->nRow >= pCommitter->maxRow) {
      code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1306
/*
 * Merge the commit rows of table `id` with that table's on-disk data blocks.
 *
 * Only acts when the reader's current block index refers to exactly `id`
 * (the reader must not be positioned before `id`; this is asserted). The
 * table's on-disk blocks and the committer's rows are then walked together,
 * comparing each block against the current row's key with tDataBlkCmprFn():
 *   - block sorts first:  its entry is copied to the writer's block map;
 *   - row sorts first:    rows ahead of the block are written by
 *                         tsdbCommitAheadBlock();
 *   - they overlap:       the block is merged by tsdbCommitMergeBlock().
 * Remaining blocks are copied through once the table's rows are consumed,
 * and the reader is advanced to its next table.
 *
 * Commit rows of `id` that remain after the last on-disk block are left for
 * the caller.
 *
 * Returns 0 on success, otherwise the code of the failing step.
 */
static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
  int32_t code = 0;
  int32_t lino = 0;

  SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx;

  tAssert(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0);
  if (pBlockIdx && pBlockIdx->suid == id.suid && pBlockIdx->uid == id.uid) {
    int32_t   iBlock = 0;
    SDataBlk  block;
    SDataBlk *pDataBlk = &block;  // NULL once the table's blocks are exhausted
    SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);

    tAssert(pRowInfo->suid == id.suid && pRowInfo->uid == id.uid);

    tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
    while (pDataBlk && pRowInfo) {
      // a one-key pseudo block lets the row be compared against a block
      SDataBlk tBlock = {.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)};
      int32_t  c = tDataBlkCmprFn(pDataBlk, &tBlock);

      if (c < 0) {
        code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
        TSDB_CHECK_CODE(code, lino, _exit);

        iBlock++;
        if (iBlock < pCommitter->dReader.mBlock.nItem) {
          tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
        } else {
          pDataBlk = NULL;
        }
      } else if (c > 0) {
        code = tsdbCommitAheadBlock(pCommitter, pDataBlk);
        TSDB_CHECK_CODE(code, lino, _exit);

        pRowInfo = tsdbGetCommitRow(pCommitter);
        if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL;
      } else {
        code = tsdbCommitMergeBlock(pCommitter, pDataBlk);
        TSDB_CHECK_CODE(code, lino, _exit);

        iBlock++;
        if (iBlock < pCommitter->dReader.mBlock.nItem) {
          tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
        } else {
          pDataBlk = NULL;
        }
        pRowInfo = tsdbGetCommitRow(pCommitter);
        if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL;
      }
    }

    // no more rows for this table: keep the remaining blocks as they are
    while (pDataBlk) {
      code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
      TSDB_CHECK_CODE(code, lino, _exit);

      iBlock++;
      if (iBlock < pCommitter->dReader.mBlock.nItem) {
        tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
      } else {
        pDataBlk = NULL;
      }
    }

    code = tsdbCommitterNextTableData(pCommitter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1381
/*
 * Make sure the STT block buffer is ready to receive rows of table `id`.
 *
 * If the buffer is already bound to a table whose schema differs from `id`
 * (per TABLE_SAME_SCHEMA), its pending content is flushed as an STT block
 * and the buffer is reset. If the buffer is then unbound (suid and uid both
 * zero), it is initialised for `id` with the committer's cached table schema,
 * which is asserted to belong to `id`.
 *
 * With USE_STREAM_COMPRESSION the buffer is the disk-data builder; otherwise
 * it is the block data buffer bDatal, which is initialised with uid 0 when
 * `id` has a non-zero suid.
 *
 * Returns 0 on success, otherwise the code of the failing step.
 */
static int32_t tsdbInitSttBlockBuilderIfNeed(SCommitter *pCommitter, TABLEID id) {
  int32_t code = 0;
  int32_t lino = 0;

#if USE_STREAM_COMPRESSION
  SDiskDataBuilder *pBuilder = pCommitter->dWriter.pBuilder;
  if (pBuilder->suid || pBuilder->uid) {
    if (!TABLE_SAME_SCHEMA(pBuilder->suid, pBuilder->uid, id.suid, id.uid)) {
      code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pBuilder, pCommitter->dWriter.aSttBlk);
      TSDB_CHECK_CODE(code, lino, _exit);

      tDiskDataBuilderClear(pBuilder);
    }
  }

  if (!pBuilder->suid && !pBuilder->uid) {
    tAssert(pCommitter->skmTable.suid == id.suid);
    tAssert(pCommitter->skmTable.uid == id.uid);
    code = tDiskDataBuilderInit(pBuilder, pCommitter->skmTable.pTSchema, &id, pCommitter->cmprAlg, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
#else
  SBlockData *pBData = &pCommitter->dWriter.bDatal;
  if (pBData->suid || pBData->uid) {
    if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) {
      code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);

      tBlockDataReset(pBData);
    }
  }

  if (!pBData->suid && !pBData->uid) {
    tAssert(pCommitter->skmTable.suid == id.suid);
    tAssert(pCommitter->skmTable.uid == id.uid);
    // with a super table id the buffer is keyed by suid only (uid 0)
    TABLEID tid = {.suid = id.suid, .uid = id.suid ? 0 : id.uid};
    code = tBlockDataInit(pBData, &tid, pCommitter->skmTable.pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
#endif

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

/*
 * Append every row currently held in the data-block buffer dWriter.bData to
 * the stt block builder.
 *
 * The table id is taken from the buffer itself (pBData->suid / pBData->uid).
 * The stt builder is prepared once up front via tsdbInitSttBlockBuilderIfNeed();
 * while copying, an stt block is written out each time the builder reaches
 * pCommitter->maxRow rows.
 *
 * Returns 0 on success, or the error code of the first failing call (also logged).
 */
static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  SBlockData *pBData = &pCommitter->dWriter.bData;
  TABLEID     id = {.suid = pBData->suid, .uid = pBData->uid};

  code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id);
  TSDB_CHECK_CODE(code, lino, _exit);

  for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) {
    TSDBROW row = tsdbRowFromBlockData(pBData, iRow);

#if USE_STREAM_COMPRESSION
    code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &row, NULL, &id);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) {
      // builder is full: emit the stt block and re-arm the builder for the same table
      code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
#else
    code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &row, NULL, id.uid);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) {
      // buffer is full: emit the stt block
      code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
                               pCommitter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
#endif
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1474
static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
H
Hongze Cheng 已提交
1475
  int32_t code = 0;
H
Hongze Cheng 已提交
1476
  int32_t lino = 0;
H
Hongze Cheng 已提交
1477

H
Hongze Cheng 已提交
1478
  SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
H
Hongze Cheng 已提交
1479 1480 1481 1482
  if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
    pRowInfo = NULL;
  }

H
Hongze Cheng 已提交
1483
  if (pRowInfo == NULL) goto _exit;
H
Hongze Cheng 已提交
1484

H
Hongze Cheng 已提交
1485
  if (pCommitter->toLastOnly) {
H
Hongze Cheng 已提交
1486 1487
    code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1488

H
Hongze Cheng 已提交
1489 1490 1491 1492 1493 1494 1495 1496
    while (pRowInfo) {
      STSchema *pTSchema = NULL;
      if (pRowInfo->row.type == 0) {
        code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
        TSDB_CHECK_CODE(code, lino, _exit);
        pTSchema = pCommitter->skmRow.pTSchema;
      }

H
compare  
Hongze Cheng 已提交
1497
#if USE_STREAM_COMPRESSION
H
Hongze Cheng 已提交
1498
      code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &pRowInfo->row, pTSchema, &id);
H
compare  
Hongze Cheng 已提交
1499 1500 1501
#else
      code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pRowInfo->row, pTSchema, id.uid);
#endif
H
Hongze Cheng 已提交
1502
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1503

H
Hongze Cheng 已提交
1504 1505
      code = tsdbNextCommitRow(pCommitter);
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1506

H
Hongze Cheng 已提交
1507 1508 1509 1510
      pRowInfo = tsdbGetCommitRow(pCommitter);
      if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
        pRowInfo = NULL;
      }
H
Hongze Cheng 已提交
1511

H
compare  
Hongze Cheng 已提交
1512
#if USE_STREAM_COMPRESSION
H
Hongze Cheng 已提交
1513
      if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) {
H
Hongze Cheng 已提交
1514
        code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk);
H
Hongze Cheng 已提交
1515
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1516 1517 1518

        code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id);
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1519
      }
H
compare  
Hongze Cheng 已提交
1520
#else
1521
      if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) {
H
compare  
Hongze Cheng 已提交
1522 1523 1524 1525 1526
        code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
                                 pCommitter->cmprAlg);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
#endif
H
Hongze Cheng 已提交
1527
    }
H
Hongze Cheng 已提交
1528 1529
  } else {
    SBlockData *pBData = &pCommitter->dWriter.bData;
1530
    tAssert(pBData->nRow == 0);
H
Hongze Cheng 已提交
1531

H
Hongze Cheng 已提交
1532 1533 1534 1535
    while (pRowInfo) {
      STSchema *pTSchema = NULL;
      if (pRowInfo->row.type == 0) {
        code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
H
Hongze Cheng 已提交
1536
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551
        pTSchema = pCommitter->skmRow.pTSchema;
      }

      code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tsdbNextCommitRow(pCommitter);
      TSDB_CHECK_CODE(code, lino, _exit);

      pRowInfo = tsdbGetCommitRow(pCommitter);
      if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
        pRowInfo = NULL;
      }

      if (pBData->nRow >= pCommitter->maxRow) {
H
Hongze Cheng 已提交
1552 1553
        code =
            tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1554
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1555
      }
H
Hongze Cheng 已提交
1556 1557
    }

H
Hongze Cheng 已提交
1558 1559 1560 1561 1562 1563 1564 1565 1566
    if (pBData->nRow) {
      if (pBData->nRow > pCommitter->minRow) {
        code =
            tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
        TSDB_CHECK_CODE(code, lino, _exit);
      } else {
        code = tsdbAppendLastBlock(pCommitter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
H
Hongze Cheng 已提交
1567 1568 1569
    }
  }

H
Hongze Cheng 已提交
1570
_exit:
H
Hongze Cheng 已提交
1571
  if (code) {
S
Shengliang Guan 已提交
1572
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
1573
              tstrerror(code));
H
Hongze Cheng 已提交
1574
  }
H
Hongze Cheng 已提交
1575 1576 1577
  return code;
}

H
Hongze Cheng 已提交
1578 1579
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
1580
  int32_t lino = 0;
H
Hongze Cheng 已提交
1581

H
Hongze Cheng 已提交
1582
  SRowInfo *pRowInfo;
H
Hongze Cheng 已提交
1583
  TABLEID   id = {0};
H
Hongze Cheng 已提交
1584
  while ((pRowInfo = tsdbGetCommitRow(pCommitter)) != NULL) {
1585
    tAssert(pRowInfo->suid != id.suid || pRowInfo->uid != id.uid);
H
Hongze Cheng 已提交
1586 1587
    id.suid = pRowInfo->suid;
    id.uid = pRowInfo->uid;
H
Hongze Cheng 已提交
1588

H
Hongze Cheng 已提交
1589
    code = tsdbMoveCommitData(pCommitter, id);
H
Hongze Cheng 已提交
1590
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1591 1592

    // start
H
Hongze Cheng 已提交
1593
    tMapDataReset(&pCommitter->dWriter.mBlock);
H
Hongze Cheng 已提交
1594 1595

    // impl
H
Hongze Cheng 已提交
1596
    code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable);
H
Hongze Cheng 已提交
1597
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1598
    code = tBlockDataInit(&pCommitter->dReader.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0);
H
Hongze Cheng 已提交
1599
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1600
    code = tBlockDataInit(&pCommitter->dWriter.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0);
H
Hongze Cheng 已提交
1601
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1602

H
Hongze Cheng 已提交
1603 1604
    /* merge with data in .data file */
    code = tsdbMergeTableData(pCommitter, id);
H
Hongze Cheng 已提交
1605
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1606

H
Hongze Cheng 已提交
1607
    /* handle remain table data */
H
Hongze Cheng 已提交
1608
    code = tsdbCommitTableData(pCommitter, id);
H
Hongze Cheng 已提交
1609
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1610

H
Hongze Cheng 已提交
1611
    // end
H
Hongze Cheng 已提交
1612 1613
    if (pCommitter->dWriter.mBlock.nItem > 0) {
      SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid};
H
Hongze Cheng 已提交
1614
      code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx);
H
Hongze Cheng 已提交
1615
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1616 1617 1618

      if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1619
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1620 1621
      }
    }
H
Hongze Cheng 已提交
1622 1623
  }

H
Hongze Cheng 已提交
1624 1625 1626
  id.suid = INT64_MAX;
  id.uid = INT64_MAX;
  code = tsdbMoveCommitData(pCommitter, id);
H
Hongze Cheng 已提交
1627
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1628

H
compare  
Hongze Cheng 已提交
1629
#if USE_STREAM_COMPRESSION
H
Hongze Cheng 已提交
1630
  code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk);
H
compare  
Hongze Cheng 已提交
1631 1632 1633 1634
#else
  code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
                           pCommitter->cmprAlg);
#endif
H
Hongze Cheng 已提交
1635
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1636

H
Hongze Cheng 已提交
1637 1638
_exit:
  if (code) {
S
Shengliang Guan 已提交
1639
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
1640
              tstrerror(code));
H
Hongze Cheng 已提交
1641
  }
H
Hongze Cheng 已提交
1642 1643
  return code;
}
H
Hongze Cheng 已提交
1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662

/*
 * Finish a commit: make the new file-system state current and drop the
 * immutable memtable.
 *
 * Under the write lock of pTsdb->rwLock, tsdbFSCommit() is called and, on
 * success, pTsdb->imem is set to NULL. The memtable that was in pTsdb->imem on
 * entry is unreferenced after the lock has been released. On failure
 * pTsdb->imem is left untouched and the memtable is not unreferenced.
 *
 * Returns 0 on success, or the error code of tsdbFSCommit() (also logged).
 */
int32_t tsdbFinishCommit(STsdb *pTsdb) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SMemTable *pMemTable = pTsdb->imem;

  // lock
  taosThreadRwlockWrlock(&pTsdb->rwLock);

  code = tsdbFSCommit(pTsdb);
  if (code == 0) {
    pTsdb->imem = NULL;
  }

  // unlock (single release point for both the success and the failure path)
  taosThreadRwlockUnlock(&pTsdb->rwLock);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pMemTable) {
    tsdbUnrefMemTable(pMemTable);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d, tsdb finish commit", TD_VID(pTsdb->pVnode));
  }
  return code;
}

/*
 * Roll back a commit by delegating to tsdbFSRollback().
 *
 * Logs an error on failure and an info message on success.
 *
 * Returns 0 on success, or the error code of tsdbFSRollback().
 */
int32_t tsdbRollbackCommit(STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbFSRollback(pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d, tsdb rollback commit", TD_VID(pTsdb->pVnode));
  }
  return code;
}