tsdbCommit.c 51.0 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18
// Discriminates which member of the anonymous union inside SDataIter is active:
// the in-memory table iterator or the stt-file block iterator.
typedef enum { MEMORY_DATA_ITER = 0, STT_DATA_ITER } EDataIterT;
H
Hongze Cheng 已提交
19

H
more  
Hongze Cheng 已提交
20
// Compile-time switch: when non-zero the committer's writer state holds an
// SDiskDataBuilder (pBuilder); otherwise it holds a second SBlockData (bDatal).
#define USE_STREAM_COMPRESSION 0
H
compare  
Hongze Cheng 已提交
21

H
Hongze Cheng 已提交
22 23 24
typedef struct {
  SRBTreeNode n;
  SRowInfo    r;
H
Hongze Cheng 已提交
25
  EDataIterT  type;
H
Hongze Cheng 已提交
26 27 28 29 30 31
  union {
    struct {
      int32_t     iTbDataP;
      STbDataIter iter;
    };  // memory data iter
    struct {
H
Hongze Cheng 已提交
32 33 34
      int32_t    iStt;
      SArray    *aSttBlk;
      int32_t    iSttBlk;
H
Hongze Cheng 已提交
35 36
      SBlockData bData;
      int32_t    iRow;
H
Hongze Cheng 已提交
37
    };  // stt file data iter
H
Hongze Cheng 已提交
38 39 40
  };
} SDataIter;

H
Hongze Cheng 已提交
41
typedef struct {
H
Hongze Cheng 已提交
42
  STsdb *pTsdb;
H
Hongze Cheng 已提交
43
  /* commit data */
H
Hongze Cheng 已提交
44
  int64_t commitID;
H
Hongze Cheng 已提交
45 46
  int32_t minutes;
  int8_t  precision;
H
Hongze Cheng 已提交
47 48
  int32_t minRow;
  int32_t maxRow;
H
Hongze Cheng 已提交
49
  int8_t  cmprAlg;
H
Hongze Cheng 已提交
50
  int8_t  sttTrigger;
H
Hongze Cheng 已提交
51 52
  SArray *aTbDataP;  // memory
  STsdbFS fs;        // disk
H
Hongze Cheng 已提交
53
  // --------------
H
Hongze Cheng 已提交
54
  TSKEY   nextKey;  // reset by each table commit
H
Hongze Cheng 已提交
55
  int32_t commitFid;
H
Hongze Cheng 已提交
56
  int32_t expLevel;
H
Hongze Cheng 已提交
57 58
  TSKEY   minKey;
  TSKEY   maxKey;
H
Hongze Cheng 已提交
59
  // commit file data
H
Hongze Cheng 已提交
60 61
  struct {
    SDataFReader *pReader;
H
Hongze Cheng 已提交
62 63 64
    SArray       *aBlockIdx;  // SArray<SBlockIdx>
    int32_t       iBlockIdx;
    SBlockIdx    *pBlockIdx;
H
Hongze Cheng 已提交
65
    SMapData      mBlock;  // SMapData<SDataBlk>
H
Hongze Cheng 已提交
66
    SBlockData    bData;
H
Hongze Cheng 已提交
67
  } dReader;
H
Hongze Cheng 已提交
68 69 70
  struct {
    SDataIter *pIter;
    SRBTree    rbt;
H
Hongze Cheng 已提交
71
    SDataIter  dataIter;
H
Hongze Cheng 已提交
72
    SDataIter  aDataIter[TSDB_MAX_STT_TRIGGER];
H
Hongze Cheng 已提交
73
    int8_t     toLastOnly;
H
Hongze Cheng 已提交
74
  };
H
Hongze Cheng 已提交
75
  struct {
H
compare  
Hongze Cheng 已提交
76 77 78 79 80 81
    SDataFWriter *pWriter;
    SArray       *aBlockIdx;  // SArray<SBlockIdx>
    SArray       *aSttBlk;    // SArray<SSttBlk>
    SMapData      mBlock;     // SMapData<SDataBlk>
    SBlockData    bData;
#if USE_STREAM_COMPRESSION
H
Hongze Cheng 已提交
82
    SDiskDataBuilder *pBuilder;
H
compare  
Hongze Cheng 已提交
83 84 85
#else
    SBlockData bDatal;
#endif
H
Hongze Cheng 已提交
86 87 88
  } dWriter;
  SSkmInfo skmTable;
  SSkmInfo skmRow;
H
Hongze Cheng 已提交
89
  /* commit del */
H
Hongze Cheng 已提交
90 91
  SDelFReader *pDelFReader;
  SDelFWriter *pDelFWriter;
H
Hongze Cheng 已提交
92 93 94
  SArray      *aDelIdx;   // SArray<SDelIdx>
  SArray      *aDelIdxN;  // SArray<SDelIdx>
  SArray      *aDelData;  // SArray<SDelData>
H
Hongze Cheng 已提交
95
} SCommitter;
H
refact  
Hongze Cheng 已提交
96

H
Hongze Cheng 已提交
97
// Forward declarations of file-local commit steps.
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter, SCommitInfo *pInfo);
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
static int32_t tsdbNextCommitRow(SCommitter *pCommitter);

H
Hongze Cheng 已提交
104
int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
H
Hongze Cheng 已提交
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
  SRowInfo *pInfo1 = (SRowInfo *)p1;
  SRowInfo *pInfo2 = (SRowInfo *)p2;

  if (pInfo1->suid < pInfo2->suid) {
    return -1;
  } else if (pInfo1->suid > pInfo2->suid) {
    return 1;
  }

  if (pInfo1->uid < pInfo2->uid) {
    return -1;
  } else if (pInfo1->uid > pInfo2->uid) {
    return 1;
  }

  return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row);
}
H
refact  
Hongze Cheng 已提交
122

H
refact  
Hongze Cheng 已提交
123
/*
 * Create a fresh memtable and install it as pTsdb->mem under the write lock.
 *
 * Returns 0 on success (or when pTsdb is NULL), otherwise an error code.
 */
int32_t tsdbBegin(STsdb *pTsdb) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SMemTable *pMemTable = NULL;

  if (!pTsdb) return code;

  code = tsdbMemTableCreate(pTsdb, &pMemTable);
  TSDB_CHECK_CODE(code, lino, _exit);

  // lock
  if ((code = taosThreadRwlockWrlock(&pTsdb->rwLock))) {
    code = TAOS_SYSTEM_ERROR(code);
    // The memtable was never published; release it so it does not leak.
    // NOTE(review): assumes tsdbMemTableCreate hands back one reference that
    // tsdbUnrefMemTable drops, as tsdbCommit does for imem - confirm.
    tsdbUnrefMemTable(pMemTable);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pTsdb->mem = pMemTable;

  // unlock
  if ((code = taosThreadRwlockUnlock(&pTsdb->rwLock))) {
    code = TAOS_SYSTEM_ERROR(code);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
154 155 156 157 158 159 160 161 162 163
/*
 * Hand the active memtable over to the commit path: under the write lock,
 * pTsdb->mem becomes pTsdb->imem and pTsdb->mem is cleared.
 * Asserts that no immutable memtable is already pending. Always returns 0.
 */
int32_t tsdbPrepareCommit(STsdb *pTsdb) {
  taosThreadRwlockWrlock(&pTsdb->rwLock);

  ASSERT(pTsdb->imem == NULL);
  SMemTable *pActive = pTsdb->mem;
  pTsdb->mem = NULL;
  pTsdb->imem = pActive;

  taosThreadRwlockUnlock(&pTsdb->rwLock);
  return 0;
}

H
Hongze Cheng 已提交
164
/*
 * Commit the immutable memtable (pTsdb->imem) to disk.
 *
 * If the memtable holds neither rows nor delete records it is simply detached
 * and released. Otherwise the commit runs as: start -> data -> del -> end.
 * On any failure tsdbEndCommit is invoked with the error code.
 *
 * Returns 0 on success (or when pTsdb is NULL), otherwise an error code.
 */
int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) {
  if (!pTsdb) return 0;

  int32_t    code = 0;
  int32_t    lino = 0;
  // Zero-initialised so the error path below never hands indeterminate memory
  // to tsdbEndCommit when tsdbStartCommit fails early.
  SCommitter commith = {0};
  SMemTable *pMemTable = pTsdb->imem;

  // check
  if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
    taosThreadRwlockWrlock(&pTsdb->rwLock);
    pTsdb->imem = NULL;
    taosThreadRwlockUnlock(&pTsdb->rwLock);

    tsdbUnrefMemTable(pMemTable);
    goto _exit;
  }

  // start commit
  code = tsdbStartCommit(pTsdb, &commith, pInfo);
  TSDB_CHECK_CODE(code, lino, _exit);

  // commit impl
  code = tsdbCommitData(&commith);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbCommitDel(&commith);
  TSDB_CHECK_CODE(code, lino, _exit);

  // end commit
  code = tsdbEndCommit(&commith, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    // NOTE(review): if tsdbEndCommit(&commith, 0) itself failed, it is called
    // a second time here with the error code - confirm that is intended.
    tsdbEndCommit(&commith, code);
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
205
/*
 * Prepare the delete-record commit: allocate the three working arrays, open
 * the existing del file (if the file system has one) and load its index into
 * aDelIdx, then open a writer for the new del file tagged with this commit's
 * commitID.
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
  int32_t    code = 0;
  int32_t    lino = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if ((pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if ((pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if ((pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SDelFile *pDelFileR = pCommitter->fs.pDelFile;
  if (pDelFileR) {
    code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // prepare new
  SDelFile wDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0};
  code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &wDelFile, pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode));
  }
  return code;
}

H
Hongze Cheng 已提交
249
/*
 * Commit the delete records of one table.
 *
 * pTbData - in-memory table data, may be NULL; ignored when it has no
 *           delete records (pHead == NULL).
 * pDelIdx - the table's entry in the existing del file, may be NULL.
 *
 * Records read from disk (via pDelIdx) are followed by the in-memory list,
 * written with tsdbWriteDelData, and the resulting index entry is appended
 * to aDelIdxN. Nothing is written when both sources are absent.
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SDelData *pDelData;
  // Initialised so they are never indeterminate; when neither source is
  // present the function exits before they are used.
  tb_uid_t  suid = 0;
  tb_uid_t  uid = 0;

  if (pTbData) {
    suid = pTbData->suid;
    uid = pTbData->uid;

    if (pTbData->pHead == NULL) {
      pTbData = NULL;
    }
  }

  if (pDelIdx) {
    suid = pDelIdx->suid;
    uid = pDelIdx->uid;

    code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    taosArrayClear(pCommitter->aDelData);
  }

  if (pTbData == NULL && pDelIdx == NULL) goto _exit;

  SDelIdx delIdx = {.suid = suid, .uid = uid};

  // memory
  pDelData = pTbData ? pTbData->pHead : NULL;
  for (; pDelData; pDelData = pDelData->pNext) {
    if (taosArrayPush(pCommitter->aDelData, pDelData) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // write
  code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, &delIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  // put delIdx
  if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
306 307
/*
 * Finish the delete-record commit: write the new del index, update the del
 * file header, register the new del file in the committer's file system
 * state, close the writer (and the reader if one was opened), then release
 * the working arrays.
 *
 * NOTE(review): on an error return the arrays and any open reader/writer are
 * left untouched here - confirm the caller's error path releases them.
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;

  code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbFSUpsertDelFile(&pCommitter->fs, &pCommitter->pDelFWriter->fDel);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pCommitter->pDelFReader) {
    code = tsdbDelFReaderClose(&pCommitter->pDelFReader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // Clear the pointers after destroying so the committer never keeps
  // references to freed arrays.
  taosArrayDestroy(pCommitter->aDelIdx);
  pCommitter->aDelIdx = NULL;
  taosArrayDestroy(pCommitter->aDelData);
  pCommitter->aDelData = NULL;
  taosArrayDestroy(pCommitter->aDelIdxN);
  pCommitter->aDelIdxN = NULL;

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
340
/*
 * Make sure pSkmInfo caches the schema for table (suid, uid).
 *
 * When a schema is already cached it is reused if the super-table id matches
 * (suid != 0; only the uid is refreshed) or, for suid == 0, if the uid
 * matches. Otherwise the old schema is destroyed and the latest one
 * (version -1) is loaded through metaGetTbTSchemaEx.
 *
 * Returns 0 on success, otherwise the error from metaGetTbTSchemaEx.
 */
int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  // Only trust the cached ids while a schema is actually attached; after a
  // failed reload the ids are set but pTSchema is NULL and must be refetched.
  if (pSkmInfo->pTSchema) {
    if (suid) {
      if (pSkmInfo->suid == suid) {
        pSkmInfo->uid = uid;
        goto _exit;
      }
    } else {
      if (pSkmInfo->uid == uid) goto _exit;
    }
  }

  pSkmInfo->suid = suid;
  pSkmInfo->uid = uid;
  tDestroyTSchema(pSkmInfo->pTSchema);
  // Reset so a failed reload below cannot leave a dangling pointer behind.
  pSkmInfo->pTSchema = NULL;
  code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  return code;
}

/*
 * Make sure pCommitter->skmRow caches schema version `sver` for (suid, uid).
 *
 * The cached schema is reused when the suid matches and the version equals
 * sver; for suid == 0 the uid must match as well. Otherwise the old schema
 * is destroyed and the requested version is loaded via metaGetTbTSchemaEx.
 *
 * Returns 0 on success, otherwise the error from metaGetTbTSchemaEx.
 */
static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pCommitter->skmRow.pTSchema) {
    if (pCommitter->skmRow.suid == suid) {
      if (suid == 0) {
        if (pCommitter->skmRow.uid == uid && sver == pCommitter->skmRow.pTSchema->version) goto _exit;
      } else {
        if (sver == pCommitter->skmRow.pTSchema->version) goto _exit;
      }
    }
  }

  pCommitter->skmRow.suid = suid;
  pCommitter->skmRow.uid = uid;
  tDestroyTSchema(pCommitter->skmRow.pTSchema);
  // Reset so a failed reload below cannot leave a dangling pointer that the
  // cache check above would dereference on the next call.
  pCommitter->skmRow.pTSchema = NULL;
  code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  return code;
}

H
Hongze Cheng 已提交
387 388
/*
 * Advance the data-file reader to the next SBlockIdx entry and load that
 * table's data-block map into dReader.mBlock. When the index is exhausted,
 * dReader.pBlockIdx is set to NULL.
 *
 * Must only be called while dReader.pBlockIdx is non-NULL (asserted).
 * Returns 0 on success, otherwise the error from tsdbReadDataBlk.
 */
static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  ASSERT(pCommitter->dReader.pBlockIdx);

  pCommitter->dReader.iBlockIdx++;
  if (pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx)) {
    pCommitter->dReader.pBlockIdx =
        (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);

    code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
    TSDB_CHECK_CODE(code, lino, _exit);

    ASSERT(pCommitter->dReader.mBlock.nItem > 0);
  } else {
    pCommitter->dReader.pBlockIdx = NULL;
  }

_exit:
  return code;
}

H
Hongze Cheng 已提交
410 411 412 413 414 415 416
static int32_t tDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
  SDataIter *pIter1 = (SDataIter *)((uint8_t *)n1 - offsetof(SDataIter, n));
  SDataIter *pIter2 = (SDataIter *)((uint8_t *)n2 - offsetof(SDataIter, n));

  return tRowInfoCmprFn(&pIter1->r, &pIter2->r);
}

H
Hongze Cheng 已提交
417 418
/*
 * Build the set of row iterators for the file set being committed and
 * position the committer on its first row.
 *
 * Memory: scans aTbDataP for the first table with a row at or after minKey
 * that is not beyond maxKey; rows beyond maxKey only lower nextKey. The
 * function asserts that such a table exists.
 *
 * Disk: if a reader is open and the set already holds at least sttTrigger
 * stt files, one iterator per non-empty stt file is opened so their rows are
 * merged. Otherwise toLastOnly is set when any stt file has
 * size > offset.
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  pCommitter->pIter = NULL;
  tRBTreeCreate(&pCommitter->rbt, tDataIterCmprFn);

  // memory
  TSDBKEY    tKey = {.ts = pCommitter->minKey, .version = VERSION_MIN};
  SDataIter *pIter = &pCommitter->dataIter;
  pIter->type = MEMORY_DATA_ITER;
  pIter->iTbDataP = 0;
  for (; pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP); pIter->iTbDataP++) {
    STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP);
    tsdbTbDataIterOpen(pTbData, &tKey, 0, &pIter->iter);
    TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter);
    if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
      // Row belongs to a later file set: remember the earliest such key.
      pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
      pRow = NULL;
    }

    if (pRow == NULL) continue;

    pIter->r.suid = pTbData->suid;
    pIter->r.uid = pTbData->uid;
    pIter->r.row = *pRow;
    break;
  }
  ASSERT(pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP));
  tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter);

  // disk
  pCommitter->toLastOnly = 0;
  SDataFReader *pReader = pCommitter->dReader.pReader;
  if (pReader) {
    if (pReader->pSet->nSttF >= pCommitter->sttTrigger) {
      int8_t iIter = 0;
      for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
        pIter = &pCommitter->aDataIter[iIter];
        pIter->type = STT_DATA_ITER;
        pIter->iStt = iStt;

        code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk);
        TSDB_CHECK_CODE(code, lino, _exit);

        // Empty stt file: skip it; the iterator slot is reused for the next file.
        if (taosArrayGetSize(pIter->aSttBlk) == 0) continue;

        pIter->iSttBlk = 0;
        SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, 0);
        code = tsdbReadSttBlockEx(pCommitter->dReader.pReader, iStt, pSttBlk, &pIter->bData);
        TSDB_CHECK_CODE(code, lino, _exit);

        pIter->iRow = 0;
        pIter->r.suid = pIter->bData.suid;
        pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
        pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0);

        tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter);
        iIter++;
      }
    } else {
      for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
        SSttFile *pSttFile = pReader->pSet->aSttF[iStt];
        if (pSttFile->size > pSttFile->offset) {
          pCommitter->toLastOnly = 1;
          break;
        }
      }
    }
  }

  code = tsdbNextCommitRow(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
499 500
/*
 * Set up the commit of one file set.
 *
 * 1. Derive commitFid, expLevel and the [minKey, maxKey] range from nextKey,
 *    then reset nextKey to TSKEY_MAX.
 * 2. If the file set already exists, open a reader on it and load the block
 *    index plus the first table's data-block map.
 * 3. Describe the file set to write: existing data/sma files are carried
 *    over; existing stt files are kept and one new stt file appended while
 *    the set holds fewer than sttTrigger of them, otherwise the set restarts
 *    with a single new stt file. A new set gets a disk allocated for
 *    expLevel.
 * 4. Open the writer, reset the writer buffers and open the commit iterators.
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
  int32_t    code = 0;
  int32_t    lino = 0;
  STsdb     *pTsdb = pCommitter->pTsdb;
  SDFileSet *pRSet = NULL;

  // memory
  pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
  pCommitter->expLevel = tsdbFidLevel(pCommitter->commitFid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec());
  tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
                  &pCommitter->maxKey);
#if 0
  ASSERT(pCommitter->minKey <= pCommitter->nextKey && pCommitter->maxKey >= pCommitter->nextKey);
#endif

  pCommitter->nextKey = TSKEY_MAX;

  // Reader
  SDFileSet tDFileSet = {.fid = pCommitter->commitFid};
  pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &tDFileSet, tDFileSetCmprFn, TD_EQ);
  if (pRSet) {
    code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet);
    TSDB_CHECK_CODE(code, lino, _exit);

    // data
    code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx);
    TSDB_CHECK_CODE(code, lino, _exit);

    pCommitter->dReader.iBlockIdx = 0;
    if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) {
      pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0);
      code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      pCommitter->dReader.pBlockIdx = NULL;
    }
    tBlockDataReset(&pCommitter->dReader.bData);
  } else {
    pCommitter->dReader.pBlockIdx = NULL;
  }

  // Writer
  SHeadFile fHead = {.commitID = pCommitter->commitID};
  SDataFile fData = {.commitID = pCommitter->commitID};
  SSmaFile  fSma = {.commitID = pCommitter->commitID};
  SSttFile  fStt = {.commitID = pCommitter->commitID};
  SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};
  if (pRSet) {
    ASSERT(pRSet->nSttF <= pCommitter->sttTrigger);
    fData = *pRSet->pDataF;
    fSma = *pRSet->pSmaF;
    wSet.diskId = pRSet->diskId;
    if (pRSet->nSttF < pCommitter->sttTrigger) {
      for (int32_t iStt = 0; iStt < pRSet->nSttF; iStt++) {
        wSet.aSttF[iStt] = pRSet->aSttF[iStt];
      }
      wSet.nSttF = pRSet->nSttF + 1;
    } else {
      wSet.nSttF = 1;
    }
  } else {
    SDiskID did = {0};
    if (tfsAllocDisk(pTsdb->pVnode->pTfs, pCommitter->expLevel, &did) < 0) {
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    // NOTE(review): the result of tfsMkdirRecurAt is not checked - confirm
    // that a failure here is surfaced by tsdbDataFWriterOpen below.
    tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
    wSet.diskId = did;
    wSet.nSttF = 1;
  }
  // The last stt slot always refers to the new stt file of this commit.
  wSet.aSttF[wSet.nSttF - 1] = &fStt;
  code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  taosArrayClear(pCommitter->dWriter.aBlockIdx);
  taosArrayClear(pCommitter->dWriter.aSttBlk);
  tMapDataReset(&pCommitter->dWriter.mBlock);
  tBlockDataReset(&pCommitter->dWriter.bData);
#if USE_STREAM_COMPRESSION
  tDiskDataBuilderClear(pCommitter->dWriter.pBuilder);
#else
  tBlockDataReset(&pCommitter->dWriter.bDatal);
#endif

  // open iter
  code = tsdbOpenCommitIter(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
594 595
/*
 * Write pBlockData as one data block and record its SDataBlk descriptor in
 * mDataBlk. Does nothing when the block holds no rows.
 *
 * The descriptor's min key is taken from the first row and its max key from
 * the last row; hasDup is set when two adjacent rows share a timestamp, in
 * which case no SMA info is requested from tsdbWriteBlockData. The version
 * range is accumulated over all rows. On success pBlockData is cleared.
 *
 * Returns 0 on success, otherwise an error code.
 */
int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pBlockData->nRow == 0) return code;

  SDataBlk dataBlk;
  tDataBlkReset(&dataBlk);

  // info
  dataBlk.nRow += pBlockData->nRow;
  for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
    TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]};

    if (iRow == 0) {
      if (tsdbKeyCmprFn(&dataBlk.minKey, &key) > 0) {
        dataBlk.minKey = key;
      }
    } else {
      if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) {
        dataBlk.hasDup = 1;
      }
    }

    if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&dataBlk.maxKey, &key) < 0) {
      dataBlk.maxKey = key;
    }

    dataBlk.minVer = TMIN(dataBlk.minVer, key.version);
    dataBlk.maxVer = TMAX(dataBlk.maxVer, key.version);
  }

  // write
  dataBlk.nSubBlock++;
  code = tsdbWriteBlockData(pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1],
                            ((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL, cmprAlg, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

  // put SDataBlk
  code = tMapDataPutItem(mDataBlk, &dataBlk, tPutDataBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  // clear
  tBlockDataClear(pBlockData);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
647 648
/*
 * Write pBlockData as one stt block and append its SSttBlk descriptor to
 * aSttBlk. Does nothing when the block holds no rows.
 *
 * The descriptor records the key and version ranges over all rows and the
 * uid range: the block's single uid when set, otherwise the first and last
 * entries of aUid. On success pBlockData is cleared.
 *
 * Returns 0 on success, otherwise an error code.
 */
int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg) {
  int32_t code = 0;
  int32_t lino = 0;
  SSttBlk sstBlk;

  if (pBlockData->nRow == 0) return code;

  // info
  sstBlk.suid = pBlockData->suid;
  sstBlk.nRow = pBlockData->nRow;
  sstBlk.minKey = TSKEY_MAX;
  sstBlk.maxKey = TSKEY_MIN;
  sstBlk.minVer = VERSION_MAX;
  sstBlk.maxVer = VERSION_MIN;
  for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
    sstBlk.minKey = TMIN(sstBlk.minKey, pBlockData->aTSKEY[iRow]);
    sstBlk.maxKey = TMAX(sstBlk.maxKey, pBlockData->aTSKEY[iRow]);
    sstBlk.minVer = TMIN(sstBlk.minVer, pBlockData->aVersion[iRow]);
    sstBlk.maxVer = TMAX(sstBlk.maxVer, pBlockData->aVersion[iRow]);
  }
  sstBlk.minUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[0];
  sstBlk.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1];

  // write
  code = tsdbWriteBlockData(pWriter, pBlockData, &sstBlk.bInfo, NULL, cmprAlg, 1);
  TSDB_CHECK_CODE(code, lino, _exit);

  // push SSttBlk
  if (taosArrayPush(aSttBlk, &sstBlk) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // clear
  tBlockDataClear(pBlockData);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
691 692 693 694 695 696 697
/*
 * Flush the rows accumulated in pBuilder as one stt block: generate the disk
 * data, write it through pWriter, append the resulting SSttBlk descriptor to
 * aSttBlk and clear the builder. Does nothing when the builder is empty.
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilder, SArray *aSttBlk) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pBuilder->nRow == 0) return code;

  // gnrt
  const SDiskData *pDiskData;
  const SBlkInfo  *pBlkInfo;
  code = tGnrtDiskData(pBuilder, &pDiskData, &pBlkInfo);
  TSDB_CHECK_CODE(code, lino, _exit);

  SSttBlk sttBlk = {.suid = pBuilder->suid,
                    .minUid = pBlkInfo->minUid,
                    .maxUid = pBlkInfo->maxUid,
                    .minKey = pBlkInfo->minKey,
                    .maxKey = pBlkInfo->maxKey,
                    .minVer = pBlkInfo->minVer,
                    .maxVer = pBlkInfo->maxVer,
                    .nRow = pBuilder->nRow};
  // write
  code = tsdbWriteDiskData(pWriter, pDiskData, &sttBlk.bInfo, NULL);
  TSDB_CHECK_CODE(code, lino, _exit);

  // push
  if (taosArrayPush(aSttBlk, &sttBlk) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // clear
  tDiskDataBuilderClear(pBuilder);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
732 733
/*
 * Finish committing one data file set.
 *
 * Writes the block index and stt block arrays, updates the file set header,
 * upserts the written file set into the committer's file system copy, then
 * closes the writer (with the sync flag set) and, if one is open, the reader.
 *
 * Returns 0 on success, otherwise the first failing call's error code.
 */
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
  int32_t       code = 0;
  int32_t       lino = 0;
  SDataFWriter *pFWriter = pCommitter->dWriter.pWriter;

  // block index
  code = tsdbWriteBlockIdx(pFWriter, pCommitter->dWriter.aBlockIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  // stt blocks
  code = tsdbWriteSttBlk(pFWriter, pCommitter->dWriter.aSttBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  // file header
  code = tsdbUpdateDFileSetHeader(pFWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // register the written file set
  code = tsdbFSUpsertFSet(&pCommitter->fs, &pFWriter->wSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  // close the writer, asking it to sync
  code = tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 1);
  TSDB_CHECK_CODE(code, lino, _exit);

  // a reader is only present in some commits
  if (pCommitter->dReader.pReader != NULL) {
    code = tsdbDataFReaderClose(&pCommitter->dReader.pReader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
769 770
/*
 * Copy existing table data that sorts before toTable to the new file set.
 *
 * While the reader's current block index compares lower than toTable, its
 * block map is written through the writer, the resulting index is appended to
 * the writer's block index array, and the reader moves to the next table.
 *
 * Returns 0 on success, otherwise the failing call's error code.
 */
static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
  int32_t code = 0;
  int32_t lino = 0;

  for (;;) {
    SBlockIdx *pSrcIdx = pCommitter->dReader.pBlockIdx;

    // stop when the reader is exhausted or has reached/passed toTable
    if (pSrcIdx == NULL) break;
    if (tTABLEIDCmprFn(pSrcIdx, &toTable) >= 0) break;

    // work on a copy of the reader's index entry
    SBlockIdx idxCopy = *pSrcIdx;
    code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &idxCopy);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &idxCopy) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbCommitterNextTableData(pCommitter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
795
// Forward declaration: needed because tsdbCommitFileData() calls this function before its definition appears.
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter);
H
Hongze Cheng 已提交
796
/*
 * Commit one data file set: start, implementation, end.
 *
 * On any failure the data file reader and writer are closed on a best-effort
 * basis (the writer with its sync flag cleared) and the error code of the
 * failing step is returned; the results of those closes are intentionally
 * not propagated so the original error is preserved.
 *
 * Returns 0 on success, otherwise the first failing step's error code.
 */
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;

  // commit file data start
  code = tsdbCommitFileDataStart(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // impl
  code = tsdbCommitFileDataImpl(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // commit file data end
  code = tsdbCommitFileDataEnd(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
    // best-effort cleanup; keep reporting the original failure
    tsdbDataFReaderClose(&pCommitter->dReader.pReader);
    tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0);
  }
  return code;
}

H
Hongze Cheng 已提交
823
// ----------------------------------------------------------------------------
H
Hongze Cheng 已提交
824
/*
 * Prepare a committer for a new commit of pTsdb.
 *
 * The committer is zeroed, then filled from the tsdb keep configuration and
 * from pInfo (commit id, row limits, compression, stt trigger). The table
 * data array of the immutable memtable is fetched and the current file system
 * state is copied into the committer.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the table data array
 * cannot be obtained, or the error code of tsdbFSCopy().
 */
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter, SCommitInfo *pInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  memset(pCommitter, 0, sizeof(*pCommitter));
  ASSERT(pTsdb->imem && "last tsdb commit incomplete");

  // identity
  pCommitter->pTsdb = pTsdb;
  pCommitter->commitID = pInfo->info.state.commitID;

  // keep configuration
  pCommitter->minutes = pTsdb->keepCfg.days;
  pCommitter->precision = pTsdb->keepCfg.precision;

  // vnode configuration
  pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows;
  pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows;
  pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression;
  pCommitter->sttTrigger = pInfo->info.config.sttTrigger;

  // memory side: tables of the immutable memtable
  pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
  if (!pCommitter->aTbDataP) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // disk side: private copy of the file system state
  code = tsdbFSCopy(pTsdb, &pCommitter->fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
854 855
/*
 * Allocate the working buffers used while committing row data.
 *
 * Creates the reader's block index array and block data, one stt block array
 * plus block data for each of the TSDB_MAX_STT_TRIGGER merge iterators, and
 * the writer's block index array, stt block array and block data. Depending
 * on USE_STREAM_COMPRESSION the writer additionally gets either a disk data
 * builder or a second block data (bDatal).
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when an array cannot be
 * created, or the error code of the failing create call.
 */
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  // ---- reader ----
  if ((pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tBlockDataCreate(&pCommitter->dReader.bData);
  TSDB_CHECK_CODE(code, lino, _exit);

  // ---- merger: one iterator slot per possible stt file ----
  int32_t iSlot = 0;
  while (iSlot < TSDB_MAX_STT_TRIGGER) {
    SDataIter *pSttIter = &pCommitter->aDataIter[iSlot];

    if ((pSttIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk))) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tBlockDataCreate(&pSttIter->bData);
    TSDB_CHECK_CODE(code, lino, _exit);

    iSlot++;
  }

  // ---- writer ----
  if ((pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if ((pCommitter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk))) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tBlockDataCreate(&pCommitter->dWriter.bData);
  TSDB_CHECK_CODE(code, lino, _exit);

  // stt-side accumulation buffer
#if USE_STREAM_COMPRESSION
  code = tDiskDataBuilderCreate(&pCommitter->dWriter.pBuilder);
#else
  code = tBlockDataCreate(&pCommitter->dWriter.bDatal);
#endif
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

/*
 * Release the working buffers of a data commit: the reader, merger and writer
 * arrays, block maps and block data, the stt-side accumulation buffer
 * (builder or bDatal, depending on USE_STREAM_COMPRESSION) and the two cached
 * table schemas.
 */
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
  // ---- reader ----
  taosArrayDestroy(pCommitter->dReader.aBlockIdx);
  tMapDataClear(&pCommitter->dReader.mBlock);
  tBlockDataDestroy(&pCommitter->dReader.bData, 1);

  // ---- merger ----
  SDataIter *pSttIter = &pCommitter->aDataIter[0];
  for (int32_t nLeft = TSDB_MAX_STT_TRIGGER; nLeft > 0; nLeft--, pSttIter++) {
    taosArrayDestroy(pSttIter->aSttBlk);
    tBlockDataDestroy(&pSttIter->bData, 1);
  }

  // ---- writer ----
  taosArrayDestroy(pCommitter->dWriter.aBlockIdx);
  taosArrayDestroy(pCommitter->dWriter.aSttBlk);
  tMapDataClear(&pCommitter->dWriter.mBlock);
  tBlockDataDestroy(&pCommitter->dWriter.bData, 1);
#if USE_STREAM_COMPRESSION
  tDiskDataBuilderDestroy(pCommitter->dWriter.pBuilder);
#else
  tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1);
#endif

  // ---- cached schemas ----
  tDestroyTSchema(pCommitter->skmTable.pTSchema);
  tDestroyTSchema(pCommitter->skmRow.pTSchema);
}

H
Hongze Cheng 已提交
939
/*
 * Commit the row data of the immutable memtable to data files.
 *
 * Does nothing when the memtable holds no rows. Otherwise the commit buffers
 * are created, file sets are committed one at a time until the committer's
 * nextKey reaches TSKEY_MAX, and the buffers are released again.
 *
 * The buffers are released on the shared exit path, so a failure while
 * committing a file set no longer skips tsdbCommitDataEnd() and leaks them.
 *
 * Returns 0 on success, otherwise the first failing step's error code.
 */
static int32_t tsdbCommitData(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;
  bool    started = false;  // true once tsdbCommitDataStart() fully succeeded

  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  // check
  if (pMemTable->nRow == 0) goto _exit;

  // start ====================
  code = tsdbCommitDataStart(pCommitter);
  // NOTE(review): a partial failure inside tsdbCommitDataStart() is not cleaned up here,
  // because it is not visible from this function whether the destroy helpers accept
  // objects that were never created — confirm before extending the cleanup.
  TSDB_CHECK_CODE(code, lino, _exit);
  started = true;

  // impl ====================
  pCommitter->nextKey = pMemTable->minKey;
  while (pCommitter->nextKey < TSKEY_MAX) {
    code = tsdbCommitFileData(pCommitter);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  // end ====================
  if (started) {
    tsdbCommitDataEnd(pCommitter);
  }
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
969

H
Hongze Cheng 已提交
970
/*
 * Commit delete records.
 *
 * Does nothing (apart from the final debug log) when the immutable memtable
 * has no delete records. Otherwise the in-memory table array (aTbDataP) and
 * the on-disk delete index array (aDelIdx) are walked in parallel, ordered by
 * tTABLEIDCmprFn, and tsdbCommitTableDel() is called once per table with the
 * memory entry, the disk entry, or both when they refer to the same table.
 *
 * Returns 0 on success, otherwise the first failing call's error code.
 */
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdb     *pTsdb = pCommitter->pTsdb;
  SMemTable *pMemTable = pTsdb->imem;

  if (pMemTable->nDel == 0) goto _exit;

  // start
  code = tsdbCommitDelStart(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // impl: two-way merge over memory tables and disk delete indexes
  int32_t iDelIdx = 0;
  int32_t nDelIdx = taosArrayGetSize(pCommitter->aDelIdx);
  int32_t iTbData = 0;
  int32_t nTbData = taosArrayGetSize(pCommitter->aTbDataP);

  ASSERT(nTbData > 0);

  STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
  SDelIdx *pDelIdx = NULL;
  if (iDelIdx < nDelIdx) {
    pDelIdx = (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx);
  }

  while (pTbData != NULL || pDelIdx != NULL) {
    // c < 0: memory side only, c > 0: disk side only, c == 0: same table on both sides
    int32_t c;
    if (pTbData != NULL && pDelIdx != NULL) {
      c = tTABLEIDCmprFn(pTbData, pDelIdx);
    } else if (pTbData != NULL) {
      c = -1;
    } else {
      c = 1;
    }

    if (c < 0) {
      code = tsdbCommitTableDel(pCommitter, pTbData, NULL);
      TSDB_CHECK_CODE(code, lino, _exit);

      iTbData++;
      pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
    } else if (c > 0) {
      code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx);
      TSDB_CHECK_CODE(code, lino, _exit);

      iDelIdx++;
      pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    } else {
      code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx);
      TSDB_CHECK_CODE(code, lino, _exit);

      iTbData++;
      pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
      iDelIdx++;
      pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
    }
  }

  // end
  code = tsdbCommitDelEnd(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d, commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
  }
  return code;
}

H
Hongze Cheng 已提交
1058
/*
 * Finish a commit.
 *
 * When eno is non-zero it is taken over as the result; otherwise the
 * committer's file system state is prepared for commit. In both cases the
 * committer's file system copy and table data array are released.
 *
 * Returns eno when it is non-zero, otherwise the result of
 * tsdbFSPrepareCommit().
 */
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
  int32_t code = eno;
  int32_t lino = 0;
  STsdb  *pTsdb = pCommitter->pTsdb;

  // only prepare the file system when the commit itself succeeded
  if (code == 0) {
    code = tsdbFSPrepareCommit(pTsdb, &pCommitter->fs);
  }
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  // release committer-owned state on every path
  tsdbFSDestroy(&pCommitter->fs);
  taosArrayDestroy(pCommitter->aTbDataP);
  pCommitter->aTbDataP = NULL;

  if (code || eno) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode));
  }
  return code;
}
H
Hongze Cheng 已提交
1082

H
Hongze Cheng 已提交
1083
// ================================================================================
H
Hongze Cheng 已提交
1084

H
Hongze Cheng 已提交
1085 1086
// Return the row the committer's current iterator points at, or NULL when there is no current iterator.
static FORCE_INLINE SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) {
  SDataIter *pCurrent = pCommitter->pIter;
  if (pCurrent == NULL) {
    return NULL;
  }
  return &pCurrent->r;
}

H
Hongze Cheng 已提交
1089
/*
 * Advance the committer to the next row to commit.
 *
 * The current iterator (pCommitter->pIter) is moved one row forward:
 *  - memory iterator: steps inside the current table; when the table has no
 *    further row within maxKey it moves on to the following tables of
 *    aTbDataP. A row beyond maxKey is not returned, it only lowers nextKey.
 *  - stt iterator: steps inside the loaded block; at the end of the block the
 *    next stt block is read, and the iterator is dropped when none is left.
 *
 * Afterwards the current iterator is compared with the smallest iterator in
 * the red-black tree: if it is now larger it is put back into the tree, and
 * the smallest tree entry is taken out to become the current iterator.
 *
 * Returns 0 on success or the error code of tsdbReadSttBlockEx(). The failing
 * line is recorded through TSDB_CHECK_CODE so the error log does not report
 * "line 0".
 */
static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pCommitter->pIter) {
    SDataIter *pIter = pCommitter->pIter;
    if (pCommitter->pIter->type == MEMORY_DATA_ITER) {  // memory
      tsdbTbDataIterNext(&pIter->iter);
      TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter);
      while (true) {
        // rows past the current file's key range belong to a later file set
        if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
          pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
          pRow = NULL;
        }

        if (pRow) {
          pIter->r.suid = pIter->iter.pTbData->suid;
          pIter->r.uid = pIter->iter.pTbData->uid;
          pIter->r.row = *pRow;
          break;
        }

        // current table exhausted: try the next table of the memtable
        pIter->iTbDataP++;
        if (pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP)) {
          STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP);
          TSDBKEY  keyFrom = {.ts = pCommitter->minKey, .version = VERSION_MIN};
          tsdbTbDataIterOpen(pTbData, &keyFrom, 0, &pIter->iter);
          pRow = tsdbTbDataIterGet(&pIter->iter);
          continue;
        } else {
          pCommitter->pIter = NULL;
          break;
        }
      }
    } else if (pCommitter->pIter->type == STT_DATA_ITER) {  // last file
      pIter->iRow++;
      if (pIter->iRow < pIter->bData.nRow) {
        // still inside the loaded block; r.suid is left as set when the block was loaded
        pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
        pIter->r.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
      } else {
        pIter->iSttBlk++;
        if (pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk)) {
          SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);

          code = tsdbReadSttBlockEx(pCommitter->dReader.pReader, pIter->iStt, pSttBlk, &pIter->bData);
          TSDB_CHECK_CODE(code, lino, _exit);

          pIter->iRow = 0;
          pIter->r.suid = pIter->bData.suid;
          pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
          pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0);
        } else {
          pCommitter->pIter = NULL;
        }
      }
    } else {
      ASSERT(0);
    }

    // compare with min in RB Tree
    pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
    if (pCommitter->pIter && pIter) {
      int32_t c = tRowInfoCmprFn(&pCommitter->pIter->r, &pIter->r);
      if (c > 0) {
        // another iterator now holds the smaller row: park the current one in the tree
        tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
        pCommitter->pIter = NULL;
      } else {
        ASSERT(c);
      }
    }
  }

  // no current iterator: take the smallest one out of the tree (if any)
  if (pCommitter->pIter == NULL) {
    pCommitter->pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
    if (pCommitter->pIter) {
      tRBTreeDrop(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1176
/*
 * Commit the pending rows of the current table that sort before pDataBlk.
 *
 * Rows are taken from the committer one by one and appended to the writer's
 * block data until the next row belongs to another table or its key is not
 * lower than pDataBlk->minKey. The block data is written out whenever it
 * reaches maxRow rows, and once more after the loop.
 *
 * Returns 0 on success, otherwise the first failing call's error code.
 */
static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
  int32_t code = 0;
  int32_t lino = 0;

  SBlockData *pOut = &pCommitter->dWriter.bData;
  SRowInfo   *pCur = tsdbGetCommitRow(pCommitter);
  TABLEID     id = {.suid = pCur->suid, .uid = pCur->uid};

  tBlockDataClear(pOut);
  while (pCur != NULL) {
    ASSERT(pCur->row.type == 0);

    // make sure skmRow matches the schema version of this row
    code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pCur->row));
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tBlockDataAppendRow(pOut, &pCur->row, pCommitter->skmRow.pTSchema, id.uid);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbNextCommitRow(pCommitter);
    TSDB_CHECK_CODE(code, lino, _exit);

    // stop at a table change or once the existing block's key range is reached
    pCur = tsdbGetCommitRow(pCommitter);
    if (pCur != NULL) {
      if (pCur->suid == id.suid && pCur->uid == id.uid) {
        TSDBKEY rowKey = TSDBROW_KEY(&pCur->row);
        if (tsdbKeyCmprFn(&rowKey, &pDataBlk->minKey) >= 0) {
          pCur = NULL;
        }
      } else {
        pCur = NULL;
      }
    }

    // flush a full block
    if (pOut->nRow >= pCommitter->maxRow) {
      code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pOut, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // flush the remainder
  code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pOut, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1224
/*
 * Merge the pending rows of the current table with the existing block
 * pDataBlk.
 *
 * The block is read into the reader's block data and merged, in row order,
 * with the committer's pending rows into the writer's block data. Pending
 * rows are consumed until one belongs to another table or has a key greater
 * than pDataBlk->maxKey; the remaining rows of the existing block are then
 * copied. The output is written whenever it reaches maxRow rows and once more
 * at the end. Two rows comparing equal trip an assertion.
 *
 * Returns 0 on success, otherwise the first failing call's error code.
 */
static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
  int32_t code = 0;
  int32_t lino = 0;

  SRowInfo   *pNew = tsdbGetCommitRow(pCommitter);
  TABLEID     id = {.suid = pNew->suid, .uid = pNew->uid};
  SBlockData *pIn = &pCommitter->dReader.bData;
  SBlockData *pOut = &pCommitter->dWriter.bData;

  code = tsdbReadDataBlock(pCommitter->dReader.pReader, pDataBlk, pIn);
  TSDB_CHECK_CODE(code, lino, _exit);

  tBlockDataClear(pOut);

  // cursor over the rows of the existing block; pOld == NULL means exhausted
  int32_t  iOld = 0;
  TSDBROW  oldRow = tsdbRowFromBlockData(pIn, 0);
  TSDBROW *pOld = &oldRow;

  // phase 1: both sources still have rows
  while (pOld != NULL && pNew != NULL) {
    int32_t c = tsdbRowCmprFn(pOld, &pNew->row);

    if (c < 0) {
      // existing row goes first
      code = tBlockDataAppendRow(pOut, pOld, NULL, id.uid);
      TSDB_CHECK_CODE(code, lino, _exit);

      iOld++;
      if (iOld < pIn->nRow) {
        oldRow = tsdbRowFromBlockData(pIn, iOld);
      } else {
        pOld = NULL;
      }
    } else if (c > 0) {
      // pending row goes first
      ASSERT(pNew->row.type == 0);
      code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pNew->row));
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tBlockDataAppendRow(pOut, &pNew->row, pCommitter->skmRow.pTSchema, id.uid);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tsdbNextCommitRow(pCommitter);
      TSDB_CHECK_CODE(code, lino, _exit);

      // stop consuming pending rows at a table change or past this block's key range
      pNew = tsdbGetCommitRow(pCommitter);
      if (pNew != NULL) {
        if (pNew->suid == id.suid && pNew->uid == id.uid) {
          TSDBKEY rowKey = TSDBROW_KEY(&pNew->row);
          if (tsdbKeyCmprFn(&rowKey, &pDataBlk->maxKey) > 0) {
            pNew = NULL;
          }
        } else {
          pNew = NULL;
        }
      }
    } else {
      ASSERT(0 && "dup rows not allowed");
    }

    if (pOut->nRow >= pCommitter->maxRow) {
      code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pOut, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // phase 2: copy what is left of the existing block
  while (pOld != NULL) {
    code = tBlockDataAppendRow(pOut, pOld, NULL, id.uid);
    TSDB_CHECK_CODE(code, lino, _exit);

    iOld++;
    if (iOld < pIn->nRow) {
      oldRow = tsdbRowFromBlockData(pIn, iOld);
    } else {
      pOld = NULL;
    }

    if (pOut->nRow >= pCommitter->maxRow) {
      code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pOut, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // flush the remainder
  code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pOut, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1311
/*
 * Merge the pending rows of table `id` with that table's existing data
 * blocks.
 *
 * Nothing is done unless the reader's current block index refers to exactly
 * this table. Otherwise each existing block is compared (tDataBlkCmprFn) with
 * the key of the current pending row:
 *   - block lower:  the block descriptor is copied to the writer unchanged;
 *   - block higher: pending rows ahead of the block are committed;
 *   - otherwise:    the block is merged with the pending rows.
 * Remaining block descriptors are copied once the pending rows of this table
 * are used up, and the reader then moves to the next table.
 *
 * Returns 0 on success, otherwise the first failing call's error code.
 */
static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
  int32_t code = 0;
  int32_t lino = 0;

  SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx;

  ASSERT(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0);

  // the table has no existing data in the file being read: nothing to merge
  if (pBlockIdx == NULL || pBlockIdx->suid != id.suid || pBlockIdx->uid != id.uid) {
    return code;
  }

  int32_t   iBlk = 0;
  SDataBlk  blkBuf;
  SDataBlk *pBlk = &blkBuf;  // NULL once all existing blocks are consumed
  SRowInfo *pRow = tsdbGetCommitRow(pCommitter);

  ASSERT(pRow->suid == id.suid && pRow->uid == id.uid);

  tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlk, pBlk, tGetDataBlk);
  while (pBlk != NULL && pRow != NULL) {
    // a degenerate block covering only the pending row's key, used for comparison
    SDataBlk probe = {.minKey = TSDBROW_KEY(&pRow->row), .maxKey = TSDBROW_KEY(&pRow->row)};
    int32_t  c = tDataBlkCmprFn(pBlk, &probe);

    if (c < 0) {
      code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlk, tPutDataBlk);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else if (c > 0) {
      code = tsdbCommitAheadBlock(pCommitter, pBlk);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      code = tsdbCommitMergeBlock(pCommitter, pBlk);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // the existing block was consumed unless rows were committed ahead of it
    if (c <= 0) {
      iBlk++;
      if (iBlk < pCommitter->dReader.mBlock.nItem) {
        tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlk, pBlk, tGetDataBlk);
      } else {
        pBlk = NULL;
      }
    }

    // pending rows were consumed unless the block was copied unchanged
    if (c >= 0) {
      pRow = tsdbGetCommitRow(pCommitter);
      if (pRow != NULL && (pRow->suid != id.suid || pRow->uid != id.uid)) {
        pRow = NULL;
      }
    }
  }

  // copy the descriptors of the blocks that were not touched
  while (pBlk != NULL) {
    code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlk, tPutDataBlk);
    TSDB_CHECK_CODE(code, lino, _exit);

    iBlk++;
    if (iBlk < pCommitter->dReader.mBlock.nItem) {
      tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlk, pBlk, tGetDataBlk);
    } else {
      pBlk = NULL;
    }
  }

  code = tsdbCommitterNextTableData(pCommitter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1386
/*
 * Make sure the stt-side accumulation buffer is ready to take rows of table
 * `id`.
 *
 * If the buffer is bound to a table whose schema is not shared with `id`
 * (TABLE_SAME_SCHEMA), its content is written out as an stt block and the
 * buffer is reset. An unbound buffer is then initialised for `id` with the
 * committer's cached table schema. The buffer is a disk data builder or
 * bDatal, depending on USE_STREAM_COMPRESSION.
 *
 * Returns 0 on success, otherwise the first failing call's error code.
 */
static int32_t tsdbInitSttBlockBuilderIfNeed(SCommitter *pCommitter, TABLEID id) {
  int32_t code = 0;
  int32_t lino = 0;

#if USE_STREAM_COMPRESSION
  SDiskDataBuilder *pBuilder = pCommitter->dWriter.pBuilder;

  // bound to an incompatible table: flush first
  if ((pBuilder->suid || pBuilder->uid) && !TABLE_SAME_SCHEMA(pBuilder->suid, pBuilder->uid, id.suid, id.uid)) {
    code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pBuilder, pCommitter->dWriter.aSttBlk);
    TSDB_CHECK_CODE(code, lino, _exit);

    tDiskDataBuilderClear(pBuilder);
  }

  // unbound: bind to the requested table
  if (!(pBuilder->suid || pBuilder->uid)) {
    ASSERT(pCommitter->skmTable.suid == id.suid);
    ASSERT(pCommitter->skmTable.uid == id.uid);
    code = tDiskDataBuilderInit(pBuilder, pCommitter->skmTable.pTSchema, &id, pCommitter->cmprAlg, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
#else
  SBlockData *pSttData = &pCommitter->dWriter.bDatal;

  // bound to an incompatible table: flush first
  if ((pSttData->suid || pSttData->uid) && !TABLE_SAME_SCHEMA(pSttData->suid, pSttData->uid, id.suid, id.uid)) {
    code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pSttData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
    TSDB_CHECK_CODE(code, lino, _exit);

    tBlockDataReset(pSttData);
  }

  // unbound: bind to the requested table
  if (!(pSttData->suid || pSttData->uid)) {
    ASSERT(pCommitter->skmTable.suid == id.suid);
    ASSERT(pCommitter->skmTable.uid == id.uid);

    // when a suid is present the block is bound with uid 0
    TABLEID bindId = {.suid = id.suid, .uid = id.suid ? 0 : id.uid};
    code = tBlockDataInit(pSttData, &bindId, pCommitter->skmTable.pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
#endif

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

/*
 * Copy every row currently held in dWriter.bData into the stt-file block
 * buffer, row by row.
 *
 * The stt buffer is first prepared for the table that bData is bound to. Each
 * time the stt buffer reaches pCommitter->maxRow rows it is written out. This
 * function does not modify dWriter.bData itself.
 *
 * Returns 0 on success, otherwise the error code of the failing call (which is
 * also logged).
 */
static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;

  SBlockData *pSrcBlock = &pCommitter->dWriter.bData;
  TABLEID     tbid = {.suid = pSrcBlock->suid, .uid = pSrcBlock->uid};

  code = tsdbInitSttBlockBuilderIfNeed(pCommitter, tbid);
  TSDB_CHECK_CODE(code, lino, _exit);

  int32_t iRow = 0;
  while (iRow < pSrcBlock->nRow) {
    TSDBROW row = tsdbRowFromBlockData(pSrcBlock, iRow);

#if USE_STREAM_COMPRESSION
    code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &row, NULL, &tbid);
    TSDB_CHECK_CODE(code, lino, _exit);

    // Builder is full: write it out and prepare it again for the same table.
    if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) {
      code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tsdbInitSttBlockBuilderIfNeed(pCommitter, tbid);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
#else
    SBlockData *pSttBlock = &pCommitter->dWriter.bDatal;

    code = tBlockDataAppendRow(pSttBlock, &row, NULL, tbid.uid);
    TSDB_CHECK_CODE(code, lino, _exit);

    // Stt block is full: write it out.
    if (pSttBlock->nRow >= pCommitter->maxRow) {
      code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pSttBlock, pCommitter->dWriter.aSttBlk,
                               pCommitter->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
#endif

    iRow++;
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
1479
static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
H
Hongze Cheng 已提交
1480
  int32_t code = 0;
H
Hongze Cheng 已提交
1481
  int32_t lino = 0;
H
Hongze Cheng 已提交
1482

H
Hongze Cheng 已提交
1483
  SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
H
Hongze Cheng 已提交
1484 1485 1486 1487
  if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
    pRowInfo = NULL;
  }

H
Hongze Cheng 已提交
1488
  if (pRowInfo == NULL) goto _exit;
H
Hongze Cheng 已提交
1489

H
Hongze Cheng 已提交
1490
  if (pCommitter->toLastOnly) {
H
Hongze Cheng 已提交
1491 1492
    code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id);
    TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1493

H
Hongze Cheng 已提交
1494 1495 1496 1497 1498 1499 1500 1501
    while (pRowInfo) {
      STSchema *pTSchema = NULL;
      if (pRowInfo->row.type == 0) {
        code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
        TSDB_CHECK_CODE(code, lino, _exit);
        pTSchema = pCommitter->skmRow.pTSchema;
      }

H
compare  
Hongze Cheng 已提交
1502
#if USE_STREAM_COMPRESSION
H
Hongze Cheng 已提交
1503
      code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &pRowInfo->row, pTSchema, &id);
H
compare  
Hongze Cheng 已提交
1504 1505 1506
#else
      code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pRowInfo->row, pTSchema, id.uid);
#endif
H
Hongze Cheng 已提交
1507
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1508

H
Hongze Cheng 已提交
1509 1510
      code = tsdbNextCommitRow(pCommitter);
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1511

H
Hongze Cheng 已提交
1512 1513 1514 1515
      pRowInfo = tsdbGetCommitRow(pCommitter);
      if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
        pRowInfo = NULL;
      }
H
Hongze Cheng 已提交
1516

H
compare  
Hongze Cheng 已提交
1517
#if USE_STREAM_COMPRESSION
H
Hongze Cheng 已提交
1518
      if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) {
H
Hongze Cheng 已提交
1519
        code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk);
H
Hongze Cheng 已提交
1520
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1521 1522 1523

        code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id);
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1524
      }
H
compare  
Hongze Cheng 已提交
1525
#else
1526
      if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) {
H
compare  
Hongze Cheng 已提交
1527 1528 1529 1530 1531
        code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
                                 pCommitter->cmprAlg);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
#endif
H
Hongze Cheng 已提交
1532
    }
H
Hongze Cheng 已提交
1533 1534 1535
  } else {
    SBlockData *pBData = &pCommitter->dWriter.bData;
    ASSERT(pBData->nRow == 0);
H
Hongze Cheng 已提交
1536

H
Hongze Cheng 已提交
1537 1538 1539 1540
    while (pRowInfo) {
      STSchema *pTSchema = NULL;
      if (pRowInfo->row.type == 0) {
        code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
H
Hongze Cheng 已提交
1541
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556
        pTSchema = pCommitter->skmRow.pTSchema;
      }

      code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = tsdbNextCommitRow(pCommitter);
      TSDB_CHECK_CODE(code, lino, _exit);

      pRowInfo = tsdbGetCommitRow(pCommitter);
      if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
        pRowInfo = NULL;
      }

      if (pBData->nRow >= pCommitter->maxRow) {
H
Hongze Cheng 已提交
1557 1558
        code =
            tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
H
Hongze Cheng 已提交
1559
        TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
1560
      }
H
Hongze Cheng 已提交
1561 1562
    }

H
Hongze Cheng 已提交
1563 1564 1565 1566 1567 1568 1569 1570 1571
    if (pBData->nRow) {
      if (pBData->nRow > pCommitter->minRow) {
        code =
            tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
        TSDB_CHECK_CODE(code, lino, _exit);
      } else {
        code = tsdbAppendLastBlock(pCommitter);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
H
Hongze Cheng 已提交
1572 1573 1574
    }
  }

H
Hongze Cheng 已提交
1575
_exit:
H
Hongze Cheng 已提交
1576
  if (code) {
S
Shengliang Guan 已提交
1577
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
H
Hongze Cheng 已提交
1578
              tstrerror(code));
H
Hongze Cheng 已提交
1579
  }
H
Hongze Cheng 已提交
1580 1581 1582
  return code;
}

H
Hongze Cheng 已提交
1583 1584
/*
 * Commit the rows of every table that has pending commit rows.
 *
 * For each table encountered in the commit row stream, in order:
 *   1. tsdbMoveCommitData() is called with the table id
 *      (NOTE(review): presumably carries over existing data of tables that
 *      sort before this one -- confirm against its definition),
 *   2. the per-table block map, table schema and reader/writer block buffers
 *      are (re)initialized,
 *   3. the rows are merged with the table's data in the .data file and the
 *      remaining rows are committed,
 *   4. if any data block was produced, the block map is written and its
 *      SBlockIdx is appended to dWriter.aBlockIdx.
 * After the last table, tsdbMoveCommitData() is called once more with the
 * maximum possible id, and whatever is still buffered for the stt file is
 * written out.
 *
 * Returns 0 on success, otherwise the error code of the failing call (which is
 * also logged).
 */
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
  int32_t code = 0;
  int32_t lino = 0;
  TABLEID id = {0};

  for (SRowInfo *pHeadRow = tsdbGetCommitRow(pCommitter); pHeadRow != NULL;
       pHeadRow = tsdbGetCommitRow(pCommitter)) {
    // The previous table must have been fully consumed by the last iteration.
    ASSERT(pHeadRow->suid != id.suid || pHeadRow->uid != id.uid);
    id.suid = pHeadRow->suid;
    id.uid = pHeadRow->uid;

    code = tsdbMoveCommitData(pCommitter, id);
    TSDB_CHECK_CODE(code, lino, _exit);

    // start: fresh block map, schema and block buffers for this table
    tMapDataReset(&pCommitter->dWriter.mBlock);

    code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tBlockDataInit(&pCommitter->dReader.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tBlockDataInit(&pCommitter->dWriter.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);

    // merge with data in .data file
    code = tsdbMergeTableData(pCommitter, id);
    TSDB_CHECK_CODE(code, lino, _exit);

    // handle remaining table data
    code = tsdbCommitTableData(pCommitter, id);
    TSDB_CHECK_CODE(code, lino, _exit);

    // end: persist the block map and record its index entry, if any block was written
    if (pCommitter->dWriter.mBlock.nItem > 0) {
      SBlockIdx tableIdx = {.suid = id.suid, .uid = id.uid};

      code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &tableIdx);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &tableIdx) == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

  // No more commit rows: call tsdbMoveCommitData() with the largest possible id.
  id.suid = INT64_MAX;
  id.uid = INT64_MAX;
  code = tsdbMoveCommitData(pCommitter, id);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Write out whatever is still buffered for the stt file.
#if USE_STREAM_COMPRESSION
  code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk);
#else
  code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
                           pCommitter->cmprAlg);
#endif
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667

/*
 * Finish a commit: commit the file-system state and drop the immutable mem
 * table.
 *
 * tsdbFSCommit() and the clearing of pTsdb->imem happen while holding the
 * write lock on pTsdb->rwLock. The mem table pointer captured on entry is
 * unreferenced after the lock has been released, and only on success.
 *
 * Returns 0 on success, otherwise the error code from tsdbFSCommit() (in which
 * case pTsdb->imem is left untouched).
 */
int32_t tsdbFinishCommit(STsdb *pTsdb) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SMemTable *pCommitted = pTsdb->imem;

  // lock
  taosThreadRwlockWrlock(&pTsdb->rwLock);

  code = tsdbFSCommit(pTsdb);
  if (code == 0) {
    pTsdb->imem = NULL;
  }

  // unlock (on both the success and the failure path)
  taosThreadRwlockUnlock(&pTsdb->rwLock);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pCommitted != NULL) {
    tsdbUnrefMemTable(pCommitted);
  }

_exit:
  if (code != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d, tsdb finish commit", TD_VID(pTsdb->pVnode));
  }
  return code;
}

/*
 * Roll back a commit by delegating to tsdbFSRollback().
 *
 * Logs the outcome and returns 0 on success, otherwise the error code from
 * tsdbFSRollback().
 */
int32_t tsdbRollbackCommit(STsdb *pTsdb) {
  int32_t lino = 0;
  int32_t code = tsdbFSRollback(pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d, tsdb rollback commit", TD_VID(pTsdb->pVnode));
  }
  return code;
}