tsdbCommit.c 51.0 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef enum { MEMORY_DATA_ITER = 0, STT_DATA_ITER } EDataIterT;
H
Hongze Cheng 已提交
19

H
more  
Hongze Cheng 已提交
20
#define USE_STREAM_COMPRESSION 0
H
compare  
Hongze Cheng 已提交
21

H
Hongze Cheng 已提交
22 23 24
typedef struct {
  SRBTreeNode n;
  SRowInfo    r;
H
Hongze Cheng 已提交
25
  EDataIterT  type;
H
Hongze Cheng 已提交
26 27 28 29 30 31
  union {
    struct {
      int32_t     iTbDataP;
      STbDataIter iter;
    };  // memory data iter
    struct {
H
Hongze Cheng 已提交
32 33 34
      int32_t    iStt;
      SArray    *aSttBlk;
      int32_t    iSttBlk;
H
Hongze Cheng 已提交
35 36
      SBlockData bData;
      int32_t    iRow;
H
Hongze Cheng 已提交
37
    };  // stt file data iter
H
Hongze Cheng 已提交
38 39 40
  };
} SDataIter;

H
Hongze Cheng 已提交
41
typedef struct {
H
Hongze Cheng 已提交
42
  STsdb *pTsdb;
H
Hongze Cheng 已提交
43
  /* commit data */
H
Hongze Cheng 已提交
44
  int64_t commitID;
H
Hongze Cheng 已提交
45 46
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
47 48
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
49
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
50
  int8_t  sttTrigger;
H
Hongze Cheng 已提交
51 52
  SArray *aTbDataP;  // memory
  STsdbFS fs;        // disk
H
Hongze Cheng 已提交
53
  // --------------
H
Hongze Cheng 已提交
54
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
55
  int32_t commitFid;
H
Hongze Cheng 已提交
56
  int32_t expLevel;
H
Hongze Cheng 已提交
57 58
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
59
  // commit file data
H
Hongze Cheng 已提交
60 61
  struct {
    SDataFReader *pReader;
H
Hongze Cheng 已提交
62 63 64
    SArray       *aBlockIdx;  // SArray<SBlockIdx>
    int32_t       iBlockIdx;
    SBlockIdx    *pBlockIdx;
H
Hongze Cheng 已提交
65
    SMapData      mBlock;  // SMapData<SDataBlk>
H
Hongze Cheng 已提交
66
    SBlockData    bData;
H
Hongze Cheng 已提交
67
  } dReader;
H
Hongze Cheng 已提交
68 69 70
  struct {
    SDataIter *pIter;
    SRBTree    rbt;
H
Hongze Cheng 已提交
71
    SDataIter  dataIter;
H
Hongze Cheng 已提交
72
    SDataIter  aDataIter[TSDB_MAX_STT_TRIGGER];
H
Hongze Cheng 已提交
73
    int8_t     toLastOnly;
H
Hongze Cheng 已提交
74
  };
H
Hongze Cheng 已提交
75 76 77
  struct {
    SDataFWriter *pWriter;
    SArray       *aBlockIdx;  // SArray<SBlockIdx>
H
Hongze Cheng 已提交
78
    SArray       *aSttBlk;    // SArray<SSttBlk>
H
Hongze Cheng 已提交
79
    SMapData      mBlock;     // SMapData<SDataBlk>
H
Hongze Cheng 已提交
80
    SBlockData    bData;
H
compare  
Hongze Cheng 已提交
81
#if USE_STREAM_COMPRESSION
H
Hongze Cheng 已提交
82
    SDiskDataBuilder *pBuilder;
H
compare  
Hongze Cheng 已提交
83 84 85
#else
    SBlockData bDatal;
#endif
H
Hongze Cheng 已提交
86 87 88
  } dWriter;
  SSkmInfo skmTable;
  SSkmInfo skmRow;
H
Hongze Cheng 已提交
89
  /* commit del */
H
Hongze Cheng 已提交
90 91
  SDelFReader *pDelFReader;
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
92 93 94
  SArray      *aDelIdx;   // SArray<SDelIdx>
  SArray      *aDelIdxN;  // SArray<SDelIdx>
  SArray      *aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
95
} SCommitter;
H
refact  
Hongze Cheng 已提交
96

H
Hongze Cheng 已提交
97
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter, SCommitInfo *pInfo);
H
Hongze Cheng 已提交
98 99 100 101
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
Hongze Cheng 已提交
102 103
static int32_t tsdbNextCommitRow(SCommitter *pCommitter);

H
Hongze Cheng 已提交
104
int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
H
Hongze Cheng 已提交
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
  SRowInfo *pInfo1 = (SRowInfo *)p1;
  SRowInfo *pInfo2 = (SRowInfo *)p2;

  if (pInfo1->suid < pInfo2->suid) {
    return -1;
  } else if (pInfo1->suid > pInfo2->suid) {
    return 1;
  }

  if (pInfo1->uid < pInfo2->uid) {
    return -1;
  } else if (pInfo1->uid > pInfo2->uid) {
    return 1;
  }

  return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row);
}
H
refact  
Hongze Cheng 已提交
122

H
refact  
Hongze Cheng 已提交
123
int32_t tsdbBegin(STsdb *pTsdb) {
H
Hongze Cheng 已提交
124
  int32_t code = 0;
H
Hongze Cheng 已提交
125
  int32_t lino = 0;
H
Hongze Cheng 已提交
126

127 128
  if (!pTsdb) return code;

H
Hongze Cheng 已提交
129 130
  SMemTable *pMemTable;
  code = tsdbMemTableCreate(pTsdb, &pMemTable);
H
Hongze Cheng 已提交
131
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
132

H
Hongze Cheng 已提交
133
  // lock
H
Hongze Cheng 已提交
134
  if ((code = taosThreadRwlockWrlock(&pTsdb->rwLock))) {
H
Hongze Cheng 已提交
135
    code = TAOS_SYSTEM_ERROR(code);
H
Hongze Cheng 已提交
136
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
137 138 139 140 141
  }

  pTsdb->mem = pMemTable;

  // unlock
H
Hongze Cheng 已提交
142
  if ((code = taosThreadRwlockUnlock(&pTsdb->rwLock))) {
H
Hongze Cheng 已提交
143
    code = TAOS_SYSTEM_ERROR(code);
H
Hongze Cheng 已提交
144
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
145 146
  }

H
Hongze Cheng 已提交
147 148
_exit:
  if (code) {
S
Shengliang Guan 已提交
149
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
150
  }
H
Hongze Cheng 已提交
151
  return code;
H
Hongze Cheng 已提交
152 153
}

H
Hongze Cheng 已提交
154 155 156 157 158 159 160 161 162 163
int32_t tsdbPrepareCommit(STsdb *pTsdb) {
  taosThreadRwlockWrlock(&pTsdb->rwLock);
  ASSERT(pTsdb->imem == NULL);
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
  taosThreadRwlockUnlock(&pTsdb->rwLock);

  return 0;
}

H
Hongze Cheng 已提交
164
int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) {
165
  if (!pTsdb) return 0;
H
Hongze Cheng 已提交
166

H
more  
Hongze Cheng 已提交
167
  int32_t    code = 0;
H
Hongze Cheng 已提交
168
  int32_t    lino = 0;
H
Hongze Cheng 已提交
169
  SCommitter commith;
H
Hongze Cheng 已提交
170
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
171 172

  // check
H
Hongze Cheng 已提交
173
  if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
H
Hongze Cheng 已提交
174
    taosThreadRwlockWrlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
175
    pTsdb->imem = NULL;
H
Hongze Cheng 已提交
176 177
    taosThreadRwlockUnlock(&pTsdb->rwLock);

H
Hongze Cheng 已提交
178
    tsdbUnrefMemTable(pMemTable, NULL, true);
H
Hongze Cheng 已提交
179 180
    goto _exit;
  }
H
refact  
Hongze Cheng 已提交
181

H
more  
Hongze Cheng 已提交
182
  // start commit
H
Hongze Cheng 已提交
183
  code = tsdbStartCommit(pTsdb, &commith, pInfo);
H
Hongze Cheng 已提交
184
  TSDB_CHECK_CODE(code, lino, _exit);
H
refact  
Hongze Cheng 已提交
185

H
refact  
Hongze Cheng 已提交
186 187
  // commit impl
  code = tsdbCommitData(&commith);
H
Hongze Cheng 已提交
188
  TSDB_CHECK_CODE(code, lino, _exit);
H
refact  
Hongze Cheng 已提交
189 190

  code = tsdbCommitDel(&commith);
H
Hongze Cheng 已提交
191
  TSDB_CHECK_CODE(code, lino, _exit);
H
refact  
Hongze Cheng 已提交
192 193

  // end commit
H
more  
Hongze Cheng 已提交
194
  code = tsdbEndCommit(&commith, 0);
H
Hongze Cheng 已提交
195
  TSDB_CHECK_CODE(code, lino, _exit);
H
refact  
Hongze Cheng 已提交
196

H
Hongze Cheng 已提交
197
_exit:
H
Hongze Cheng 已提交
198 199
  if (code) {
    tsdbEndCommit(&commith, code);
S
Shengliang Guan 已提交
200
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
201
  }
H
refact  
Hongze Cheng 已提交
202 203 204
  return code;
}

H
Hongze Cheng 已提交
205
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
206
  int32_t    code = 0;
H
Hongze Cheng 已提交
207
  int32_t    lino = 0;
H
Hongze Cheng 已提交
208 209 210
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

H
Hongze Cheng 已提交
211
  if ((pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
H
Hongze Cheng 已提交
212
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
213
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
214 215
  }

H
Hongze Cheng 已提交
216
  if ((pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
H
Hongze Cheng 已提交
217
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
218
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
219 220
  }

H
Hongze Cheng 已提交
221
  if ((pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
H
Hongze Cheng 已提交
222
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
223
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
224
  }
H
Hongze Cheng 已提交
225

H
Hongze Cheng 已提交
226
  SDelFile *pDelFileR = pCommitter->fs.pDelFile;
H
Hongze Cheng 已提交
227
  if (pDelFileR) {
H
Hongze Cheng 已提交
228
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb);
H
Hongze Cheng 已提交
229
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
230

H
Hongze Cheng 已提交
231
    code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx);
H
Hongze Cheng 已提交
232
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
233 234
  }

H
Hongze Cheng 已提交
235
  // prepare new
H
Hongze Cheng 已提交
236 237
  SDelFile wDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0};
  code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &wDelFile, pTsdb);
H
Hongze Cheng 已提交
238
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
239 240

_exit:
H
Hongze Cheng 已提交
241
  if (code) {
H
Hongze Cheng 已提交
242
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
243 244 245
  } else {
    tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode));
  }
H
Hongze Cheng 已提交
246 247 248
  return code;
}

H
Hongze Cheng 已提交
249
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
H
Hongze Cheng 已提交
250
  int32_t   code = 0;
H
Hongze Cheng 已提交
251
  int32_t   lino = 0;
H
Hongze Cheng 已提交
252
  SDelData *pDelData;
H
Hongze Cheng 已提交
253 254
  tb_uid_t  suid;
  tb_uid_t  uid;
H
Hongze Cheng 已提交
255 256

  if (pTbData) {
H
Hongze Cheng 已提交
257 258
    suid = pTbData->suid;
    uid = pTbData->uid;
H
Hongze Cheng 已提交
259

H
Hongze Cheng 已提交
260 261 262 263
    if (pTbData->pHead == NULL) {
      pTbData = NULL;
    }
  }
H
Hongze Cheng 已提交
264 265

  if (pDelIdx) {
H
Hongze Cheng 已提交
266 267 268
    suid = pDelIdx->suid;
    uid = pDelIdx->uid;

H
Haojun Liao 已提交
269
    code = tsdbReadDelDatav1(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, INT64_MAX);
H
Hongze Cheng 已提交
270
    TSDB_CHECK_CODE(code, lino, _exit);
271 272
  } else {
    taosArrayClear(pCommitter->aDelData);
H
Hongze Cheng 已提交
273 274
  }

H
Hongze Cheng 已提交
275
  if (pTbData == NULL && pDelIdx == NULL) goto _exit;
H
Hongze Cheng 已提交
276

H
Hongze Cheng 已提交
277
  SDelIdx delIdx = {.suid = suid, .uid = uid};
H
Hongze Cheng 已提交
278 279

  // memory
H
Hongze Cheng 已提交
280 281
  pDelData = pTbData ? pTbData->pHead : NULL;
  for (; pDelData; pDelData = pDelData->pNext) {
H
Hongze Cheng 已提交
282 283
    if (taosArrayPush(pCommitter->aDelData, pDelData) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
284
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
285
    }
H
Hongze Cheng 已提交
286 287 288
  }

  // write
H
Hongze Cheng 已提交
289
  code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, &delIdx);
H
Hongze Cheng 已提交
290
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
291 292

  // put delIdx
293
  if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) {
H
Hongze Cheng 已提交
294
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
295
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
296
  }
H
Hongze Cheng 已提交
297 298

_exit:
H
Hongze Cheng 已提交
299
  if (code) {
H
Hongze Cheng 已提交
300
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
301
              tstrerror(code));
H
Hongze Cheng 已提交
302
  }
H
Hongze Cheng 已提交
303 304 305
  return code;
}

H
Hongze Cheng 已提交
306 307
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
308
  int32_t lino = 0;
H
Hongze Cheng 已提交
309
  STsdb  *pTsdb = pCommitter->pTsdb;
H
Hongze Cheng 已提交
310

H
Hongze Cheng 已提交
311
  code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN);
H
Hongze Cheng 已提交
312
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
313

H
Hongze Cheng 已提交
314
  code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
H
Hongze Cheng 已提交
315
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
316

H
Hongze Cheng 已提交
317
  code = tsdbFSUpsertDelFile(&pCommitter->fs, &pCommitter->pDelFWriter->fDel);
H
Hongze Cheng 已提交
318
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
319

H
Hongze Cheng 已提交
320
  code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1);
H
Hongze Cheng 已提交
321
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
322 323

  if (pCommitter->pDelFReader) {
H
Hongze Cheng 已提交
324
    code = tsdbDelFReaderClose(&pCommitter->pDelFReader);
H
Hongze Cheng 已提交
325
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
326 327
  }

H
Hongze Cheng 已提交
328 329 330 331
  taosArrayDestroy(pCommitter->aDelIdx);
  taosArrayDestroy(pCommitter->aDelData);
  taosArrayDestroy(pCommitter->aDelIdxN);

H
Hongze Cheng 已提交
332 333
_exit:
  if (code) {
H
Hongze Cheng 已提交
334
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
335
              tstrerror(code));
H
Hongze Cheng 已提交
336
  }
H
Hongze Cheng 已提交
337 338 339
  return code;
}

H
Hongze Cheng 已提交
340
int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo) {
H
Hongze Cheng 已提交
341
  int32_t code = 0;
H
Hongze Cheng 已提交
342
  int32_t lino = 0;
H
Hongze Cheng 已提交
343

H
Hongze Cheng 已提交
344
  if (suid) {
H
Hongze Cheng 已提交
345 346
    if (pSkmInfo->suid == suid) {
      pSkmInfo->uid = uid;
H
Hongze Cheng 已提交
347 348
      goto _exit;
    }
H
Hongze Cheng 已提交
349
  } else {
H
Hongze Cheng 已提交
350
    if (pSkmInfo->uid == uid) goto _exit;
H
Hongze Cheng 已提交
351 352
  }

H
Hongze Cheng 已提交
353 354
  pSkmInfo->suid = suid;
  pSkmInfo->uid = uid;
H
Hongze Cheng 已提交
355
  tDestroyTSchema(pSkmInfo->pTSchema);
H
Hongze Cheng 已提交
356
  code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema);
H
Hongze Cheng 已提交
357
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
358 359 360 361 362 363 364

_exit:
  return code;
}

static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  int32_t code = 0;
H
Hongze Cheng 已提交
365
  int32_t lino = 0;
H
Hongze Cheng 已提交
366 367 368 369 370 371 372 373 374 375 376 377 378

  if (pCommitter->skmRow.pTSchema) {
    if (pCommitter->skmRow.suid == suid) {
      if (suid == 0) {
        if (pCommitter->skmRow.uid == uid && sver == pCommitter->skmRow.pTSchema->version) goto _exit;
      } else {
        if (sver == pCommitter->skmRow.pTSchema->version) goto _exit;
      }
    }
  }

  pCommitter->skmRow.suid = suid;
  pCommitter->skmRow.uid = uid;
H
Hongze Cheng 已提交
379
  tDestroyTSchema(pCommitter->skmRow.pTSchema);
H
Hongze Cheng 已提交
380
  code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema);
H
Hongze Cheng 已提交
381
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
382 383 384 385 386

_exit:
  return code;
}

H
Hongze Cheng 已提交
387 388
static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
389
  int32_t lino = 0;
H
Hongze Cheng 已提交
390 391 392 393 394 395 396 397

  ASSERT(pCommitter->dReader.pBlockIdx);

  pCommitter->dReader.iBlockIdx++;
  if (pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx)) {
    pCommitter->dReader.pBlockIdx =
        (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);

H
Hongze Cheng 已提交
398
    code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
H
Hongze Cheng 已提交
399
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
400 401 402 403 404 405 406 407 408 409

    ASSERT(pCommitter->dReader.mBlock.nItem > 0);
  } else {
    pCommitter->dReader.pBlockIdx = NULL;
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
410 411 412 413 414 415 416
static int32_t tDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
  SDataIter *pIter1 = (SDataIter *)((uint8_t *)n1 - offsetof(SDataIter, n));
  SDataIter *pIter2 = (SDataIter *)((uint8_t *)n2 - offsetof(SDataIter, n));

  return tRowInfoCmprFn(&pIter1->r, &pIter2->r);
}

H
Hongze Cheng 已提交
417 418
static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
419
  int32_t lino = 0;
H
Hongze Cheng 已提交
420 421

  pCommitter->pIter = NULL;
H
Hongze Cheng 已提交
422
  tRBTreeCreate(&pCommitter->rbt, tDataIterCmprFn);
H
Hongze Cheng 已提交
423 424

  // memory
H
Hongze Cheng 已提交
425
  TSDBKEY    tKey = {.ts = pCommitter->minKey, .version = VERSION_MIN};
H
Hongze Cheng 已提交
426 427
  SDataIter *pIter = &pCommitter->dataIter;
  pIter->type = MEMORY_DATA_ITER;
H
Hongze Cheng 已提交
428 429 430 431 432 433 434
  pIter->iTbDataP = 0;
  for (; pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP); pIter->iTbDataP++) {
    STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP);
    tsdbTbDataIterOpen(pTbData, &tKey, 0, &pIter->iter);
    TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter);
    if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
      pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
H
Hongze Cheng 已提交
435
      pRow = NULL;
H
Hongze Cheng 已提交
436 437
    }

H
Hongze Cheng 已提交
438 439
    if (pRow == NULL) continue;

H
Hongze Cheng 已提交
440 441 442 443 444
    pIter->r.suid = pTbData->suid;
    pIter->r.uid = pTbData->uid;
    pIter->r.row = *pRow;
    break;
  }
H
Hongze Cheng 已提交
445
  ASSERT(pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP));
H
Hongze Cheng 已提交
446 447 448
  tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter);

  // disk
H
Hongze Cheng 已提交
449
  pCommitter->toLastOnly = 0;
H
Hongze Cheng 已提交
450
  SDataFReader *pReader = pCommitter->dReader.pReader;
H
Hongze Cheng 已提交
451
  if (pReader) {
H
Hongze Cheng 已提交
452
    if (pReader->pSet->nSttF >= pCommitter->sttTrigger) {
H
Hongze Cheng 已提交
453
      int8_t iIter = 0;
H
Hongze Cheng 已提交
454
      for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
H
Hongze Cheng 已提交
455
        pIter = &pCommitter->aDataIter[iIter];
H
Hongze Cheng 已提交
456
        pIter->type = STT_DATA_ITER;
H
Hongze Cheng 已提交
457
        pIter->iStt = iStt;
H
Hongze Cheng 已提交
458

H
Hongze Cheng 已提交
459
        code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk);
H
Hongze Cheng 已提交
460
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
461

H
Hongze Cheng 已提交
462
        if (taosArrayGetSize(pIter->aSttBlk) == 0) continue;
H
Hongze Cheng 已提交
463

H
Hongze Cheng 已提交
464 465
        pIter->iSttBlk = 0;
        SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, 0);
H
Hongze Cheng 已提交
466
        code = tsdbReadSttBlockEx(pCommitter->dReader.pReader, iStt, pSttBlk, &pIter->bData);
H
Hongze Cheng 已提交
467
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
468 469 470 471 472 473 474 475 476

        pIter->iRow = 0;
        pIter->r.suid = pIter->bData.suid;
        pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
        pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0);

        tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter);
        iIter++;
      }
H
Hongze Cheng 已提交
477
    } else {
H
Hongze Cheng 已提交
478 479 480
      for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
        SSttFile *pSttFile = pReader->pSet->aSttF[iStt];
        if (pSttFile->size > pSttFile->offset) {
H
Hongze Cheng 已提交
481 482 483 484
          pCommitter->toLastOnly = 1;
          break;
        }
      }
H
Hongze Cheng 已提交
485
    }
H
Hongze Cheng 已提交
486 487 488
  }

  code = tsdbNextCommitRow(pCommitter);
H
Hongze Cheng 已提交
489
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
490

H
Hongze Cheng 已提交
491 492
_exit:
  if (code) {
S
Shengliang Guan 已提交
493
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
494
              tstrerror(code));
H
Hongze Cheng 已提交
495
  }
H
Hongze Cheng 已提交
496 497 498
  return code;
}

H
Hongze Cheng 已提交
499 500
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  int32_t    code = 0;
H
Hongze Cheng 已提交
501
  int32_t    lino = 0;
H
Hongze Cheng 已提交
502 503
  STsdb     *pTsdb = pCommitter->pTsdb;
  SDFileSet *pRSet = NULL;
H
Hongze Cheng 已提交
504

H
Hongze Cheng 已提交
505
  // memory
H
Hongze Cheng 已提交
506
  pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
H
Hongze Cheng 已提交
507
  pCommitter->expLevel = tsdbFidLevel(pCommitter->commitFid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec());
H
Hongze Cheng 已提交
508 509
  tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                  &pCommitter->maxKey);
510 511 512
#if 0
  ASSERT(pCommitter->minKey <= pCommitter->nextKey && pCommitter->maxKey >= pCommitter->nextKey);
#endif
H
Hongze Cheng 已提交
513

H
Hongze Cheng 已提交
514
  pCommitter->nextKey = TSKEY_MAX;
H
Hongze Cheng 已提交
515

H
Hongze Cheng 已提交
516
  // Reader
H
Hongze Cheng 已提交
517 518
  SDFileSet tDFileSet = {.fid = pCommitter->commitFid};
  pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &tDFileSet, tDFileSetCmprFn, TD_EQ);
H
Hongze Cheng 已提交
519
  if (pRSet) {
H
Hongze Cheng 已提交
520
    code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet);
H
Hongze Cheng 已提交
521
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
522

H
Hongze Cheng 已提交
523
    // data
H
Hongze Cheng 已提交
524
    code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx);
H
Hongze Cheng 已提交
525
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
526

H
Hongze Cheng 已提交
527
    pCommitter->dReader.iBlockIdx = 0;
H
Hongze Cheng 已提交
528 529
    if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) {
      pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0);
H
Hongze Cheng 已提交
530
      code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
H
Hongze Cheng 已提交
531
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
532 533 534
    } else {
      pCommitter->dReader.pBlockIdx = NULL;
    }
H
Hongze Cheng 已提交
535
    tBlockDataReset(&pCommitter->dReader.bData);
H
Hongze Cheng 已提交
536
  } else {
H
Hongze Cheng 已提交
537
    pCommitter->dReader.pBlockIdx = NULL;
H
Hongze Cheng 已提交
538
  }
H
Hongze Cheng 已提交
539

H
Hongze Cheng 已提交
540
  // Writer
H
Hongze Cheng 已提交
541 542 543
  SHeadFile fHead = {.commitID = pCommitter->commitID};
  SDataFile fData = {.commitID = pCommitter->commitID};
  SSmaFile  fSma = {.commitID = pCommitter->commitID};
H
Hongze Cheng 已提交
544
  SSttFile  fStt = {.commitID = pCommitter->commitID};
H
Hongze Cheng 已提交
545
  SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};
H
Hongze Cheng 已提交
546
  if (pRSet) {
H
Hongze Cheng 已提交
547
    ASSERT(pRSet->nSttF <= pCommitter->sttTrigger);
H
Hongze Cheng 已提交
548 549
    fData = *pRSet->pDataF;
    fSma = *pRSet->pSmaF;
H
Hongze Cheng 已提交
550
    wSet.diskId = pRSet->diskId;
H
Hongze Cheng 已提交
551
    if (pRSet->nSttF < pCommitter->sttTrigger) {
H
Hongze Cheng 已提交
552 553
      for (int32_t iStt = 0; iStt < pRSet->nSttF; iStt++) {
        wSet.aSttF[iStt] = pRSet->aSttF[iStt];
H
Hongze Cheng 已提交
554
      }
H
Hongze Cheng 已提交
555
      wSet.nSttF = pRSet->nSttF + 1;
H
Hongze Cheng 已提交
556
    } else {
H
Hongze Cheng 已提交
557
      wSet.nSttF = 1;
H
Hongze Cheng 已提交
558
    }
H
Hongze Cheng 已提交
559
  } else {
H
Hongze Cheng 已提交
560
    SDiskID did = {0};
H
Hongze Cheng 已提交
561 562 563 564
    if (tfsAllocDisk(pTsdb->pVnode->pTfs, pCommitter->expLevel, &did) < 0) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
565
    tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
566
    wSet.diskId = did;
H
Hongze Cheng 已提交
567
    wSet.nSttF = 1;
H
Hongze Cheng 已提交
568
  }
H
Hongze Cheng 已提交
569
  wSet.aSttF[wSet.nSttF - 1] = &fStt;
H
Hongze Cheng 已提交
570
  code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
H
Hongze Cheng 已提交
571
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
572

H
Hongze Cheng 已提交
573
  taosArrayClear(pCommitter->dWriter.aBlockIdx);
H
Hongze Cheng 已提交
574
  taosArrayClear(pCommitter->dWriter.aSttBlk);
H
Hongze Cheng 已提交
575 576
  tMapDataReset(&pCommitter->dWriter.mBlock);
  tBlockDataReset(&pCommitter->dWriter.bData);
H
compare  
Hongze Cheng 已提交
577
#if USE_STREAM_COMPRESSION
H
Hongze Cheng 已提交
578
  tDiskDataBuilderClear(pCommitter->dWriter.pBuilder);
H
compare  
Hongze Cheng 已提交
579
#else
H
Hongze Cheng 已提交
580
  tBlockDataReset(&pCommitter->dWriter.bDatal);
H
compare  
Hongze Cheng 已提交
581
#endif
H
Hongze Cheng 已提交
582

H
Hongze Cheng 已提交
583 584
  // open iter
  code = tsdbOpenCommitIter(pCommitter);
H
Hongze Cheng 已提交
585
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
586

H
Hongze Cheng 已提交
587
_exit:
H
Hongze Cheng 已提交
588
  if (code) {
S
Shengliang Guan 已提交
589
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
590
  }
H
Hongze Cheng 已提交
591
  return code;
H
Hongze Cheng 已提交
592 593
}

H
Hongze Cheng 已提交
594 595
int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg) {
  int32_t code = 0;
H
Hongze Cheng 已提交
596
  int32_t lino = 0;
H
Hongze Cheng 已提交
597

H
Hongze Cheng 已提交
598
  if (pBlockData->nRow == 0) return code;
H
Hongze Cheng 已提交
599

H
Hongze Cheng 已提交
600
  SDataBlk dataBlk;
H
Hongze Cheng 已提交
601
  tDataBlkReset(&dataBlk);
H
Hongze Cheng 已提交
602

H
Hongze Cheng 已提交
603
  // info
H
Hongze Cheng 已提交
604
  dataBlk.nRow += pBlockData->nRow;
H
Hongze Cheng 已提交
605 606
  for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
    TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]};
H
Hongze Cheng 已提交
607

H
Hongze Cheng 已提交
608
    if (iRow == 0) {
H
Hongze Cheng 已提交
609 610
      if (tsdbKeyCmprFn(&dataBlk.minKey, &key) > 0) {
        dataBlk.minKey = key;
H
Hongze Cheng 已提交
611 612 613
      }
    } else {
      if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) {
H
Hongze Cheng 已提交
614
        dataBlk.hasDup = 1;
H
Hongze Cheng 已提交
615 616 617
      }
    }

H
Hongze Cheng 已提交
618 619
    if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&dataBlk.maxKey, &key) < 0) {
      dataBlk.maxKey = key;
H
Hongze Cheng 已提交
620 621
    }

H
Hongze Cheng 已提交
622 623
    dataBlk.minVer = TMIN(dataBlk.minVer, key.version);
    dataBlk.maxVer = TMAX(dataBlk.maxVer, key.version);
H
Hongze Cheng 已提交
624 625 626
  }

  // write
H
Hongze Cheng 已提交
627
  dataBlk.nSubBlock++;
H
Hongze Cheng 已提交
628 629
  code = tsdbWriteBlockData(pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1],
                            ((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL, cmprAlg, 0);
H
Hongze Cheng 已提交
630
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
631

H
Hongze Cheng 已提交
632
  // put SDataBlk
H
Hongze Cheng 已提交
633
  code = tMapDataPutItem(mDataBlk, &dataBlk, tPutDataBlk);
H
Hongze Cheng 已提交
634
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
635

H
Hongze Cheng 已提交
636
  // clear
H
Hongze Cheng 已提交
637
  tBlockDataClear(pBlockData);
H
Hongze Cheng 已提交
638

H
Hongze Cheng 已提交
639 640
_exit:
  if (code) {
641 642
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
H
Hongze Cheng 已提交
643
  }
H
Hongze Cheng 已提交
644 645 646
  return code;
}

H
Hongze Cheng 已提交
647 648
int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg) {
  int32_t code = 0;
H
Hongze Cheng 已提交
649
  int32_t lino = 0;
H
Hongze Cheng 已提交
650
  SSttBlk sstBlk;
H
Hongze Cheng 已提交
651

H
Hongze Cheng 已提交
652
  if (pBlockData->nRow == 0) return code;
H
Hongze Cheng 已提交
653

H
Hongze Cheng 已提交
654
  // info
H
Hongze Cheng 已提交
655 656 657 658 659 660
  sstBlk.suid = pBlockData->suid;
  sstBlk.nRow = pBlockData->nRow;
  sstBlk.minKey = TSKEY_MAX;
  sstBlk.maxKey = TSKEY_MIN;
  sstBlk.minVer = VERSION_MAX;
  sstBlk.maxVer = VERSION_MIN;
H
Hongze Cheng 已提交
661
  for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
H
Hongze Cheng 已提交
662 663 664 665
    sstBlk.minKey = TMIN(sstBlk.minKey, pBlockData->aTSKEY[iRow]);
    sstBlk.maxKey = TMAX(sstBlk.maxKey, pBlockData->aTSKEY[iRow]);
    sstBlk.minVer = TMIN(sstBlk.minVer, pBlockData->aVersion[iRow]);
    sstBlk.maxVer = TMAX(sstBlk.maxVer, pBlockData->aVersion[iRow]);
H
Hongze Cheng 已提交
666
  }
H
Hongze Cheng 已提交
667 668
  sstBlk.minUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[0];
  sstBlk.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1];
H
Hongze Cheng 已提交
669

H
Hongze Cheng 已提交
670
  // write
H
Hongze Cheng 已提交
671
  code = tsdbWriteBlockData(pWriter, pBlockData, &sstBlk.bInfo, NULL, cmprAlg, 1);
H
Hongze Cheng 已提交
672
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
673

H
Hongze Cheng 已提交
674
  // push SSttBlk
H
Hongze Cheng 已提交
675
  if (taosArrayPush(aSttBlk, &sstBlk) == NULL) {
H
Hongze Cheng 已提交
676
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
677
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
678 679
  }

H
Hongze Cheng 已提交
680
  // clear
H
Hongze Cheng 已提交
681
  tBlockDataClear(pBlockData);
H
Hongze Cheng 已提交
682

H
Hongze Cheng 已提交
683 684
_exit:
  if (code) {
685 686
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
H
Hongze Cheng 已提交
687
  }
H
Hongze Cheng 已提交
688
  return code;
H
Hongze Cheng 已提交
689 690
}

H
Hongze Cheng 已提交
691 692 693 694 695 696 697
static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilder, SArray *aSttBlk) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pBuilder->nRow == 0) return code;

  // gnrt
H
Hongze Cheng 已提交
698 699 700
  const SDiskData *pDiskData;
  const SBlkInfo  *pBlkInfo;
  code = tGnrtDiskData(pBuilder, &pDiskData, &pBlkInfo);
H
Hongze Cheng 已提交
701 702
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
703 704 705 706 707 708
  SSttBlk sttBlk = {.suid = pBuilder->suid,
                    .minUid = pBlkInfo->minUid,
                    .maxUid = pBlkInfo->maxUid,
                    .minKey = pBlkInfo->minKey,
                    .maxKey = pBlkInfo->maxKey,
                    .minVer = pBlkInfo->minVer,
H
Hongze Cheng 已提交
709 710
                    .maxVer = pBlkInfo->maxVer,
                    .nRow = pBuilder->nRow};
H
Hongze Cheng 已提交
711
  // write
H
Hongze Cheng 已提交
712
  code = tsdbWriteDiskData(pWriter, pDiskData, &sttBlk.bInfo, NULL);
H
Hongze Cheng 已提交
713 714 715 716 717 718 719
  TSDB_CHECK_CODE(code, lino, _exit);

  // push
  if (taosArrayPush(aSttBlk, &sttBlk) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
H
Hongze Cheng 已提交
720

H
Hongze Cheng 已提交
721 722
  // clear
  tDiskDataBuilderClear(pBuilder);
H
Hongze Cheng 已提交
723 724 725

_exit:
  if (code) {
726 727
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
H
Hongze Cheng 已提交
728
  }
H
Hongze Cheng 已提交
729 730 731
  return code;
}

H
Hongze Cheng 已提交
732 733
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
734
  int32_t lino = 0;
H
Hongze Cheng 已提交
735

H
Hongze Cheng 已提交
736
  // write aBlockIdx
H
Hongze Cheng 已提交
737
  code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx);
H
Hongze Cheng 已提交
738
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
739

H
Hongze Cheng 已提交
740 741
  // write aSttBlk
  code = tsdbWriteSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSttBlk);
H
Hongze Cheng 已提交
742
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
743

H
Hongze Cheng 已提交
744
  // update file header
H
Hongze Cheng 已提交
745
  code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter);
H
Hongze Cheng 已提交
746
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
747 748

  // upsert SDFileSet
H
Hongze Cheng 已提交
749
  code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->dWriter.pWriter->wSet);
H
Hongze Cheng 已提交
750
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
751 752

  // close and sync
H
Hongze Cheng 已提交
753
  code = tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 1);
H
Hongze Cheng 已提交
754
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
755

H
Hongze Cheng 已提交
756 757
  if (pCommitter->dReader.pReader) {
    code = tsdbDataFReaderClose(&pCommitter->dReader.pReader);
H
Hongze Cheng 已提交
758
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
759 760 761
  }

_exit:
H
Hongze Cheng 已提交
762
  if (code) {
S
Shengliang Guan 已提交
763
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
764
              tstrerror(code));
H
Hongze Cheng 已提交
765
  }
H
Hongze Cheng 已提交
766 767 768
  return code;
}

H
Hongze Cheng 已提交
769 770
static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
  int32_t code = 0;
H
Hongze Cheng 已提交
771
  int32_t lino = 0;
H
Hongze Cheng 已提交
772

H
Hongze Cheng 已提交
773
  while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) {
H
Hongze Cheng 已提交
774
    SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx;
H
Hongze Cheng 已提交
775
    code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx);
H
Hongze Cheng 已提交
776
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
777 778 779

    if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
780
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
781 782
    }

H
Hongze Cheng 已提交
783
    code = tsdbCommitterNextTableData(pCommitter);
H
Hongze Cheng 已提交
784
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
785 786
  }

H
Hongze Cheng 已提交
787 788
_exit:
  if (code) {
S
Shengliang Guan 已提交
789
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
790
              tstrerror(code));
H
Hongze Cheng 已提交
791
  }
H
Hongze Cheng 已提交
792 793 794
  return code;
}

H
Hongze Cheng 已提交
795
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter);
H
Hongze Cheng 已提交
796
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
797
  int32_t    code = 0;
H
Hongze Cheng 已提交
798
  int32_t    lino = 0;
H
Hongze Cheng 已提交
799 800
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
801 802 803

  // commit file data start
  code = tsdbCommitFileDataStart(pCommitter);
H
Hongze Cheng 已提交
804
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
805

H
Hongze Cheng 已提交
806 807
  // impl
  code = tsdbCommitFileDataImpl(pCommitter);
H
Hongze Cheng 已提交
808
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
809

H
Hongze Cheng 已提交
810 811
  // commit file data end
  code = tsdbCommitFileDataEnd(pCommitter);
H
Hongze Cheng 已提交
812
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
813

H
Hongze Cheng 已提交
814 815
_exit:
  if (code) {
S
Shengliang Guan 已提交
816
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
817 818 819
    tsdbDataFReaderClose(&pCommitter->dReader.pReader);
    tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0);
  }
H
Hongze Cheng 已提交
820 821 822
  return code;
}

H
Hongze Cheng 已提交
823
// ----------------------------------------------------------------------------
H
Hongze Cheng 已提交
824
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter, SCommitInfo *pInfo) {
H
Hongze Cheng 已提交
825
  int32_t code = 0;
H
Hongze Cheng 已提交
826
  int32_t lino = 0;
H
Hongze Cheng 已提交
827

H
Hongze Cheng 已提交
828
  memset(pCommitter, 0, sizeof(*pCommitter));
H
Hongze Cheng 已提交
829
  ASSERT(pTsdb->imem && "last tsdb commit incomplete");
H
Hongze Cheng 已提交
830

H
Hongze Cheng 已提交
831
  pCommitter->pTsdb = pTsdb;
H
Hongze Cheng 已提交
832
  pCommitter->commitID = pInfo->info.state.commitID;
H
Hongze Cheng 已提交
833 834
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;
H
Hongze Cheng 已提交
835 836 837 838
  pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows;
  pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows;
  pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression;
  pCommitter->sttTrigger = pInfo->info.config.sttTrigger;
H
Hongze Cheng 已提交
839 840 841
  pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
  if (pCommitter->aTbDataP == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
842
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
843
  }
H
Hongze Cheng 已提交
844
  code = tsdbFSCopy(pTsdb, &pCommitter->fs);
H
Hongze Cheng 已提交
845
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
846

H
Hongze Cheng 已提交
847 848
_exit:
  if (code) {
S
Shengliang Guan 已提交
849
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
850
  }
H
Hongze Cheng 已提交
851 852 853
  return code;
}

H
Hongze Cheng 已提交
854 855
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
856
  int32_t lino = 0;
H
Hongze Cheng 已提交
857

H
Hongze Cheng 已提交
858
  // reader
H
Hongze Cheng 已提交
859 860
  pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->dReader.aBlockIdx == NULL) {
H
Hongze Cheng 已提交
861
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
862
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
863 864
  }

H
Hongze Cheng 已提交
865
  code = tBlockDataCreate(&pCommitter->dReader.bData);
H
Hongze Cheng 已提交
866
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
867

H
Hongze Cheng 已提交
868
  // merger
H
Hongze Cheng 已提交
869
  for (int32_t iStt = 0; iStt < TSDB_MAX_STT_TRIGGER; iStt++) {
H
Hongze Cheng 已提交
870 871 872
    SDataIter *pIter = &pCommitter->aDataIter[iStt];
    pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
    if (pIter->aSttBlk == NULL) {
H
Hongze Cheng 已提交
873
      code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
874
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
875 876 877
    }

    code = tBlockDataCreate(&pIter->bData);
H
Hongze Cheng 已提交
878
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
879 880 881
  }

  // writer
H
Hongze Cheng 已提交
882 883 884
  pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->dWriter.aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
885
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
886 887
  }

H
Hongze Cheng 已提交
888 889
  pCommitter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
  if (pCommitter->dWriter.aSttBlk == NULL) {
H
Hongze Cheng 已提交
890
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
891
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
892 893
  }

H
Hongze Cheng 已提交
894
  code = tBlockDataCreate(&pCommitter->dWriter.bData);
H
Hongze Cheng 已提交
895
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
896

H
compare  
Hongze Cheng 已提交
897
#if USE_STREAM_COMPRESSION
H
Hongze Cheng 已提交
898
  code = tDiskDataBuilderCreate(&pCommitter->dWriter.pBuilder);
H
compare  
Hongze Cheng 已提交
899
#else
H
Hongze Cheng 已提交
900
  code = tBlockDataCreate(&pCommitter->dWriter.bDatal);
H
compare  
Hongze Cheng 已提交
901
#endif
H
Hongze Cheng 已提交
902
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
903

H
Hongze Cheng 已提交
904
_exit:
H
Hongze Cheng 已提交
905
  if (code) {
S
Shengliang Guan 已提交
906
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
907
              tstrerror(code));
H
Hongze Cheng 已提交
908
  }
H
Hongze Cheng 已提交
909 910 911 912
  return code;
}

static void tsdbCommitDataEnd(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
913
  // reader
H
Hongze Cheng 已提交
914 915
  taosArrayDestroy(pCommitter->dReader.aBlockIdx);
  tMapDataClear(&pCommitter->dReader.mBlock);
H
Hongze Cheng 已提交
916
  tBlockDataDestroy(&pCommitter->dReader.bData);
H
Hongze Cheng 已提交
917

H
Hongze Cheng 已提交
918
  // merger
H
Hongze Cheng 已提交
919
  for (int32_t iStt = 0; iStt < TSDB_MAX_STT_TRIGGER; iStt++) {
H
Hongze Cheng 已提交
920 921
    SDataIter *pIter = &pCommitter->aDataIter[iStt];
    taosArrayDestroy(pIter->aSttBlk);
H
Hongze Cheng 已提交
922
    tBlockDataDestroy(&pIter->bData);
H
Hongze Cheng 已提交
923 924 925
  }

  // writer
H
Hongze Cheng 已提交
926
  taosArrayDestroy(pCommitter->dWriter.aBlockIdx);
H
Hongze Cheng 已提交
927
  taosArrayDestroy(pCommitter->dWriter.aSttBlk);
H
Hongze Cheng 已提交
928
  tMapDataClear(&pCommitter->dWriter.mBlock);
H
Hongze Cheng 已提交
929
  tBlockDataDestroy(&pCommitter->dWriter.bData);
H
compare  
Hongze Cheng 已提交
930
#if USE_STREAM_COMPRESSION
H
Hongze Cheng 已提交
931
  tDiskDataBuilderDestroy(pCommitter->dWriter.pBuilder);
H
compare  
Hongze Cheng 已提交
932
#else
H
Hongze Cheng 已提交
933
  tBlockDataDestroy(&pCommitter->dWriter.bDatal);
H
compare  
Hongze Cheng 已提交
934
#endif
H
Hongze Cheng 已提交
935 936
  tDestroyTSchema(pCommitter->skmTable.pTSchema);
  tDestroyTSchema(pCommitter->skmRow.pTSchema);
H
Hongze Cheng 已提交
937 938
}

H
Hongze Cheng 已提交
939
static int32_t tsdbCommitData(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
940 941 942
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
943 944
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
945

H
Hongze Cheng 已提交
946
  // check
H
Hongze Cheng 已提交
947
  if (pMemTable->nRow == 0) goto _exit;
H
Hongze Cheng 已提交
948

H
Hongze Cheng 已提交
949 950
  // start ====================
  code = tsdbCommitDataStart(pCommitter);
H
Hongze Cheng 已提交
951
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
952 953 954

  // impl ====================
  pCommitter->nextKey = pMemTable->minKey;
H
Hongze Cheng 已提交
955 956
  while (pCommitter->nextKey < TSKEY_MAX) {
    code = tsdbCommitFileData(pCommitter);
H
Hongze Cheng 已提交
957
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
958
  }
H
Hongze Cheng 已提交
959

H
Hongze Cheng 已提交
960 961 962
  // end ====================
  tsdbCommitDataEnd(pCommitter);

H
Hongze Cheng 已提交
963
_exit:
H
Hongze Cheng 已提交
964
  if (code) {
S
Shengliang Guan 已提交
965
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
966
  }
H
Hongze Cheng 已提交
967 968
  return code;
}
H
Hongze Cheng 已提交
969

H
Hongze Cheng 已提交
970
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
971 972 973
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
974 975
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
976

H
Hongze Cheng 已提交
977 978
  if (pMemTable->nDel == 0) {
    goto _exit;
H
Hongze Cheng 已提交
979
  }
H
Hongze Cheng 已提交
980

H
Hongze Cheng 已提交
981 982 983
  // start
  code = tsdbCommitDelStart(pCommitter);
  if (code) {
H
Hongze Cheng 已提交
984
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
985
  }
H
Hongze Cheng 已提交
986

H
Hongze Cheng 已提交
987
  // impl
H
Hongze Cheng 已提交
988 989 990
  int32_t  iDelIdx = 0;
  int32_t  nDelIdx = taosArrayGetSize(pCommitter->aDelIdx);
  int32_t  iTbData = 0;
H
Hongze Cheng 已提交
991
  int32_t  nTbData = taosArrayGetSize(pCommitter->aTbDataP);
H
Hongze Cheng 已提交
992 993 994 995 996
  STbData *pTbData;
  SDelIdx *pDelIdx;

  ASSERT(nTbData > 0);

H
Hongze Cheng 已提交
997
  pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
H
Hongze Cheng 已提交
998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019
  pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
  while (true) {
    if (pTbData == NULL && pDelIdx == NULL) break;

    if (pTbData && pDelIdx) {
      int32_t c = tTABLEIDCmprFn(pTbData, pDelIdx);

      if (c == 0) {
        goto _commit_mem_and_disk_del;
      } else if (c < 0) {
        goto _commit_mem_del;
      } else {
        goto _commit_disk_del;
      }
    } else if (pTbData) {
      goto _commit_mem_del;
    } else {
      goto _commit_disk_del;
    }

  _commit_mem_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, NULL);
H
Hongze Cheng 已提交
1020
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1021 1022

    iTbData++;
H
Hongze Cheng 已提交
1023
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
H
Hongze Cheng 已提交
1024 1025 1026 1027
    continue;

  _commit_disk_del:
    code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx);
H
Hongze Cheng 已提交
1028
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1029 1030 1031 1032 1033 1034 1035

    iDelIdx++;
    pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    continue;

  _commit_mem_and_disk_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx);
H
Hongze Cheng 已提交
1036
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1037 1038

    iTbData++;
H
Hongze Cheng 已提交
1039
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
H
Hongze Cheng 已提交
1040 1041 1042
    iDelIdx++;
    pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    continue;
H
Hongze Cheng 已提交
1043
  }
H
Hongze Cheng 已提交
1044

H
Hongze Cheng 已提交
1045 1046
  // end
  code = tsdbCommitDelEnd(pCommitter);
H
Hongze Cheng 已提交
1047
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1048

H
Hongze Cheng 已提交
1049
_exit:
H
Hongze Cheng 已提交
1050
  if (code) {
S
Shengliang Guan 已提交
1051
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
1052
  } else {
S
Shengliang Guan 已提交
1053
    tsdbDebug("vgId:%d, commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
H
Hongze Cheng 已提交
1054
  }
H
Hongze Cheng 已提交
1055
  return code;
H
Hongze Cheng 已提交
1056 1057
}

H
Hongze Cheng 已提交
1058
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
H
Hongze Cheng 已提交
1059 1060
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
1061
  STsdb  *pTsdb = pCommitter->pTsdb;
H
Hongze Cheng 已提交
1062

H
Hongze Cheng 已提交
1063 1064 1065 1066 1067
  if (eno) {
    code = eno;
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    code = tsdbFSPrepareCommit(pCommitter->pTsdb, &pCommitter->fs);
H
Hongze Cheng 已提交
1068
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1069 1070
  }

H
Hongze Cheng 已提交
1071
_exit:
H
Hongze Cheng 已提交
1072
  tsdbFSDestroy(&pCommitter->fs);
H
Hongze Cheng 已提交
1073
  taosArrayDestroy(pCommitter->aTbDataP);
H
Hongze Cheng 已提交
1074
  pCommitter->aTbDataP = NULL;
H
Hongze Cheng 已提交
1075
  if (code || eno) {
S
Shengliang Guan 已提交
1076
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
1077
  } else {
S
Shengliang Guan 已提交
1078
    tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode));
H
Hongze Cheng 已提交
1079
  }
H
Hongze Cheng 已提交
1080 1081
  return code;
}
H
Hongze Cheng 已提交
1082

H
Hongze Cheng 已提交
1083
// ================================================================================
H
Hongze Cheng 已提交
1084

H
Hongze Cheng 已提交
1085 1086
static FORCE_INLINE SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) {
  return (pCommitter->pIter) ? &pCommitter->pIter->r : NULL;
H
Hongze Cheng 已提交
1087 1088
}

H
Hongze Cheng 已提交
1089
static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
1090
  int32_t code = 0;
H
Hongze Cheng 已提交
1091
  int32_t lino = 0;
H
Hongze Cheng 已提交
1092 1093 1094

  if (pCommitter->pIter) {
    SDataIter *pIter = pCommitter->pIter;
H
Hongze Cheng 已提交
1095
    if (pCommitter->pIter->type == MEMORY_DATA_ITER) {  // memory
H
Hongze Cheng 已提交
1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111
      tsdbTbDataIterNext(&pIter->iter);
      TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter);
      while (true) {
        if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
          pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
          pRow = NULL;
        }

        if (pRow) {
          pIter->r.suid = pIter->iter.pTbData->suid;
          pIter->r.uid = pIter->iter.pTbData->uid;
          pIter->r.row = *pRow;
          break;
        }

        pIter->iTbDataP++;
H
Hongze Cheng 已提交
1112 1113
        if (pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP)) {
          STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP);
H
Hongze Cheng 已提交
1114 1115 1116 1117 1118 1119 1120 1121 1122
          TSDBKEY  keyFrom = {.ts = pCommitter->minKey, .version = VERSION_MIN};
          tsdbTbDataIterOpen(pTbData, &keyFrom, 0, &pIter->iter);
          pRow = tsdbTbDataIterGet(&pIter->iter);
          continue;
        } else {
          pCommitter->pIter = NULL;
          break;
        }
      }
H
Hongze Cheng 已提交
1123
    } else if (pCommitter->pIter->type == STT_DATA_ITER) {  // last file
H
Hongze Cheng 已提交
1124 1125 1126 1127 1128
      pIter->iRow++;
      if (pIter->iRow < pIter->bData.nRow) {
        pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
        pIter->r.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
      } else {
H
Hongze Cheng 已提交
1129 1130 1131
        pIter->iSttBlk++;
        if (pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk)) {
          SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
H
Hongze Cheng 已提交
1132

H
Hongze Cheng 已提交
1133
          code = tsdbReadSttBlockEx(pCommitter->dReader.pReader, pIter->iStt, pSttBlk, &pIter->bData);
H
Hongze Cheng 已提交
1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168
          if (code) goto _exit;

          pIter->iRow = 0;
          pIter->r.suid = pIter->bData.suid;
          pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
          pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0);
        } else {
          pCommitter->pIter = NULL;
        }
      }
    } else {
      ASSERT(0);
    }

    // compare with min in RB Tree
    pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
    if (pCommitter->pIter && pIter) {
      int32_t c = tRowInfoCmprFn(&pCommitter->pIter->r, &pIter->r);
      if (c > 0) {
        tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
        pCommitter->pIter = NULL;
      } else {
        ASSERT(c);
      }
    }
  }

  if (pCommitter->pIter == NULL) {
    pCommitter->pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
    if (pCommitter->pIter) {
      tRBTreeDrop(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
    }
  }

_exit:
H
Hongze Cheng 已提交
1169
  if (code) {
S
Shengliang Guan 已提交
1170
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
1171
              tstrerror(code));
H
Hongze Cheng 已提交
1172
  }
H
Hongze Cheng 已提交
1173 1174 1175
  return code;
}

H
Hongze Cheng 已提交
1176
static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
H
Hongze Cheng 已提交
1177 1178 1179
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
1180 1181 1182 1183 1184 1185 1186
  SBlockData *pBlockData = &pCommitter->dWriter.bData;
  SRowInfo   *pRowInfo = tsdbGetCommitRow(pCommitter);
  TABLEID     id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};

  tBlockDataClear(pBlockData);
  while (pRowInfo) {
    code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
H
Hongze Cheng 已提交
1187
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1188 1189

    code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
H
Hongze Cheng 已提交
1190
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1191 1192

    code = tsdbNextCommitRow(pCommitter);
H
Hongze Cheng 已提交
1193
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1194 1195 1196 1197 1198 1199 1200

    pRowInfo = tsdbGetCommitRow(pCommitter);
    if (pRowInfo) {
      if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) {
        pRowInfo = NULL;
      } else {
        TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
H
Hongze Cheng 已提交
1201
        if (tsdbKeyCmprFn(&tKey, &pDataBlk->minKey) >= 0) pRowInfo = NULL;
H
Hongze Cheng 已提交
1202 1203 1204
      }
    }

H
Hongze Cheng 已提交
1205
    if (pBlockData->nRow >= pCommitter->maxRow) {
H
Hongze Cheng 已提交
1206 1207
      code =
          tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1208
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1209 1210 1211
    }
  }

H
Hongze Cheng 已提交
1212
  code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1213
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1214

H
Hongze Cheng 已提交
1215 1216
_exit:
  if (code) {
S
Shengliang Guan 已提交
1217
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
1218
              tstrerror(code));
H
Hongze Cheng 已提交
1219
  }
H
Hongze Cheng 已提交
1220 1221 1222
  return code;
}

H
Hongze Cheng 已提交
1223
static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
H
Hongze Cheng 已提交
1224 1225 1226
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
1227 1228 1229 1230 1231
  SRowInfo   *pRowInfo = tsdbGetCommitRow(pCommitter);
  TABLEID     id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
  SBlockData *pBDataR = &pCommitter->dReader.bData;
  SBlockData *pBDataW = &pCommitter->dWriter.bData;

H
Hongze Cheng 已提交
1232
  code = tsdbReadDataBlock(pCommitter->dReader.pReader, pDataBlk, pBDataR);
H
Hongze Cheng 已提交
1233
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1234 1235 1236 1237 1238 1239 1240 1241 1242 1243

  tBlockDataClear(pBDataW);
  int32_t  iRow = 0;
  TSDBROW  row = tsdbRowFromBlockData(pBDataR, 0);
  TSDBROW *pRow = &row;

  while (pRow && pRowInfo) {
    int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row);
    if (c < 0) {
      code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid);
H
Hongze Cheng 已提交
1244
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1245 1246 1247 1248 1249 1250 1251 1252 1253

      iRow++;
      if (iRow < pBDataR->nRow) {
        row = tsdbRowFromBlockData(pBDataR, iRow);
      } else {
        pRow = NULL;
      }
    } else if (c > 0) {
      code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
H
Hongze Cheng 已提交
1254
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1255 1256

      code = tBlockDataAppendRow(pBDataW, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
H
Hongze Cheng 已提交
1257
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1258 1259

      code = tsdbNextCommitRow(pCommitter);
H
Hongze Cheng 已提交
1260
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1261 1262 1263 1264 1265 1266 1267

      pRowInfo = tsdbGetCommitRow(pCommitter);
      if (pRowInfo) {
        if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) {
          pRowInfo = NULL;
        } else {
          TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
H
Hongze Cheng 已提交
1268
          if (tsdbKeyCmprFn(&tKey, &pDataBlk->maxKey) > 0) pRowInfo = NULL;
H
Hongze Cheng 已提交
1269 1270 1271
        }
      }
    } else {
1272
      ASSERT(0 && "dup rows not allowed");
H
Hongze Cheng 已提交
1273 1274
    }

H
Hongze Cheng 已提交
1275
    if (pBDataW->nRow >= pCommitter->maxRow) {
H
Hongze Cheng 已提交
1276
      code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1277
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1278 1279 1280 1281 1282
    }
  }

  while (pRow) {
    code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid);
H
Hongze Cheng 已提交
1283
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1284 1285 1286 1287 1288 1289 1290 1291

    iRow++;
    if (iRow < pBDataR->nRow) {
      row = tsdbRowFromBlockData(pBDataR, iRow);
    } else {
      pRow = NULL;
    }

H
Hongze Cheng 已提交
1292
    if (pBDataW->nRow >= pCommitter->maxRow) {
H
Hongze Cheng 已提交
1293
      code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1294
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1295 1296 1297
    }
  }

H
Hongze Cheng 已提交
1298
  code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1299
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1300

H
Hongze Cheng 已提交
1301 1302
_exit:
  if (code) {
S
Shengliang Guan 已提交
1303
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
1304
              tstrerror(code));
H
Hongze Cheng 已提交
1305
  }
H
Hongze Cheng 已提交
1306 1307 1308
  return code;
}

H
Hongze Cheng 已提交
1309
static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
H
Hongze Cheng 已提交
1310 1311 1312
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
1313 1314 1315 1316 1317
  SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx;

  ASSERT(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0);
  if (pBlockIdx && pBlockIdx->suid == id.suid && pBlockIdx->uid == id.uid) {
    int32_t   iBlock = 0;
H
Hongze Cheng 已提交
1318 1319
    SDataBlk  block;
    SDataBlk *pDataBlk = &block;
H
Hongze Cheng 已提交
1320 1321 1322 1323
    SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);

    ASSERT(pRowInfo->suid == id.suid && pRowInfo->uid == id.uid);

H
Hongze Cheng 已提交
1324 1325 1326
    tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
    while (pDataBlk && pRowInfo) {
      SDataBlk tBlock = {.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)};
H
Hongze Cheng 已提交
1327
      int32_t  c = tDataBlkCmprFn(pDataBlk, &tBlock);
H
Hongze Cheng 已提交
1328 1329

      if (c < 0) {
H
Hongze Cheng 已提交
1330
        code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
H
Hongze Cheng 已提交
1331
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1332 1333 1334

        iBlock++;
        if (iBlock < pCommitter->dReader.mBlock.nItem) {
H
Hongze Cheng 已提交
1335
          tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
H
Hongze Cheng 已提交
1336
        } else {
H
Hongze Cheng 已提交
1337
          pDataBlk = NULL;
H
Hongze Cheng 已提交
1338 1339
        }
      } else if (c > 0) {
H
Hongze Cheng 已提交
1340
        code = tsdbCommitAheadBlock(pCommitter, pDataBlk);
H
Hongze Cheng 已提交
1341
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1342 1343 1344

        pRowInfo = tsdbGetCommitRow(pCommitter);
        if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL;
H
Hongze Cheng 已提交
1345
      } else {
H
Hongze Cheng 已提交
1346
        code = tsdbCommitMergeBlock(pCommitter, pDataBlk);
H
Hongze Cheng 已提交
1347
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1348 1349 1350

        iBlock++;
        if (iBlock < pCommitter->dReader.mBlock.nItem) {
H
Hongze Cheng 已提交
1351
          tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
H
Hongze Cheng 已提交
1352
        } else {
H
Hongze Cheng 已提交
1353
          pDataBlk = NULL;
H
Hongze Cheng 已提交
1354 1355 1356
        }
        pRowInfo = tsdbGetCommitRow(pCommitter);
        if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL;
H
Hongze Cheng 已提交
1357 1358 1359
      }
    }

H
Hongze Cheng 已提交
1360 1361
    while (pDataBlk) {
      code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
H
Hongze Cheng 已提交
1362
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1363 1364 1365

      iBlock++;
      if (iBlock < pCommitter->dReader.mBlock.nItem) {
H
Hongze Cheng 已提交
1366
        tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
H
Hongze Cheng 已提交
1367
      } else {
H
Hongze Cheng 已提交
1368
        pDataBlk = NULL;
H
Hongze Cheng 已提交
1369 1370
      }
    }
H
Hongze Cheng 已提交
1371 1372

    code = tsdbCommitterNextTableData(pCommitter);
H
Hongze Cheng 已提交
1373
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1374 1375 1376
  }

_exit:
H
Hongze Cheng 已提交
1377
  if (code) {
S
Shengliang Guan 已提交
1378
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
1379
              tstrerror(code));
H
Hongze Cheng 已提交
1380
  }
H
Hongze Cheng 已提交
1381 1382 1383
  return code;
}

H
Hongze Cheng 已提交
1384
static int32_t tsdbInitSttBlockBuilderIfNeed(SCommitter *pCommitter, TABLEID id) {
H
Hongze Cheng 已提交
1385
  int32_t code = 0;
H
Hongze Cheng 已提交
1386
  int32_t lino = 0;
H
Hongze Cheng 已提交
1387

H
compare  
Hongze Cheng 已提交
1388
#if USE_STREAM_COMPRESSION
H
Hongze Cheng 已提交
1389 1390 1391
  SDiskDataBuilder *pBuilder = pCommitter->dWriter.pBuilder;
  if (pBuilder->suid || pBuilder->uid) {
    if (!TABLE_SAME_SCHEMA(pBuilder->suid, pBuilder->uid, id.suid, id.uid)) {
H
Hongze Cheng 已提交
1392
      code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pBuilder, pCommitter->dWriter.aSttBlk);
H
Hongze Cheng 已提交
1393
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1394 1395

      tDiskDataBuilderClear(pBuilder);
H
Hongze Cheng 已提交
1396 1397 1398
    }
  }

H
Hongze Cheng 已提交
1399
  if (!pBuilder->suid && !pBuilder->uid) {
H
Hongze Cheng 已提交
1400 1401
    ASSERT(pCommitter->skmTable.suid == id.suid);
    ASSERT(pCommitter->skmTable.uid == id.uid);
H
Hongze Cheng 已提交
1402
    code = tDiskDataBuilderInit(pBuilder, pCommitter->skmTable.pTSchema, &id, pCommitter->cmprAlg, 0);
H
Hongze Cheng 已提交
1403
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1404
  }
H
compare  
Hongze Cheng 已提交
1405 1406 1407 1408 1409 1410 1411 1412
#else
  SBlockData *pBData = &pCommitter->dWriter.bDatal;
  if (pBData->suid || pBData->uid) {
    if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) {
      code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);

      tBlockDataReset(pBData);
H
Hongze Cheng 已提交
1413 1414 1415
    }
  }

H
compare  
Hongze Cheng 已提交
1416
  if (!pBData->suid && !pBData->uid) {
H
Hongze Cheng 已提交
1417 1418
    ASSERT(pCommitter->skmTable.suid == id.suid);
    ASSERT(pCommitter->skmTable.uid == id.uid);
H
compare  
Hongze Cheng 已提交
1419 1420 1421
    TABLEID tid = {.suid = id.suid, .uid = id.suid ? 0 : id.uid};
    code = tBlockDataInit(pBData, &tid, pCommitter->skmTable.pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1422
  }
H
compare  
Hongze Cheng 已提交
1423
#endif
H
Hongze Cheng 已提交
1424

H
Hongze Cheng 已提交
1425
_exit:
H
Hongze Cheng 已提交
1426
  if (code) {
S
Shengliang Guan 已提交
1427
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
1428
              tstrerror(code));
H
Hongze Cheng 已提交
1429
  }
H
Hongze Cheng 已提交
1430 1431 1432 1433 1434
  return code;
}

static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
1435
  int32_t lino = 0;
H
Hongze Cheng 已提交
1436 1437

  SBlockData *pBData = &pCommitter->dWriter.bData;
H
Hongze Cheng 已提交
1438
  TABLEID     id = {.suid = pBData->suid, .uid = pBData->uid};
H
Hongze Cheng 已提交
1439

H
Hongze Cheng 已提交
1440
  code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id);
H
Hongze Cheng 已提交
1441
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1442

H
Hongze Cheng 已提交
1443 1444 1445
  for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) {
    TSDBROW row = tsdbRowFromBlockData(pBData, iRow);

H
compare  
Hongze Cheng 已提交
1446
#if USE_STREAM_COMPRESSION
H
Hongze Cheng 已提交
1447
    code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &row, NULL, &id);
H
Hongze Cheng 已提交
1448
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1449

H
Hongze Cheng 已提交
1450
    if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) {
H
Hongze Cheng 已提交
1451
      code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk);
H
Hongze Cheng 已提交
1452
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1453 1454 1455

      code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id);
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1456
    }
H
compare  
Hongze Cheng 已提交
1457 1458 1459 1460 1461 1462 1463 1464
#else
    code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &row, NULL, id.uid);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) {
      code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
                               pCommitter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1465
    }
H
compare  
Hongze Cheng 已提交
1466
#endif
H
Hongze Cheng 已提交
1467 1468
  }

H
Hongze Cheng 已提交
1469 1470
_exit:
  if (code) {
S
Shengliang Guan 已提交
1471
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
1472
              tstrerror(code));
H
Hongze Cheng 已提交
1473
  }
H
Hongze Cheng 已提交
1474 1475 1476
  return code;
}

H
Hongze Cheng 已提交
1477
static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
H
Hongze Cheng 已提交
1478
  int32_t code = 0;
H
Hongze Cheng 已提交
1479
  int32_t lino = 0;
H
Hongze Cheng 已提交
1480

H
Hongze Cheng 已提交
1481
  SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
H
Hongze Cheng 已提交
1482 1483 1484 1485
  if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
    pRowInfo = NULL;
  }

H
Hongze Cheng 已提交
1486
  if (pRowInfo == NULL) goto _exit;
H
Hongze Cheng 已提交
1487

H
Hongze Cheng 已提交
1488
  if (pCommitter->toLastOnly) {
H
Hongze Cheng 已提交
1489 1490
    code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1491

H
Hongze Cheng 已提交
1492 1493
    while (pRowInfo) {
      STSchema *pTSchema = NULL;
H
Hongze Cheng 已提交
1494
      if (pRowInfo->row.type == TSDBROW_ROW_FMT) {
H
Hongze Cheng 已提交
1495 1496 1497 1498 1499
        code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
        TSDB_CHECK_CODE(code, lino, _exit);
        pTSchema = pCommitter->skmRow.pTSchema;
      }

H
compare  
Hongze Cheng 已提交
1500
#if USE_STREAM_COMPRESSION
H
Hongze Cheng 已提交
1501
      code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &pRowInfo->row, pTSchema, &id);
H
compare  
Hongze Cheng 已提交
1502 1503 1504
#else
      code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pRowInfo->row, pTSchema, id.uid);
#endif
H
Hongze Cheng 已提交
1505
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1506

H
Hongze Cheng 已提交
1507 1508
      code = tsdbNextCommitRow(pCommitter);
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1509

H
Hongze Cheng 已提交
1510 1511 1512 1513
      pRowInfo = tsdbGetCommitRow(pCommitter);
      if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
        pRowInfo = NULL;
      }
H
Hongze Cheng 已提交
1514

H
compare  
Hongze Cheng 已提交
1515
#if USE_STREAM_COMPRESSION
H
Hongze Cheng 已提交
1516
      if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) {
H
Hongze Cheng 已提交
1517
        code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk);
H
Hongze Cheng 已提交
1518
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1519 1520 1521

        code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id);
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1522
      }
H
compare  
Hongze Cheng 已提交
1523
#else
1524
      if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) {
H
compare  
Hongze Cheng 已提交
1525 1526 1527 1528 1529
        code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
                                 pCommitter->cmprAlg);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
#endif
H
Hongze Cheng 已提交
1530 1531
    }
  } else {
H
Hongze Cheng 已提交
1532
    SBlockData *pBData = &pCommitter->dWriter.bData;
H
Hongze Cheng 已提交
1533
    ASSERT(pBData->nRow == 0);
H
Hongze Cheng 已提交
1534

H
Hongze Cheng 已提交
1535 1536
    while (pRowInfo) {
      STSchema *pTSchema = NULL;
H
Hongze Cheng 已提交
1537
      if (pRowInfo->row.type == TSDBROW_ROW_FMT) {
H
Hongze Cheng 已提交
1538
        code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
H
Hongze Cheng 已提交
1539
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1540 1541
        pTSchema = pCommitter->skmRow.pTSchema;
      }
H
Hongze Cheng 已提交
1542

H
Hongze Cheng 已提交
1543 1544
      code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid);
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1545

H
Hongze Cheng 已提交
1546 1547
      code = tsdbNextCommitRow(pCommitter);
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1548

H
Hongze Cheng 已提交
1549 1550 1551 1552
      pRowInfo = tsdbGetCommitRow(pCommitter);
      if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
        pRowInfo = NULL;
      }
H
Hongze Cheng 已提交
1553

H
Hongze Cheng 已提交
1554
      if (pBData->nRow >= pCommitter->maxRow) {
H
Hongze Cheng 已提交
1555 1556
        code =
            tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1557
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1558
      }
H
Hongze Cheng 已提交
1559 1560
    }

H
Hongze Cheng 已提交
1561 1562 1563 1564 1565 1566 1567 1568 1569
    if (pBData->nRow) {
      if (pBData->nRow > pCommitter->minRow) {
        code =
            tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
        TSDB_CHECK_CODE(code, lino, _exit);
      } else {
        code = tsdbAppendLastBlock(pCommitter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
H
Hongze Cheng 已提交
1570 1571 1572
    }
  }

H
Hongze Cheng 已提交
1573
_exit:
H
Hongze Cheng 已提交
1574
  if (code) {
S
Shengliang Guan 已提交
1575
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
1576
              tstrerror(code));
H
Hongze Cheng 已提交
1577
  }
H
Hongze Cheng 已提交
1578 1579 1580
  return code;
}

H
Hongze Cheng 已提交
1581 1582
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
1583
  int32_t lino = 0;
H
Hongze Cheng 已提交
1584

H
Hongze Cheng 已提交
1585
  SRowInfo *pRowInfo;
H
Hongze Cheng 已提交
1586
  TABLEID   id = {0};
H
Hongze Cheng 已提交
1587
  while ((pRowInfo = tsdbGetCommitRow(pCommitter)) != NULL) {
H
Hongze Cheng 已提交
1588 1589 1590
    ASSERT(pRowInfo->suid != id.suid || pRowInfo->uid != id.uid);
    id.suid = pRowInfo->suid;
    id.uid = pRowInfo->uid;
H
Hongze Cheng 已提交
1591

H
Hongze Cheng 已提交
1592
    code = tsdbMoveCommitData(pCommitter, id);
H
Hongze Cheng 已提交
1593
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1594 1595

    // start
H
Hongze Cheng 已提交
1596
    tMapDataReset(&pCommitter->dWriter.mBlock);
H
Hongze Cheng 已提交
1597 1598

    // impl
H
Hongze Cheng 已提交
1599
    code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable);
H
Hongze Cheng 已提交
1600
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1601
    code = tBlockDataInit(&pCommitter->dReader.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0);
H
Hongze Cheng 已提交
1602
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1603
    code = tBlockDataInit(&pCommitter->dWriter.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0);
H
Hongze Cheng 已提交
1604
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1605

H
Hongze Cheng 已提交
1606 1607
    /* merge with data in .data file */
    code = tsdbMergeTableData(pCommitter, id);
H
Hongze Cheng 已提交
1608
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1609

H
Hongze Cheng 已提交
1610
    /* handle remain table data */
H
Hongze Cheng 已提交
1611
    code = tsdbCommitTableData(pCommitter, id);
H
Hongze Cheng 已提交
1612
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1613

H
Hongze Cheng 已提交
1614
    // end
H
Hongze Cheng 已提交
1615 1616
    if (pCommitter->dWriter.mBlock.nItem > 0) {
      SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid};
H
Hongze Cheng 已提交
1617
      code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx);
H
Hongze Cheng 已提交
1618
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1619 1620 1621

      if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1622
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1623 1624
      }
    }
H
Hongze Cheng 已提交
1625 1626
  }

H
Hongze Cheng 已提交
1627 1628 1629
  id.suid = INT64_MAX;
  id.uid = INT64_MAX;
  code = tsdbMoveCommitData(pCommitter, id);
H
Hongze Cheng 已提交
1630
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1631

H
compare  
Hongze Cheng 已提交
1632
#if USE_STREAM_COMPRESSION
H
Hongze Cheng 已提交
1633
  code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk);
H
compare  
Hongze Cheng 已提交
1634
#else
H
Hongze Cheng 已提交
1635 1636
  code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
                           pCommitter->cmprAlg);
H
compare  
Hongze Cheng 已提交
1637
#endif
H
Hongze Cheng 已提交
1638
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1639

H
Hongze Cheng 已提交
1640 1641
_exit:
  if (code) {
S
Shengliang Guan 已提交
1642
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
1643
              tstrerror(code));
H
Hongze Cheng 已提交
1644
  }
H
Hongze Cheng 已提交
1645 1646
  return code;
}
H
Hongze Cheng 已提交
1647 1648 1649 1650 1651

int32_t tsdbFinishCommit(STsdb *pTsdb) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
1652

H
Hongze Cheng 已提交
1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665
  // lock
  taosThreadRwlockWrlock(&pTsdb->rwLock);

  code = tsdbFSCommit(pTsdb);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pTsdb->imem = NULL;

  // unlock
  taosThreadRwlockUnlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
1666
  if (pMemTable) {
H
Hongze Cheng 已提交
1667
    tsdbUnrefMemTable(pMemTable, NULL, true);
H
Hongze Cheng 已提交
1668
  }
H
Hongze Cheng 已提交
1669 1670 1671

_exit:
  if (code) {
S
Shengliang Guan 已提交
1672
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
1673
  } else {
S
Shengliang Guan 已提交
1674
    tsdbInfo("vgId:%d, tsdb finish commit", TD_VID(pTsdb->pVnode));
H
Hongze Cheng 已提交
1675
  }
H
Hongze Cheng 已提交
1676
  return code;
H
Hongze Cheng 已提交
1677
}
H
Hongze Cheng 已提交
1678

H
Hongze Cheng 已提交
1679 1680 1681 1682 1683 1684 1685 1686 1687
int32_t tsdbRollbackCommit(STsdb *pTsdb) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbFSRollback(pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
S
Shengliang Guan 已提交
1688
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
1689
  } else {
S
Shengliang Guan 已提交
1690
    tsdbInfo("vgId:%d, tsdb rollback commit", TD_VID(pTsdb->pVnode));
H
Hongze Cheng 已提交
1691
  }
H
Hongze Cheng 已提交
1692 1693
  return code;
}