tsdbCommit.c 44.8 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
typedef enum { MEMORY_DATA_ITER = 0, STT_DATA_ITER } EDataIterT;
H
Hongze Cheng 已提交
19

H
Hongze Cheng 已提交
20 21 22
typedef struct {
  SRBTreeNode n;
  SRowInfo    r;
H
Hongze Cheng 已提交
23
  EDataIterT  type;
H
Hongze Cheng 已提交
24 25 26 27 28 29
  union {
    struct {
      int32_t     iTbDataP;
      STbDataIter iter;
    };  // memory data iter
    struct {
H
Hongze Cheng 已提交
30 31 32
      int32_t    iStt;
      SArray    *aSttBlk;
      int32_t    iSttBlk;
H
Hongze Cheng 已提交
33 34
      SBlockData bData;
      int32_t    iRow;
H
Hongze Cheng 已提交
35
    };  // stt file data iter
H
Hongze Cheng 已提交
36 37 38
  };
} SDataIter;

H
Hongze Cheng 已提交
39
typedef struct {
H
Hongze Cheng 已提交
40
  STsdb *pTsdb;
H
Hongze Cheng 已提交
41
  /* commit data */
H
Hongze Cheng 已提交
42
  int64_t commitID;
H
Hongze Cheng 已提交
43 44
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
45 46
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
47
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
48
  int8_t  sttTrigger;
H
Hongze Cheng 已提交
49 50
  SArray *aTbDataP;  // memory
  STsdbFS fs;        // disk
H
Hongze Cheng 已提交
51
  // --------------
H
Hongze Cheng 已提交
52
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
53 54 55
  int32_t commitFid;
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
56
  // commit file data
H
Hongze Cheng 已提交
57 58
  struct {
    SDataFReader *pReader;
H
Hongze Cheng 已提交
59 60 61
    SArray       *aBlockIdx;  // SArray<SBlockIdx>
    int32_t       iBlockIdx;
    SBlockIdx    *pBlockIdx;
H
Hongze Cheng 已提交
62
    SMapData      mBlock;  // SMapData<SDataBlk>
H
Hongze Cheng 已提交
63
    SBlockData    bData;
H
Hongze Cheng 已提交
64
  } dReader;
H
Hongze Cheng 已提交
65 66 67
  struct {
    SDataIter *pIter;
    SRBTree    rbt;
H
Hongze Cheng 已提交
68
    SDataIter  dataIter;
H
Hongze Cheng 已提交
69
    SDataIter  aDataIter[TSDB_MAX_STT_TRIGGER];
H
Hongze Cheng 已提交
70
    int8_t     toLastOnly;
H
Hongze Cheng 已提交
71
  };
H
Hongze Cheng 已提交
72 73 74
  struct {
    SDataFWriter *pWriter;
    SArray       *aBlockIdx;  // SArray<SBlockIdx>
H
Hongze Cheng 已提交
75
    SArray       *aSttBlk;    // SArray<SSttBlk>
H
Hongze Cheng 已提交
76
    SMapData      mBlock;     // SMapData<SDataBlk>
H
Hongze Cheng 已提交
77
    SBlockData    bData;
H
Hongze Cheng 已提交
78
    SBlockData    bDatal;
H
Hongze Cheng 已提交
79 80 81
  } dWriter;
  SSkmInfo skmTable;
  SSkmInfo skmRow;
H
Hongze Cheng 已提交
82
  /* commit del */
H
Hongze Cheng 已提交
83 84
  SDelFReader *pDelFReader;
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
85 86 87
  SArray      *aDelIdx;   // SArray<SDelIdx>
  SArray      *aDelIdxN;  // SArray<SDelIdx>
  SArray      *aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
88
} SCommitter;
H
refact  
Hongze Cheng 已提交
89

H
Hongze Cheng 已提交
90 91 92 93 94
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
H
Hongze Cheng 已提交
95 96
static int32_t tsdbNextCommitRow(SCommitter *pCommitter);

H
Hongze Cheng 已提交
97
int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
H
Hongze Cheng 已提交
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
  SRowInfo *pInfo1 = (SRowInfo *)p1;
  SRowInfo *pInfo2 = (SRowInfo *)p2;

  if (pInfo1->suid < pInfo2->suid) {
    return -1;
  } else if (pInfo1->suid > pInfo2->suid) {
    return 1;
  }

  if (pInfo1->uid < pInfo2->uid) {
    return -1;
  } else if (pInfo1->uid > pInfo2->uid) {
    return 1;
  }

  return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row);
}
H
refact  
Hongze Cheng 已提交
115

H
refact  
Hongze Cheng 已提交
116
int32_t tsdbBegin(STsdb *pTsdb) {
H
Hongze Cheng 已提交
117
  int32_t code = 0;
H
Hongze Cheng 已提交
118
  int32_t lino = 0;
H
Hongze Cheng 已提交
119

120 121
  if (!pTsdb) return code;

H
Hongze Cheng 已提交
122 123
  SMemTable *pMemTable;
  code = tsdbMemTableCreate(pTsdb, &pMemTable);
H
Hongze Cheng 已提交
124
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
125

H
Hongze Cheng 已提交
126
  // lock
H
Hongze Cheng 已提交
127
  if ((code = taosThreadRwlockWrlock(&pTsdb->rwLock))) {
H
Hongze Cheng 已提交
128
    code = TAOS_SYSTEM_ERROR(code);
H
Hongze Cheng 已提交
129
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
130 131 132 133 134
  }

  pTsdb->mem = pMemTable;

  // unlock
H
Hongze Cheng 已提交
135
  if ((code = taosThreadRwlockUnlock(&pTsdb->rwLock))) {
H
Hongze Cheng 已提交
136
    code = TAOS_SYSTEM_ERROR(code);
H
Hongze Cheng 已提交
137
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
138 139
  }

H
Hongze Cheng 已提交
140 141 142 143
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
144
  return code;
H
Hongze Cheng 已提交
145 146
}

H
more  
Hongze Cheng 已提交
147
int32_t tsdbCommit(STsdb *pTsdb) {
148
  if (!pTsdb) return 0;
H
Hongze Cheng 已提交
149

H
more  
Hongze Cheng 已提交
150
  int32_t    code = 0;
H
Hongze Cheng 已提交
151
  int32_t    lino = 0;
H
Hongze Cheng 已提交
152 153 154 155
  SCommitter commith;
  SMemTable *pMemTable = pTsdb->mem;

  // check
H
Hongze Cheng 已提交
156
  if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
H
Hongze Cheng 已提交
157
    taosThreadRwlockWrlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
158
    pTsdb->mem = NULL;
H
Hongze Cheng 已提交
159 160 161
    taosThreadRwlockUnlock(&pTsdb->rwLock);

    tsdbUnrefMemTable(pMemTable);
H
Hongze Cheng 已提交
162 163
    goto _exit;
  }
H
refact  
Hongze Cheng 已提交
164

H
more  
Hongze Cheng 已提交
165
  // start commit
H
more  
Hongze Cheng 已提交
166
  code = tsdbStartCommit(pTsdb, &commith);
H
Hongze Cheng 已提交
167
  TSDB_CHECK_CODE(code, lino, _exit);
H
refact  
Hongze Cheng 已提交
168

H
refact  
Hongze Cheng 已提交
169 170
  // commit impl
  code = tsdbCommitData(&commith);
H
Hongze Cheng 已提交
171
  TSDB_CHECK_CODE(code, lino, _exit);
H
refact  
Hongze Cheng 已提交
172 173

  code = tsdbCommitDel(&commith);
H
Hongze Cheng 已提交
174
  TSDB_CHECK_CODE(code, lino, _exit);
H
refact  
Hongze Cheng 已提交
175 176

  // end commit
H
more  
Hongze Cheng 已提交
177
  code = tsdbEndCommit(&commith, 0);
H
Hongze Cheng 已提交
178
  TSDB_CHECK_CODE(code, lino, _exit);
H
refact  
Hongze Cheng 已提交
179

H
Hongze Cheng 已提交
180
_exit:
H
Hongze Cheng 已提交
181 182 183 184
  if (code) {
    tsdbEndCommit(&commith, code);
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
refact  
Hongze Cheng 已提交
185 186 187
  return code;
}

H
Hongze Cheng 已提交
188
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
189
  int32_t    code = 0;
H
Hongze Cheng 已提交
190
  int32_t    lino = 0;
H
Hongze Cheng 已提交
191 192 193
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

H
Hongze Cheng 已提交
194
  if ((pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
H
Hongze Cheng 已提交
195
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
196
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
197 198
  }

H
Hongze Cheng 已提交
199
  if ((pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
H
Hongze Cheng 已提交
200
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
201
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
202 203
  }

H
Hongze Cheng 已提交
204
  if ((pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
H
Hongze Cheng 已提交
205
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
206
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
207
  }
H
Hongze Cheng 已提交
208

H
Hongze Cheng 已提交
209
  SDelFile *pDelFileR = pCommitter->fs.pDelFile;
H
Hongze Cheng 已提交
210
  if (pDelFileR) {
H
Hongze Cheng 已提交
211
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb);
H
Hongze Cheng 已提交
212
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
213

H
Hongze Cheng 已提交
214
    code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx);
H
Hongze Cheng 已提交
215
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
216 217
  }

H
Hongze Cheng 已提交
218
  // prepare new
H
Hongze Cheng 已提交
219 220
  SDelFile wDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0};
  code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &wDelFile, pTsdb);
H
Hongze Cheng 已提交
221
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
222 223

_exit:
H
Hongze Cheng 已提交
224 225 226 227 228
  if (code) {
    tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode));
  }
H
Hongze Cheng 已提交
229 230 231
  return code;
}

H
Hongze Cheng 已提交
232
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
H
Hongze Cheng 已提交
233
  int32_t   code = 0;
H
Hongze Cheng 已提交
234
  int32_t   lino = 0;
H
Hongze Cheng 已提交
235
  SDelData *pDelData;
H
Hongze Cheng 已提交
236 237
  tb_uid_t  suid;
  tb_uid_t  uid;
H
Hongze Cheng 已提交
238 239

  if (pTbData) {
H
Hongze Cheng 已提交
240 241
    suid = pTbData->suid;
    uid = pTbData->uid;
H
Hongze Cheng 已提交
242

H
Hongze Cheng 已提交
243 244 245 246
    if (pTbData->pHead == NULL) {
      pTbData = NULL;
    }
  }
H
Hongze Cheng 已提交
247 248

  if (pDelIdx) {
H
Hongze Cheng 已提交
249 250 251
    suid = pDelIdx->suid;
    uid = pDelIdx->uid;

H
Hongze Cheng 已提交
252
    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData);
H
Hongze Cheng 已提交
253
    TSDB_CHECK_CODE(code, lino, _exit);
254 255
  } else {
    taosArrayClear(pCommitter->aDelData);
H
Hongze Cheng 已提交
256 257
  }

H
Hongze Cheng 已提交
258
  if (pTbData == NULL && pDelIdx == NULL) goto _exit;
H
Hongze Cheng 已提交
259

H
Hongze Cheng 已提交
260
  SDelIdx delIdx = {.suid = suid, .uid = uid};
H
Hongze Cheng 已提交
261 262

  // memory
H
Hongze Cheng 已提交
263 264
  pDelData = pTbData ? pTbData->pHead : NULL;
  for (; pDelData; pDelData = pDelData->pNext) {
H
Hongze Cheng 已提交
265 266
    if (taosArrayPush(pCommitter->aDelData, pDelData) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
267
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
268
    }
H
Hongze Cheng 已提交
269 270 271
  }

  // write
H
Hongze Cheng 已提交
272
  code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, &delIdx);
H
Hongze Cheng 已提交
273
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
274 275

  // put delIdx
276
  if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) {
H
Hongze Cheng 已提交
277
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
278
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
279
  }
H
Hongze Cheng 已提交
280 281

_exit:
H
Hongze Cheng 已提交
282 283 284
  if (code) {
    tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
285 286 287
  return code;
}

H
Hongze Cheng 已提交
288 289
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
290
  int32_t lino = 0;
H
Hongze Cheng 已提交
291
  STsdb  *pTsdb = pCommitter->pTsdb;
H
Hongze Cheng 已提交
292

H
Hongze Cheng 已提交
293
  code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN);
H
Hongze Cheng 已提交
294
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
295

H
Hongze Cheng 已提交
296
  code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
H
Hongze Cheng 已提交
297
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
298

H
Hongze Cheng 已提交
299
  code = tsdbFSUpsertDelFile(&pCommitter->fs, &pCommitter->pDelFWriter->fDel);
H
Hongze Cheng 已提交
300
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
301

H
Hongze Cheng 已提交
302
  code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1);
H
Hongze Cheng 已提交
303
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
304 305

  if (pCommitter->pDelFReader) {
H
Hongze Cheng 已提交
306
    code = tsdbDelFReaderClose(&pCommitter->pDelFReader);
H
Hongze Cheng 已提交
307
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
308 309
  }

H
Hongze Cheng 已提交
310 311 312 313
  taosArrayDestroy(pCommitter->aDelIdx);
  taosArrayDestroy(pCommitter->aDelData);
  taosArrayDestroy(pCommitter->aDelIdxN);

H
Hongze Cheng 已提交
314 315 316 317
_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
318 319 320
  return code;
}

H
Hongze Cheng 已提交
321
int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo) {
H
Hongze Cheng 已提交
322
  int32_t code = 0;
H
Hongze Cheng 已提交
323
  int32_t lino = 0;
H
Hongze Cheng 已提交
324

H
Hongze Cheng 已提交
325
  if (suid) {
H
Hongze Cheng 已提交
326 327
    if (pSkmInfo->suid == suid) {
      pSkmInfo->uid = uid;
H
Hongze Cheng 已提交
328 329
      goto _exit;
    }
H
Hongze Cheng 已提交
330
  } else {
H
Hongze Cheng 已提交
331
    if (pSkmInfo->uid == uid) goto _exit;
H
Hongze Cheng 已提交
332 333
  }

H
Hongze Cheng 已提交
334 335 336 337
  pSkmInfo->suid = suid;
  pSkmInfo->uid = uid;
  tTSchemaDestroy(pSkmInfo->pTSchema);
  code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema);
H
Hongze Cheng 已提交
338
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
339 340 341 342 343 344 345

_exit:
  return code;
}

static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  int32_t code = 0;
H
Hongze Cheng 已提交
346
  int32_t lino = 0;
H
Hongze Cheng 已提交
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361

  if (pCommitter->skmRow.pTSchema) {
    if (pCommitter->skmRow.suid == suid) {
      if (suid == 0) {
        if (pCommitter->skmRow.uid == uid && sver == pCommitter->skmRow.pTSchema->version) goto _exit;
      } else {
        if (sver == pCommitter->skmRow.pTSchema->version) goto _exit;
      }
    }
  }

  pCommitter->skmRow.suid = suid;
  pCommitter->skmRow.uid = uid;
  tTSchemaDestroy(pCommitter->skmRow.pTSchema);
  code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema);
H
Hongze Cheng 已提交
362
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
363 364 365 366 367

_exit:
  return code;
}

H
Hongze Cheng 已提交
368 369
static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
370
  int32_t lino = 0;
H
Hongze Cheng 已提交
371 372 373 374 375 376 377 378

  ASSERT(pCommitter->dReader.pBlockIdx);

  pCommitter->dReader.iBlockIdx++;
  if (pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx)) {
    pCommitter->dReader.pBlockIdx =
        (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);

H
Hongze Cheng 已提交
379
    code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
H
Hongze Cheng 已提交
380
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
381 382 383 384 385 386 387 388 389 390

    ASSERT(pCommitter->dReader.mBlock.nItem > 0);
  } else {
    pCommitter->dReader.pBlockIdx = NULL;
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
391 392
static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
393
  int32_t lino = 0;
H
Hongze Cheng 已提交
394 395

  pCommitter->pIter = NULL;
H
Hongze Cheng 已提交
396
  tRBTreeCreate(&pCommitter->rbt, tRowInfoCmprFn);
H
Hongze Cheng 已提交
397 398

  // memory
H
Hongze Cheng 已提交
399
  TSDBKEY    tKey = {.ts = pCommitter->minKey, .version = VERSION_MIN};
H
Hongze Cheng 已提交
400 401
  SDataIter *pIter = &pCommitter->dataIter;
  pIter->type = MEMORY_DATA_ITER;
H
Hongze Cheng 已提交
402 403 404 405 406 407 408
  pIter->iTbDataP = 0;
  for (; pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP); pIter->iTbDataP++) {
    STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP);
    tsdbTbDataIterOpen(pTbData, &tKey, 0, &pIter->iter);
    TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter);
    if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
      pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
H
Hongze Cheng 已提交
409
      pRow = NULL;
H
Hongze Cheng 已提交
410 411
    }

H
Hongze Cheng 已提交
412 413
    if (pRow == NULL) continue;

H
Hongze Cheng 已提交
414 415 416 417 418
    pIter->r.suid = pTbData->suid;
    pIter->r.uid = pTbData->uid;
    pIter->r.row = *pRow;
    break;
  }
H
Hongze Cheng 已提交
419
  ASSERT(pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP));
H
Hongze Cheng 已提交
420 421 422
  tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter);

  // disk
H
Hongze Cheng 已提交
423
  pCommitter->toLastOnly = 0;
H
Hongze Cheng 已提交
424
  SDataFReader *pReader = pCommitter->dReader.pReader;
H
Hongze Cheng 已提交
425
  if (pReader) {
H
Hongze Cheng 已提交
426
    if (pReader->pSet->nSttF >= pCommitter->sttTrigger) {
H
Hongze Cheng 已提交
427
      int8_t iIter = 0;
H
Hongze Cheng 已提交
428
      for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
H
Hongze Cheng 已提交
429
        pIter = &pCommitter->aDataIter[iIter];
H
Hongze Cheng 已提交
430
        pIter->type = STT_DATA_ITER;
H
Hongze Cheng 已提交
431
        pIter->iStt = iStt;
H
Hongze Cheng 已提交
432

H
Hongze Cheng 已提交
433
        code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk);
H
Hongze Cheng 已提交
434
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
435

H
Hongze Cheng 已提交
436
        if (taosArrayGetSize(pIter->aSttBlk) == 0) continue;
H
Hongze Cheng 已提交
437

H
Hongze Cheng 已提交
438 439 440
        pIter->iSttBlk = 0;
        SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, 0);
        code = tsdbReadSttBlock(pCommitter->dReader.pReader, iStt, pSttBlk, &pIter->bData);
H
Hongze Cheng 已提交
441
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
442 443 444 445 446 447 448 449 450

        pIter->iRow = 0;
        pIter->r.suid = pIter->bData.suid;
        pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
        pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0);

        tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter);
        iIter++;
      }
H
Hongze Cheng 已提交
451
    } else {
H
Hongze Cheng 已提交
452 453 454
      for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
        SSttFile *pSttFile = pReader->pSet->aSttF[iStt];
        if (pSttFile->size > pSttFile->offset) {
H
Hongze Cheng 已提交
455 456 457 458
          pCommitter->toLastOnly = 1;
          break;
        }
      }
H
Hongze Cheng 已提交
459
    }
H
Hongze Cheng 已提交
460 461 462
  }

  code = tsdbNextCommitRow(pCommitter);
H
Hongze Cheng 已提交
463
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
464

H
Hongze Cheng 已提交
465 466 467 468
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
469 470 471
  return code;
}

H
Hongze Cheng 已提交
472 473
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  int32_t    code = 0;
H
Hongze Cheng 已提交
474
  int32_t    lino = 0;
H
Hongze Cheng 已提交
475 476
  STsdb     *pTsdb = pCommitter->pTsdb;
  SDFileSet *pRSet = NULL;
H
Hongze Cheng 已提交
477

H
Hongze Cheng 已提交
478
  // memory
H
Hongze Cheng 已提交
479 480 481
  pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
  tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                  &pCommitter->maxKey);
H
Hongze Cheng 已提交
482
  pCommitter->nextKey = TSKEY_MAX;
H
Hongze Cheng 已提交
483

H
Hongze Cheng 已提交
484
  // Reader
H
Hongze Cheng 已提交
485 486
  SDFileSet tDFileSet = {.fid = pCommitter->commitFid};
  pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &tDFileSet, tDFileSetCmprFn, TD_EQ);
H
Hongze Cheng 已提交
487
  if (pRSet) {
H
Hongze Cheng 已提交
488
    code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet);
H
Hongze Cheng 已提交
489
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
490

H
Hongze Cheng 已提交
491
    // data
H
Hongze Cheng 已提交
492
    code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx);
H
Hongze Cheng 已提交
493
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
494

H
Hongze Cheng 已提交
495
    pCommitter->dReader.iBlockIdx = 0;
H
Hongze Cheng 已提交
496 497
    if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) {
      pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0);
H
Hongze Cheng 已提交
498
      code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
H
Hongze Cheng 已提交
499
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
500 501 502
    } else {
      pCommitter->dReader.pBlockIdx = NULL;
    }
H
Hongze Cheng 已提交
503
    tBlockDataReset(&pCommitter->dReader.bData);
H
Hongze Cheng 已提交
504
  } else {
H
Hongze Cheng 已提交
505
    pCommitter->dReader.pBlockIdx = NULL;
H
Hongze Cheng 已提交
506
  }
H
Hongze Cheng 已提交
507

H
Hongze Cheng 已提交
508
  // Writer
H
Hongze Cheng 已提交
509 510 511
  SHeadFile fHead = {.commitID = pCommitter->commitID};
  SDataFile fData = {.commitID = pCommitter->commitID};
  SSmaFile  fSma = {.commitID = pCommitter->commitID};
H
Hongze Cheng 已提交
512
  SSttFile  fStt = {.commitID = pCommitter->commitID};
H
Hongze Cheng 已提交
513
  SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};
H
Hongze Cheng 已提交
514
  if (pRSet) {
H
Hongze Cheng 已提交
515
    ASSERT(pRSet->nSttF <= pCommitter->sttTrigger);
H
Hongze Cheng 已提交
516 517
    fData = *pRSet->pDataF;
    fSma = *pRSet->pSmaF;
H
Hongze Cheng 已提交
518
    wSet.diskId = pRSet->diskId;
H
Hongze Cheng 已提交
519
    if (pRSet->nSttF < pCommitter->sttTrigger) {
H
Hongze Cheng 已提交
520 521
      for (int32_t iStt = 0; iStt < pRSet->nSttF; iStt++) {
        wSet.aSttF[iStt] = pRSet->aSttF[iStt];
H
Hongze Cheng 已提交
522
      }
H
Hongze Cheng 已提交
523
      wSet.nSttF = pRSet->nSttF + 1;
H
Hongze Cheng 已提交
524
    } else {
H
Hongze Cheng 已提交
525
      wSet.nSttF = 1;
H
Hongze Cheng 已提交
526
    }
H
Hongze Cheng 已提交
527
  } else {
H
Hongze Cheng 已提交
528
    SDiskID did = {0};
529
    tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
H
Hongze Cheng 已提交
530
    tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
531
    wSet.diskId = did;
H
Hongze Cheng 已提交
532
    wSet.nSttF = 1;
H
Hongze Cheng 已提交
533
  }
H
Hongze Cheng 已提交
534
  wSet.aSttF[wSet.nSttF - 1] = &fStt;
H
Hongze Cheng 已提交
535
  code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
H
Hongze Cheng 已提交
536
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
537

H
Hongze Cheng 已提交
538
  taosArrayClear(pCommitter->dWriter.aBlockIdx);
H
Hongze Cheng 已提交
539
  taosArrayClear(pCommitter->dWriter.aSttBlk);
H
Hongze Cheng 已提交
540 541 542 543
  tMapDataReset(&pCommitter->dWriter.mBlock);
  tBlockDataReset(&pCommitter->dWriter.bData);
  tBlockDataReset(&pCommitter->dWriter.bDatal);

H
Hongze Cheng 已提交
544 545
  // open iter
  code = tsdbOpenCommitIter(pCommitter);
H
Hongze Cheng 已提交
546
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
547

H
Hongze Cheng 已提交
548
_exit:
H
Hongze Cheng 已提交
549 550 551
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
552
  return code;
H
Hongze Cheng 已提交
553 554
}

H
Hongze Cheng 已提交
555 556
int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg) {
  int32_t code = 0;
H
Hongze Cheng 已提交
557
  int32_t lino = 0;
H
Hongze Cheng 已提交
558

H
Hongze Cheng 已提交
559
  if (pBlockData->nRow == 0) return code;
H
Hongze Cheng 已提交
560

H
Hongze Cheng 已提交
561
  SDataBlk dataBlk;
H
Hongze Cheng 已提交
562
  tDataBlkReset(&dataBlk);
H
Hongze Cheng 已提交
563

H
Hongze Cheng 已提交
564
  // info
H
Hongze Cheng 已提交
565
  dataBlk.nRow += pBlockData->nRow;
H
Hongze Cheng 已提交
566 567
  for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
    TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]};
H
Hongze Cheng 已提交
568

H
Hongze Cheng 已提交
569
    if (iRow == 0) {
H
Hongze Cheng 已提交
570 571
      if (tsdbKeyCmprFn(&dataBlk.minKey, &key) > 0) {
        dataBlk.minKey = key;
H
Hongze Cheng 已提交
572 573 574
      }
    } else {
      if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) {
H
Hongze Cheng 已提交
575
        dataBlk.hasDup = 1;
H
Hongze Cheng 已提交
576 577 578
      }
    }

H
Hongze Cheng 已提交
579 580
    if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&dataBlk.maxKey, &key) < 0) {
      dataBlk.maxKey = key;
H
Hongze Cheng 已提交
581 582
    }

H
Hongze Cheng 已提交
583 584
    dataBlk.minVer = TMIN(dataBlk.minVer, key.version);
    dataBlk.maxVer = TMAX(dataBlk.maxVer, key.version);
H
Hongze Cheng 已提交
585 586 587
  }

  // write
H
Hongze Cheng 已提交
588
  dataBlk.nSubBlock++;
H
Hongze Cheng 已提交
589 590
  code = tsdbWriteBlockData(pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1],
                            ((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL, cmprAlg, 0);
H
Hongze Cheng 已提交
591
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
592

H
Hongze Cheng 已提交
593
  // put SDataBlk
H
Hongze Cheng 已提交
594
  code = tMapDataPutItem(mDataBlk, &dataBlk, tPutDataBlk);
H
Hongze Cheng 已提交
595
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
596

H
Hongze Cheng 已提交
597
  // clear
H
Hongze Cheng 已提交
598
  tBlockDataClear(pBlockData);
H
Hongze Cheng 已提交
599

H
Hongze Cheng 已提交
600 601 602 603
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
604 605 606
  return code;
}

H
Hongze Cheng 已提交
607 608
int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg) {
  int32_t code = 0;
H
Hongze Cheng 已提交
609
  int32_t lino = 0;
H
Hongze Cheng 已提交
610
  SSttBlk sstBlk;
H
Hongze Cheng 已提交
611

H
Hongze Cheng 已提交
612
  if (pBlockData->nRow == 0) return code;
H
Hongze Cheng 已提交
613

H
Hongze Cheng 已提交
614
  // info
H
Hongze Cheng 已提交
615 616 617 618 619 620
  sstBlk.suid = pBlockData->suid;
  sstBlk.nRow = pBlockData->nRow;
  sstBlk.minKey = TSKEY_MAX;
  sstBlk.maxKey = TSKEY_MIN;
  sstBlk.minVer = VERSION_MAX;
  sstBlk.maxVer = VERSION_MIN;
H
Hongze Cheng 已提交
621
  for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
H
Hongze Cheng 已提交
622 623 624 625
    sstBlk.minKey = TMIN(sstBlk.minKey, pBlockData->aTSKEY[iRow]);
    sstBlk.maxKey = TMAX(sstBlk.maxKey, pBlockData->aTSKEY[iRow]);
    sstBlk.minVer = TMIN(sstBlk.minVer, pBlockData->aVersion[iRow]);
    sstBlk.maxVer = TMAX(sstBlk.maxVer, pBlockData->aVersion[iRow]);
H
Hongze Cheng 已提交
626
  }
H
Hongze Cheng 已提交
627 628
  sstBlk.minUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[0];
  sstBlk.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1];
H
Hongze Cheng 已提交
629

H
Hongze Cheng 已提交
630
  // write
H
Hongze Cheng 已提交
631
  code = tsdbWriteBlockData(pWriter, pBlockData, &sstBlk.bInfo, NULL, cmprAlg, 1);
H
Hongze Cheng 已提交
632
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
633

H
Hongze Cheng 已提交
634
  // push SSttBlk
H
Hongze Cheng 已提交
635
  if (taosArrayPush(aSttBlk, &sstBlk) == NULL) {
H
Hongze Cheng 已提交
636
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
637
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
638 639
  }

H
Hongze Cheng 已提交
640
  // clear
H
Hongze Cheng 已提交
641
  tBlockDataClear(pBlockData);
H
Hongze Cheng 已提交
642

H
Hongze Cheng 已提交
643 644 645 646
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
647 648 649
  return code;
}

H
Hongze Cheng 已提交
650 651
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
652
  int32_t lino = 0;
H
Hongze Cheng 已提交
653

H
Hongze Cheng 已提交
654
  // write aBlockIdx
H
Hongze Cheng 已提交
655
  code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx);
H
Hongze Cheng 已提交
656
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
657

H
Hongze Cheng 已提交
658 659
  // write aSttBlk
  code = tsdbWriteSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSttBlk);
H
Hongze Cheng 已提交
660
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
661

H
Hongze Cheng 已提交
662
  // update file header
H
Hongze Cheng 已提交
663
  code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter);
H
Hongze Cheng 已提交
664
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
665 666

  // upsert SDFileSet
H
Hongze Cheng 已提交
667
  code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->dWriter.pWriter->wSet);
H
Hongze Cheng 已提交
668
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
669 670

  // close and sync
H
Hongze Cheng 已提交
671
  code = tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 1);
H
Hongze Cheng 已提交
672
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
673

H
Hongze Cheng 已提交
674 675
  if (pCommitter->dReader.pReader) {
    code = tsdbDataFReaderClose(&pCommitter->dReader.pReader);
H
Hongze Cheng 已提交
676
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
677 678 679
  }

_exit:
H
Hongze Cheng 已提交
680 681 682
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
683 684 685
  return code;
}

H
Hongze Cheng 已提交
686 687
static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
  int32_t code = 0;
H
Hongze Cheng 已提交
688
  int32_t lino = 0;
H
Hongze Cheng 已提交
689

H
Hongze Cheng 已提交
690
  while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) {
H
Hongze Cheng 已提交
691
    SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx;
H
Hongze Cheng 已提交
692
    code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx);
H
Hongze Cheng 已提交
693
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
694 695 696

    if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
697
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
698 699
    }

H
Hongze Cheng 已提交
700
    code = tsdbCommitterNextTableData(pCommitter);
H
Hongze Cheng 已提交
701
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
702 703
  }

H
Hongze Cheng 已提交
704 705 706 707
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
708 709 710
  return code;
}

H
Hongze Cheng 已提交
711
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter);
H
Hongze Cheng 已提交
712
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
713
  int32_t    code = 0;
H
Hongze Cheng 已提交
714
  int32_t    lino = 0;
H
Hongze Cheng 已提交
715 716
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
717 718 719

  // commit file data start
  code = tsdbCommitFileDataStart(pCommitter);
H
Hongze Cheng 已提交
720
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
721

H
Hongze Cheng 已提交
722 723
  // impl
  code = tsdbCommitFileDataImpl(pCommitter);
H
Hongze Cheng 已提交
724
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
725

H
Hongze Cheng 已提交
726 727
  // commit file data end
  code = tsdbCommitFileDataEnd(pCommitter);
H
Hongze Cheng 已提交
728
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
729

H
Hongze Cheng 已提交
730 731 732 733 734 735
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
    tsdbDataFReaderClose(&pCommitter->dReader.pReader);
    tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0);
  }
H
Hongze Cheng 已提交
736 737 738
  return code;
}

H
Hongze Cheng 已提交
739 740
// ----------------------------------------------------------------------------
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
H
Hongze Cheng 已提交
741
  int32_t code = 0;
H
Hongze Cheng 已提交
742
  int32_t lino = 0;
H
Hongze Cheng 已提交
743

H
Hongze Cheng 已提交
744 745
  memset(pCommitter, 0, sizeof(*pCommitter));
  ASSERT(pTsdb->mem && pTsdb->imem == NULL);
H
Hongze Cheng 已提交
746

H
more  
Hongze Cheng 已提交
747
  taosThreadRwlockWrlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
748 749
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;
H
more  
Hongze Cheng 已提交
750
  taosThreadRwlockUnlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
751

H
Hongze Cheng 已提交
752
  pCommitter->pTsdb = pTsdb;
H
Hongze Cheng 已提交
753
  pCommitter->commitID = pTsdb->pVnode->state.commitID;
H
Hongze Cheng 已提交
754 755 756 757
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;
  pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
H
Hongze Cheng 已提交
758
  pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
H
Hongze Cheng 已提交
759
  pCommitter->sttTrigger = pTsdb->pVnode->config.sttTrigger;
H
Hongze Cheng 已提交
760 761 762
  pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
  if (pCommitter->aTbDataP == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
763
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
764
  }
H
Hongze Cheng 已提交
765
  code = tsdbFSCopy(pTsdb, &pCommitter->fs);
H
Hongze Cheng 已提交
766
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
767

H
Hongze Cheng 已提交
768 769 770 771
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
772 773 774
  return code;
}

H
Hongze Cheng 已提交
775 776
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
777
  int32_t lino = 0;
H
Hongze Cheng 已提交
778

H
Hongze Cheng 已提交
779
  // reader
H
Hongze Cheng 已提交
780 781
  pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->dReader.aBlockIdx == NULL) {
H
Hongze Cheng 已提交
782
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
783
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
784 785
  }

H
Hongze Cheng 已提交
786
  code = tBlockDataCreate(&pCommitter->dReader.bData);
H
Hongze Cheng 已提交
787
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
788

H
Hongze Cheng 已提交
789
  // merger
H
Hongze Cheng 已提交
790
  for (int32_t iStt = 0; iStt < TSDB_MAX_STT_TRIGGER; iStt++) {
H
Hongze Cheng 已提交
791 792 793
    SDataIter *pIter = &pCommitter->aDataIter[iStt];
    pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
    if (pIter->aSttBlk == NULL) {
H
Hongze Cheng 已提交
794
      code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
795
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
796 797 798
    }

    code = tBlockDataCreate(&pIter->bData);
H
Hongze Cheng 已提交
799
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
800 801 802
  }

  // writer
H
Hongze Cheng 已提交
803 804 805
  pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pCommitter->dWriter.aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
806
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
807 808
  }

H
Hongze Cheng 已提交
809 810
  pCommitter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
  if (pCommitter->dWriter.aSttBlk == NULL) {
H
Hongze Cheng 已提交
811
    code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
812
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
813 814
  }

H
Hongze Cheng 已提交
815
  code = tBlockDataCreate(&pCommitter->dWriter.bData);
H
Hongze Cheng 已提交
816
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
817

H
Hongze Cheng 已提交
818
  code = tBlockDataCreate(&pCommitter->dWriter.bDatal);
H
Hongze Cheng 已提交
819
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
820

H
Hongze Cheng 已提交
821
_exit:
H
Hongze Cheng 已提交
822 823 824
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
825 826 827 828
  return code;
}

static void tsdbCommitDataEnd(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
829
  // reader
H
Hongze Cheng 已提交
830 831
  taosArrayDestroy(pCommitter->dReader.aBlockIdx);
  tMapDataClear(&pCommitter->dReader.mBlock);
H
Hongze Cheng 已提交
832
  tBlockDataDestroy(&pCommitter->dReader.bData, 1);
H
Hongze Cheng 已提交
833

H
Hongze Cheng 已提交
834
  // merger
H
Hongze Cheng 已提交
835
  for (int32_t iStt = 0; iStt < TSDB_MAX_STT_TRIGGER; iStt++) {
H
Hongze Cheng 已提交
836 837
    SDataIter *pIter = &pCommitter->aDataIter[iStt];
    taosArrayDestroy(pIter->aSttBlk);
H
Hongze Cheng 已提交
838 839 840 841
    tBlockDataDestroy(&pIter->bData, 1);
  }

  // writer
H
Hongze Cheng 已提交
842
  taosArrayDestroy(pCommitter->dWriter.aBlockIdx);
H
Hongze Cheng 已提交
843
  taosArrayDestroy(pCommitter->dWriter.aSttBlk);
H
Hongze Cheng 已提交
844
  tMapDataClear(&pCommitter->dWriter.mBlock);
H
Hongze Cheng 已提交
845 846
  tBlockDataDestroy(&pCommitter->dWriter.bData, 1);
  tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1);
H
Hongze Cheng 已提交
847 848
  tTSchemaDestroy(pCommitter->skmTable.pTSchema);
  tTSchemaDestroy(pCommitter->skmRow.pTSchema);
H
Hongze Cheng 已提交
849 850
}

H
Hongze Cheng 已提交
851
static int32_t tsdbCommitData(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
852 853 854
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
855 856
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
857

H
Hongze Cheng 已提交
858
  // check
H
Hongze Cheng 已提交
859
  if (pMemTable->nRow == 0) goto _exit;
H
Hongze Cheng 已提交
860

H
Hongze Cheng 已提交
861 862
  // start ====================
  code = tsdbCommitDataStart(pCommitter);
H
Hongze Cheng 已提交
863
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
864 865 866

  // impl ====================
  pCommitter->nextKey = pMemTable->minKey;
H
Hongze Cheng 已提交
867 868
  while (pCommitter->nextKey < TSKEY_MAX) {
    code = tsdbCommitFileData(pCommitter);
H
Hongze Cheng 已提交
869
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
870
  }
H
Hongze Cheng 已提交
871

H
Hongze Cheng 已提交
872 873 874
  // end ====================
  tsdbCommitDataEnd(pCommitter);

H
Hongze Cheng 已提交
875
_exit:
H
Hongze Cheng 已提交
876 877 878
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
879 880
  return code;
}
H
Hongze Cheng 已提交
881

H
Hongze Cheng 已提交
882
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
883 884 885
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
886 887
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;
H
Hongze Cheng 已提交
888

H
Hongze Cheng 已提交
889 890
  if (pMemTable->nDel == 0) {
    goto _exit;
H
Hongze Cheng 已提交
891
  }
H
Hongze Cheng 已提交
892

H
Hongze Cheng 已提交
893 894 895
  // start
  code = tsdbCommitDelStart(pCommitter);
  if (code) {
H
Hongze Cheng 已提交
896
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
897
  }
H
Hongze Cheng 已提交
898

H
Hongze Cheng 已提交
899
  // impl
H
Hongze Cheng 已提交
900 901 902
  int32_t  iDelIdx = 0;
  int32_t  nDelIdx = taosArrayGetSize(pCommitter->aDelIdx);
  int32_t  iTbData = 0;
H
Hongze Cheng 已提交
903
  int32_t  nTbData = taosArrayGetSize(pCommitter->aTbDataP);
H
Hongze Cheng 已提交
904 905 906 907 908
  STbData *pTbData;
  SDelIdx *pDelIdx;

  ASSERT(nTbData > 0);

H
Hongze Cheng 已提交
909
  pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
H
Hongze Cheng 已提交
910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931
  pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
  while (true) {
    if (pTbData == NULL && pDelIdx == NULL) break;

    if (pTbData && pDelIdx) {
      int32_t c = tTABLEIDCmprFn(pTbData, pDelIdx);

      if (c == 0) {
        goto _commit_mem_and_disk_del;
      } else if (c < 0) {
        goto _commit_mem_del;
      } else {
        goto _commit_disk_del;
      }
    } else if (pTbData) {
      goto _commit_mem_del;
    } else {
      goto _commit_disk_del;
    }

  _commit_mem_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, NULL);
H
Hongze Cheng 已提交
932
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
933 934

    iTbData++;
H
Hongze Cheng 已提交
935
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
H
Hongze Cheng 已提交
936 937 938 939
    continue;

  _commit_disk_del:
    code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx);
H
Hongze Cheng 已提交
940
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
941 942 943 944 945 946 947

    iDelIdx++;
    pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    continue;

  _commit_mem_and_disk_del:
    code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx);
H
Hongze Cheng 已提交
948
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
949 950

    iTbData++;
H
Hongze Cheng 已提交
951
    pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
H
Hongze Cheng 已提交
952 953 954
    iDelIdx++;
    pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    continue;
H
Hongze Cheng 已提交
955
  }
H
Hongze Cheng 已提交
956

H
Hongze Cheng 已提交
957 958
  // end
  code = tsdbCommitDelEnd(pCommitter);
H
Hongze Cheng 已提交
959
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
960

H
Hongze Cheng 已提交
961
_exit:
H
Hongze Cheng 已提交
962 963 964 965 966
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
  }
H
Hongze Cheng 已提交
967
  return code;
H
Hongze Cheng 已提交
968 969
}

H
Hongze Cheng 已提交
970
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
H
Hongze Cheng 已提交
971 972 973
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
974 975 976
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

H
Hongze Cheng 已提交
977
  ASSERT(eno == 0);
H
Hongze Cheng 已提交
978 979

  code = tsdbFSCommit1(pTsdb, &pCommitter->fs);
H
Hongze Cheng 已提交
980
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
981

H
Hongze Cheng 已提交
982
  // lock
H
Hongze Cheng 已提交
983
  taosThreadRwlockWrlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
984 985 986 987 988

  // commit or rollback
  code = tsdbFSCommit2(pTsdb, &pCommitter->fs);
  if (code) {
    taosThreadRwlockUnlock(&pTsdb->rwLock);
H
Hongze Cheng 已提交
989
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
990 991
  }

H
Hongze Cheng 已提交
992
  pTsdb->imem = NULL;
H
Hongze Cheng 已提交
993 994

  // unlock
H
Hongze Cheng 已提交
995 996 997
  taosThreadRwlockUnlock(&pTsdb->rwLock);

  tsdbUnrefMemTable(pMemTable);
H
Hongze Cheng 已提交
998
  tsdbFSDestroy(&pCommitter->fs);
H
Hongze Cheng 已提交
999
  taosArrayDestroy(pCommitter->aTbDataP);
H
Hongze Cheng 已提交
1000

H
Hongze Cheng 已提交
1001 1002 1003 1004 1005 1006
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
  }
H
Hongze Cheng 已提交
1007 1008
  return code;
}
H
Hongze Cheng 已提交
1009

H
Hongze Cheng 已提交
1010
// ================================================================================
H
Hongze Cheng 已提交
1011

H
Hongze Cheng 已提交
1012 1013
static FORCE_INLINE SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) {
  return (pCommitter->pIter) ? &pCommitter->pIter->r : NULL;
H
Hongze Cheng 已提交
1014 1015
}

H
Hongze Cheng 已提交
1016
static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
H
Hongze Cheng 已提交
1017
  int32_t code = 0;
H
Hongze Cheng 已提交
1018
  int32_t lino = 0;
H
Hongze Cheng 已提交
1019 1020 1021

  if (pCommitter->pIter) {
    SDataIter *pIter = pCommitter->pIter;
H
Hongze Cheng 已提交
1022
    if (pCommitter->pIter->type == MEMORY_DATA_ITER) {  // memory
H
Hongze Cheng 已提交
1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038
      tsdbTbDataIterNext(&pIter->iter);
      TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter);
      while (true) {
        if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
          pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
          pRow = NULL;
        }

        if (pRow) {
          pIter->r.suid = pIter->iter.pTbData->suid;
          pIter->r.uid = pIter->iter.pTbData->uid;
          pIter->r.row = *pRow;
          break;
        }

        pIter->iTbDataP++;
H
Hongze Cheng 已提交
1039 1040
        if (pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP)) {
          STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP);
H
Hongze Cheng 已提交
1041 1042 1043 1044 1045 1046 1047 1048 1049
          TSDBKEY  keyFrom = {.ts = pCommitter->minKey, .version = VERSION_MIN};
          tsdbTbDataIterOpen(pTbData, &keyFrom, 0, &pIter->iter);
          pRow = tsdbTbDataIterGet(&pIter->iter);
          continue;
        } else {
          pCommitter->pIter = NULL;
          break;
        }
      }
H
Hongze Cheng 已提交
1050
    } else if (pCommitter->pIter->type == STT_DATA_ITER) {  // last file
H
Hongze Cheng 已提交
1051 1052 1053 1054 1055
      pIter->iRow++;
      if (pIter->iRow < pIter->bData.nRow) {
        pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
        pIter->r.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
      } else {
H
Hongze Cheng 已提交
1056 1057 1058
        pIter->iSttBlk++;
        if (pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk)) {
          SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
H
Hongze Cheng 已提交
1059

H
Hongze Cheng 已提交
1060
          code = tsdbReadSttBlock(pCommitter->dReader.pReader, pIter->iStt, pSttBlk, &pIter->bData);
H
Hongze Cheng 已提交
1061
          TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095

          pIter->iRow = 0;
          pIter->r.suid = pIter->bData.suid;
          pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
          pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0);
        } else {
          pCommitter->pIter = NULL;
        }
      }
    } else {
      ASSERT(0);
    }

    // compare with min in RB Tree
    pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
    if (pCommitter->pIter && pIter) {
      int32_t c = tRowInfoCmprFn(&pCommitter->pIter->r, &pIter->r);
      if (c > 0) {
        tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
        pCommitter->pIter = NULL;
      } else {
        ASSERT(c);
      }
    }
  }

  if (pCommitter->pIter == NULL) {
    pCommitter->pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
    if (pCommitter->pIter) {
      tRBTreeDrop(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
    }
  }

_exit:
H
Hongze Cheng 已提交
1096 1097 1098
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
1099 1100 1101
  return code;
}

H
Hongze Cheng 已提交
1102
static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
H
Hongze Cheng 已提交
1103 1104 1105
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
1106 1107 1108 1109 1110 1111 1112 1113
  SBlockData *pBlockData = &pCommitter->dWriter.bData;
  SRowInfo   *pRowInfo = tsdbGetCommitRow(pCommitter);
  TABLEID     id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};

  tBlockDataClear(pBlockData);
  while (pRowInfo) {
    ASSERT(pRowInfo->row.type == 0);
    code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
H
Hongze Cheng 已提交
1114
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1115 1116

    code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
H
Hongze Cheng 已提交
1117
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1118 1119

    code = tsdbNextCommitRow(pCommitter);
H
Hongze Cheng 已提交
1120
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1121 1122 1123 1124 1125 1126 1127

    pRowInfo = tsdbGetCommitRow(pCommitter);
    if (pRowInfo) {
      if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) {
        pRowInfo = NULL;
      } else {
        TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
H
Hongze Cheng 已提交
1128
        if (tsdbKeyCmprFn(&tKey, &pDataBlk->minKey) >= 0) pRowInfo = NULL;
H
Hongze Cheng 已提交
1129 1130 1131
      }
    }

H
Hongze Cheng 已提交
1132
    if (pBlockData->nRow >= pCommitter->maxRow) {
H
Hongze Cheng 已提交
1133 1134
      code =
          tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1135
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1136 1137 1138
    }
  }

H
Hongze Cheng 已提交
1139
  code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1140
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1141

H
Hongze Cheng 已提交
1142 1143 1144 1145
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
1146 1147 1148
  return code;
}

H
Hongze Cheng 已提交
1149
static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
H
Hongze Cheng 已提交
1150 1151 1152
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
1153 1154 1155 1156 1157
  SRowInfo   *pRowInfo = tsdbGetCommitRow(pCommitter);
  TABLEID     id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
  SBlockData *pBDataR = &pCommitter->dReader.bData;
  SBlockData *pBDataW = &pCommitter->dWriter.bData;

H
Hongze Cheng 已提交
1158
  code = tsdbReadDataBlock(pCommitter->dReader.pReader, pDataBlk, pBDataR);
H
Hongze Cheng 已提交
1159
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1160 1161 1162 1163 1164 1165 1166 1167 1168 1169

  tBlockDataClear(pBDataW);
  int32_t  iRow = 0;
  TSDBROW  row = tsdbRowFromBlockData(pBDataR, 0);
  TSDBROW *pRow = &row;

  while (pRow && pRowInfo) {
    int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row);
    if (c < 0) {
      code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid);
H
Hongze Cheng 已提交
1170
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1171 1172 1173 1174 1175 1176 1177 1178 1179 1180

      iRow++;
      if (iRow < pBDataR->nRow) {
        row = tsdbRowFromBlockData(pBDataR, iRow);
      } else {
        pRow = NULL;
      }
    } else if (c > 0) {
      ASSERT(pRowInfo->row.type == 0);
      code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
H
Hongze Cheng 已提交
1181
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1182 1183

      code = tBlockDataAppendRow(pBDataW, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
H
Hongze Cheng 已提交
1184
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1185 1186

      code = tsdbNextCommitRow(pCommitter);
H
Hongze Cheng 已提交
1187
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1188 1189 1190 1191 1192 1193 1194

      pRowInfo = tsdbGetCommitRow(pCommitter);
      if (pRowInfo) {
        if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) {
          pRowInfo = NULL;
        } else {
          TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
H
Hongze Cheng 已提交
1195
          if (tsdbKeyCmprFn(&tKey, &pDataBlk->maxKey) > 0) pRowInfo = NULL;
H
Hongze Cheng 已提交
1196 1197 1198 1199 1200 1201
        }
      }
    } else {
      ASSERT(0);
    }

H
Hongze Cheng 已提交
1202
    if (pBDataW->nRow >= pCommitter->maxRow) {
H
Hongze Cheng 已提交
1203
      code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1204
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1205 1206 1207 1208 1209
    }
  }

  while (pRow) {
    code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid);
H
Hongze Cheng 已提交
1210
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1211 1212 1213 1214 1215 1216 1217 1218

    iRow++;
    if (iRow < pBDataR->nRow) {
      row = tsdbRowFromBlockData(pBDataR, iRow);
    } else {
      pRow = NULL;
    }

H
Hongze Cheng 已提交
1219
    if (pBDataW->nRow >= pCommitter->maxRow) {
H
Hongze Cheng 已提交
1220
      code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1221
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1222 1223 1224
    }
  }

H
Hongze Cheng 已提交
1225
  code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1226
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1227

H
Hongze Cheng 已提交
1228 1229 1230 1231
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
1232 1233 1234
  return code;
}

H
Hongze Cheng 已提交
1235
static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
H
Hongze Cheng 已提交
1236 1237 1238
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
1239 1240 1241 1242 1243
  SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx;

  ASSERT(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0);
  if (pBlockIdx && pBlockIdx->suid == id.suid && pBlockIdx->uid == id.uid) {
    int32_t   iBlock = 0;
H
Hongze Cheng 已提交
1244 1245
    SDataBlk  block;
    SDataBlk *pDataBlk = &block;
H
Hongze Cheng 已提交
1246 1247 1248 1249
    SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);

    ASSERT(pRowInfo->suid == id.suid && pRowInfo->uid == id.uid);

H
Hongze Cheng 已提交
1250 1251 1252
    tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
    while (pDataBlk && pRowInfo) {
      SDataBlk tBlock = {.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)};
H
Hongze Cheng 已提交
1253
      int32_t  c = tDataBlkCmprFn(pDataBlk, &tBlock);
H
Hongze Cheng 已提交
1254 1255

      if (c < 0) {
H
Hongze Cheng 已提交
1256
        code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
H
Hongze Cheng 已提交
1257
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1258 1259 1260

        iBlock++;
        if (iBlock < pCommitter->dReader.mBlock.nItem) {
H
Hongze Cheng 已提交
1261
          tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
H
Hongze Cheng 已提交
1262
        } else {
H
Hongze Cheng 已提交
1263
          pDataBlk = NULL;
H
Hongze Cheng 已提交
1264 1265
        }
      } else if (c > 0) {
H
Hongze Cheng 已提交
1266
        code = tsdbCommitAheadBlock(pCommitter, pDataBlk);
H
Hongze Cheng 已提交
1267
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1268 1269 1270

        pRowInfo = tsdbGetCommitRow(pCommitter);
        if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL;
H
Hongze Cheng 已提交
1271
      } else {
H
Hongze Cheng 已提交
1272
        code = tsdbCommitMergeBlock(pCommitter, pDataBlk);
H
Hongze Cheng 已提交
1273
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1274 1275 1276

        iBlock++;
        if (iBlock < pCommitter->dReader.mBlock.nItem) {
H
Hongze Cheng 已提交
1277
          tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
H
Hongze Cheng 已提交
1278
        } else {
H
Hongze Cheng 已提交
1279
          pDataBlk = NULL;
H
Hongze Cheng 已提交
1280 1281 1282
        }
        pRowInfo = tsdbGetCommitRow(pCommitter);
        if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL;
H
Hongze Cheng 已提交
1283 1284 1285
      }
    }

H
Hongze Cheng 已提交
1286 1287
    while (pDataBlk) {
      code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
H
Hongze Cheng 已提交
1288
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1289 1290 1291

      iBlock++;
      if (iBlock < pCommitter->dReader.mBlock.nItem) {
H
Hongze Cheng 已提交
1292
        tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
H
Hongze Cheng 已提交
1293
      } else {
H
Hongze Cheng 已提交
1294
        pDataBlk = NULL;
H
Hongze Cheng 已提交
1295 1296
      }
    }
H
Hongze Cheng 已提交
1297 1298

    code = tsdbCommitterNextTableData(pCommitter);
H
Hongze Cheng 已提交
1299
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1300 1301 1302
  }

_exit:
H
Hongze Cheng 已提交
1303 1304 1305
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
1306 1307 1308
  return code;
}

H
Hongze Cheng 已提交
1309
static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) {
H
Hongze Cheng 已提交
1310
  int32_t code = 0;
H
Hongze Cheng 已提交
1311
  int32_t lino = 0;
H
Hongze Cheng 已提交
1312 1313 1314

  SBlockData *pBDatal = &pCommitter->dWriter.bDatal;
  if (pBDatal->suid || pBDatal->uid) {
H
Hongze Cheng 已提交
1315
    if ((pBDatal->suid != id.suid) || (id.suid == 0)) {
H
Hongze Cheng 已提交
1316
      code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1317
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1318 1319 1320 1321 1322
      tBlockDataReset(pBDatal);
    }
  }

  if (!pBDatal->suid && !pBDatal->uid) {
H
Hongze Cheng 已提交
1323 1324
    ASSERT(pCommitter->skmTable.suid == id.suid);
    ASSERT(pCommitter->skmTable.uid == id.uid);
H
Hongze Cheng 已提交
1325
    code = tBlockDataInit(pBDatal, id.suid, id.suid ? 0 : id.uid, pCommitter->skmTable.pTSchema);
H
Hongze Cheng 已提交
1326
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1327 1328
  }

H
Hongze Cheng 已提交
1329
_exit:
H
Hongze Cheng 已提交
1330 1331 1332
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
1333 1334 1335 1336 1337
  return code;
}

static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
1338
  int32_t lino = 0;
H
Hongze Cheng 已提交
1339 1340 1341 1342 1343 1344

  SBlockData *pBData = &pCommitter->dWriter.bData;
  SBlockData *pBDatal = &pCommitter->dWriter.bDatal;

  TABLEID id = {.suid = pBData->suid, .uid = pBData->uid};
  code = tsdbInitLastBlockIfNeed(pCommitter, id);
H
Hongze Cheng 已提交
1345
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1346

H
Hongze Cheng 已提交
1347 1348 1349
  for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) {
    TSDBROW row = tsdbRowFromBlockData(pBData, iRow);
    code = tBlockDataAppendRow(pBDatal, &row, NULL, pBData->uid);
H
Hongze Cheng 已提交
1350
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1351 1352

    if (pBDatal->nRow >= pCommitter->maxRow) {
H
Hongze Cheng 已提交
1353
      code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1354
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1355 1356 1357
    }
  }

H
Hongze Cheng 已提交
1358 1359 1360 1361
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
1362 1363 1364
  return code;
}

H
Hongze Cheng 已提交
1365
static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
H
Hongze Cheng 已提交
1366
  int32_t code = 0;
H
Hongze Cheng 已提交
1367
  int32_t lino = 0;
H
Hongze Cheng 已提交
1368

H
Hongze Cheng 已提交
1369
  SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
H
Hongze Cheng 已提交
1370 1371 1372 1373
  if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
    pRowInfo = NULL;
  }

H
Hongze Cheng 已提交
1374
  if (pRowInfo == NULL) goto _exit;
H
Hongze Cheng 已提交
1375

H
Hongze Cheng 已提交
1376
  SBlockData *pBData;
H
Hongze Cheng 已提交
1377
  if (pCommitter->toLastOnly) {
H
Hongze Cheng 已提交
1378
    pBData = &pCommitter->dWriter.bDatal;
H
Hongze Cheng 已提交
1379
    code = tsdbInitLastBlockIfNeed(pCommitter, id);
H
Hongze Cheng 已提交
1380
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1381
  } else {
H
Hongze Cheng 已提交
1382
    pBData = &pCommitter->dWriter.bData;
H
Hongze Cheng 已提交
1383
    ASSERT(pBData->nRow == 0);
H
Hongze Cheng 已提交
1384
  }
H
Hongze Cheng 已提交
1385

H
Hongze Cheng 已提交
1386 1387 1388 1389
  while (pRowInfo) {
    STSchema *pTSchema = NULL;
    if (pRowInfo->row.type == 0) {
      code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
H
Hongze Cheng 已提交
1390
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1391 1392
      pTSchema = pCommitter->skmRow.pTSchema;
    }
H
Hongze Cheng 已提交
1393

H
Hongze Cheng 已提交
1394
    code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid);
H
Hongze Cheng 已提交
1395
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1396

H
Hongze Cheng 已提交
1397
    code = tsdbNextCommitRow(pCommitter);
H
Hongze Cheng 已提交
1398
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1399

H
Hongze Cheng 已提交
1400 1401 1402
    pRowInfo = tsdbGetCommitRow(pCommitter);
    if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
      pRowInfo = NULL;
H
Hongze Cheng 已提交
1403 1404
    }

H
Hongze Cheng 已提交
1405 1406
    if (pBData->nRow >= pCommitter->maxRow) {
      if (pCommitter->toLastOnly) {
H
Hongze Cheng 已提交
1407
        code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1408
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1409
      } else {
H
Hongze Cheng 已提交
1410 1411
        code =
            tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1412
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1413
      }
H
Hongze Cheng 已提交
1414 1415 1416
    }
  }

H
Hongze Cheng 已提交
1417 1418
  if (!pCommitter->toLastOnly && pBData->nRow) {
    if (pBData->nRow > pCommitter->minRow) {
H
Hongze Cheng 已提交
1419
      code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1420
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1421 1422
    } else {
      code = tsdbAppendLastBlock(pCommitter);
H
Hongze Cheng 已提交
1423
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1424 1425 1426
    }
  }

H
Hongze Cheng 已提交
1427
_exit:
H
Hongze Cheng 已提交
1428 1429 1430
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
1431 1432 1433
  return code;
}

H
Hongze Cheng 已提交
1434 1435
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
  int32_t code = 0;
H
Hongze Cheng 已提交
1436
  int32_t lino = 0;
H
Hongze Cheng 已提交
1437

H
Hongze Cheng 已提交
1438
  SRowInfo *pRowInfo;
H
Hongze Cheng 已提交
1439
  TABLEID   id = {0};
H
Hongze Cheng 已提交
1440
  while ((pRowInfo = tsdbGetCommitRow(pCommitter)) != NULL) {
H
Hongze Cheng 已提交
1441 1442 1443
    ASSERT(pRowInfo->suid != id.suid || pRowInfo->uid != id.uid);
    id.suid = pRowInfo->suid;
    id.uid = pRowInfo->uid;
H
Hongze Cheng 已提交
1444

H
Hongze Cheng 已提交
1445
    code = tsdbMoveCommitData(pCommitter, id);
H
Hongze Cheng 已提交
1446
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1447 1448

    // start
H
Hongze Cheng 已提交
1449
    tMapDataReset(&pCommitter->dWriter.mBlock);
H
Hongze Cheng 已提交
1450 1451

    // impl
H
Hongze Cheng 已提交
1452
    code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable);
H
Hongze Cheng 已提交
1453
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1454
    code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema);
H
Hongze Cheng 已提交
1455
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1456
    code = tBlockDataInit(&pCommitter->dWriter.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema);
H
Hongze Cheng 已提交
1457
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1458

H
Hongze Cheng 已提交
1459 1460
    /* merge with data in .data file */
    code = tsdbMergeTableData(pCommitter, id);
H
Hongze Cheng 已提交
1461
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1462

H
Hongze Cheng 已提交
1463
    /* handle remain table data */
H
Hongze Cheng 已提交
1464
    code = tsdbCommitTableData(pCommitter, id);
H
Hongze Cheng 已提交
1465
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1466

H
Hongze Cheng 已提交
1467
    // end
H
Hongze Cheng 已提交
1468 1469
    if (pCommitter->dWriter.mBlock.nItem > 0) {
      SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid};
H
Hongze Cheng 已提交
1470
      code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx);
H
Hongze Cheng 已提交
1471
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1472 1473 1474

      if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
H
Hongze Cheng 已提交
1475
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1476 1477
      }
    }
H
Hongze Cheng 已提交
1478 1479
  }

H
Hongze Cheng 已提交
1480 1481 1482
  id.suid = INT64_MAX;
  id.uid = INT64_MAX;
  code = tsdbMoveCommitData(pCommitter, id);
H
Hongze Cheng 已提交
1483
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1484

H
Hongze Cheng 已提交
1485 1486
  code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
                           pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1487
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1488

H
Hongze Cheng 已提交
1489 1490 1491 1492
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
H
Hongze Cheng 已提交
1493 1494
  return code;
}