tsdbCommit.c 52.2 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
more  
Hongze Cheng 已提交
18 19
// NOTE(review): presumably the maximum number of sub-blocks allowed under one super block;
// it is not referenced in the visible part of this file - confirm against tsdbCanAddSubBlock().
#define TSDB_MAX_SUBBLOCKS 8

H
more  
Hongze Cheng 已提交
20
typedef struct {
L
Liu Jicong 已提交
21
  STable            *pTable;
H
more  
Hongze Cheng 已提交
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
  SSkipListIterator *pIter;
} SCommitIter;

typedef struct {
  SRtn         rtn;     // retention snapshot
  SFSIter      fsIter;  // tsdb file iterator
  int          niters;  // memory iterators
  SCommitIter *iters;
  bool         isRFileSet;  // read and commit FSET
  SReadH       readh;
  SDFileSet    wSet;
  bool         isDFileSame;
  bool         isLFileSame;
  TSKEY        minKey;
  TSKEY        maxKey;
L
Liu Jicong 已提交
37 38 39 40 41
  SArray      *aBlkIdx;  // SBlockIdx array
  STable      *pTable;
  SArray      *aSupBlk;  // Table super-block array
  SArray      *aSubBlk;  // table sub-block array
  SDataCols   *pDataCols;
H
more  
Hongze Cheng 已提交
42 43
} SCommitH;

H
more  
Hongze Cheng 已提交
44 45
// Default number of rows per block: four fifths of the configured maximum.
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)

// Accessors on a commit handle `ch` (SCommitH *). The argument is parenthesized in every
// macro so that any pointer-valued expression (e.g. `base + i`) expands correctly.
#define TSDB_COMMIT_REPO(ch)         TSDB_READ_REPO(&((ch)->readh))
#define TSDB_COMMIT_REPO_ID(ch)      REPO_ID(TSDB_READ_REPO(&((ch)->readh)))
#define TSDB_COMMIT_WRITE_FSET(ch)   (&((ch)->wSet))
#define TSDB_COMMIT_TABLE(ch)        ((ch)->pTable)
#define TSDB_COMMIT_HEAD_FILE(ch)    TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD)
#define TSDB_COMMIT_DATA_FILE(ch)    TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA)
#define TSDB_COMMIT_LAST_FILE(ch)    TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
#define TSDB_COMMIT_SMAD_FILE(ch)    TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAD)
#define TSDB_COMMIT_SMAL_FILE(ch)    TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAL)
#define TSDB_COMMIT_BUF(ch)          TSDB_READ_BUF(&((ch)->readh))
#define TSDB_COMMIT_COMP_BUF(ch)     TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_EXBUF(ch)        TSDB_READ_EXBUF(&((ch)->readh))
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->pVnode->config.tsdbCfg.maxRows)
#define TSDB_COMMIT_TXN_VERSION(ch)  FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
H
refact  
Hongze Cheng 已提交
60

H
more  
Hongze Cheng 已提交
61 62
static void tsdbStartCommit(STsdb *pRepo);
static void tsdbEndCommit(STsdb *pTsdb, int eno);
H
refact  
Hongze Cheng 已提交
63 64 65 66 67
static int  tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo);
static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key);
static int  tsdbNextCommitFid(SCommitH *pCommith);
static void tsdbDestroyCommitH(SCommitH *pCommith);
static int  tsdbCreateCommitIters(SCommitH *pCommith);
H
Hongze Cheng 已提交
68
static void tsdbDestroyCommitIters(SCommitH *pCommith);
H
refact  
Hongze Cheng 已提交
69
static int  tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
H
more  
Hongze Cheng 已提交
70 71
static void tsdbResetCommitFile(SCommitH *pCommith);
static int  tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
H
more  
Hongze Cheng 已提交
72
static int  tsdbCommitToTable(SCommitH *pCommith, int tid);
73
static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx);
C
Cary Xu 已提交
74
static int  tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx);
H
more  
Hongze Cheng 已提交
75 76 77 78
static int  tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
static int  tsdbComparKeyBlock(const void *arg1, const void *arg2);
static int  tsdbWriteBlockInfo(SCommitH *pCommih);
static int  tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
H
more  
Hongze Cheng 已提交
79
static int  tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx);
H
more  
Hongze Cheng 已提交
80 81 82 83 84 85
static int  tsdbMoveBlock(SCommitH *pCommith, int bidx);
static int  tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks);
static int  tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
                               bool isLastOneBlock);
static void tsdbResetCommitTable(SCommitH *pCommith);
static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
H
more  
Hongze Cheng 已提交
86
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
H
Hongze Cheng 已提交
87 88
static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter,
                                      SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update);
H
more  
Hongze Cheng 已提交
89
static int  tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
H
Hongze Cheng 已提交
90
static int  tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn);
H
refact  
Hongze Cheng 已提交
91

H
Hongze Cheng 已提交
92 93 94 95 96 97 98 99 100 101 102 103
/**
 * Create the memtable (pTsdb->mem) of a TSDB instance.
 *
 * @param pTsdb TSDB instance; NULL is accepted and treated as a no-op.
 * @return 0 on success or when pTsdb is NULL, -1 when tsdbMemTableCreate() fails.
 */
int tsdbBegin(STsdb *pTsdb) {
  if (pTsdb == NULL) return 0;

  if (tsdbMemTableCreate(pTsdb, &pTsdb->mem) < 0) {
    return -1;
  }

  return 0;
}

H
more  
Hongze Cheng 已提交
104 105
/**
 * Commit the memtable of a TSDB instance to data file sets (FSETs).
 *
 * The writable memtable (mem) is moved to imem, then the loop walks the existing FSETs
 * and the fids that have memory data, lowest fid first: an FSET without memory data only
 * gets the retention check, a fid with memory data is committed with tsdbCommitToFile().
 *
 * NOTE(review): with the "#if 0" section below disabled nothing assigns pSet before the
 * loop, so it stays NULL and every fid is committed as a new FSET - confirm this is the
 * intended interim behavior.
 * NOTE(review): the failure returns neither call tsdbEndCommit() nor reset pTsdb->imem.
 *
 * @param pTsdb TSDB instance; imem must be NULL and mem must be set on entry.
 * @return 0 on success, -1 on failure.
 */
int32_t tsdbCommit(STsdb *pTsdb) {
  SCommitH   commith = {0};
  SDFileSet *pSet = NULL;
  int        fid;

  // The writable memtable becomes the immutable one that is committed
  ASSERT(pTsdb->imem == NULL && pTsdb->mem);
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;

  // start commit
  tsdbStartCommit(pTsdb);
  if (tsdbInitCommitH(&commith, pTsdb) < 0) {
    return -1;
  }

#if 0
  // Skip expired memory data and expired FSET
  tsdbSeekCommitIter(&commith, commith.rtn.minKey);
  while ((pSet = tsdbFSIterNext(&(commith.fsIter)))) {
    if (pSet->fid < commith.rtn.minFid) {
      tsdbInfo("vgId:%d, FSET %d on level %d disk id %d expires, remove it", REPO_ID(pTsdb), pSet->fid,
               TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
    } else {
      break;
    }
  }
#endif

  // commit: loop over both on-disk FSETs and memory data until both are exhausted
  fid = tsdbNextCommitFid(&commith);
  while (pSet != NULL || fid != TSDB_IVLD_FID) {
    if (pSet != NULL && (fid == TSDB_IVLD_FID || pSet->fid < fid)) {
      // Existing FSET with no memory data to commit into it: only check its retention level
      if (tsdbApplyRtnOnFSet(pTsdb, pSet, &(commith.rtn)) < 0) goto _err;

      pSet = tsdbFSIterNext(&(commith.fsIter));
      continue;
    }

    // There is memory data for `fid`: by default commit it to a new FSET ...
    SDFileSet *pCSet = NULL;
    int        cfid = fid;

    if (pSet != NULL && pSet->fid <= fid) {
      // ... unless an existing FSET covers the fid, then commit into that one
      pCSet = pSet;
      cfid = pSet->fid;
      pSet = tsdbFSIterNext(&(commith.fsIter));
    }

    if (tsdbCommitToFile(&commith, pCSet, cfid) < 0) goto _err;

    fid = tsdbNextCommitFid(&commith);
  }

  // end commit
  tsdbDestroyCommitH(&commith);
  tsdbEndCommit(pTsdb, TSDB_CODE_SUCCESS);

  return 0;

_err:
  tsdbDestroyCommitH(&commith);
  return -1;
}

H
Hongze Cheng 已提交
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
/**
 * Apply the retention snapshot to an existing FSET that receives no new data.
 *
 * A disk is allocated on the level computed for the FSET's fid. If that disk is on a
 * higher level than the FSET, the FSET is copied there and the copy is registered with
 * the file system; otherwise the FSET is registered unchanged.
 *
 * @param pRepo TSDB instance.
 * @param pSet  existing FSET; its fid must not be below pRtn->minFid.
 * @param pRtn  retention snapshot.
 * @return 0 on success, -1 on failure.
 */
static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
  STsdbFS *pfs = REPO_FS(pRepo);
  SDiskID  did;

  ASSERT(pSet->fid >= pRtn->minFid);

  int level = tsdbGetFidLevel(pSet->fid, pRtn);
  if (tfsAllocDisk(pRepo->pVnode->pTfs, level, &did) < 0) {
    terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
    return -1;
  }

  if (did.level <= TSDB_FSET_LEVEL(pSet)) {
    // On a correct level
    return (tsdbUpdateDFileSet(pfs, pSet) < 0) ? -1 : 0;
  }

  // Need to move the FSET to higher level
  SDFileSet nSet = {0};
  tsdbInitDFileSet(pRepo, &nSet, did, pSet->fid, FS_TXN_VERSION(pfs));

  if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
    tsdbError("vgId:%d, failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
              TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno));
    return -1;
  }

  if (tsdbUpdateDFileSet(pfs, &nSet) < 0) {
    return -1;
  }

  tsdbInfo("vgId:%d, FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid,
           TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id);
  return 0;
}

H
refact  
Hongze Cheng 已提交
221 222
// int tsdbPrepareCommit(STsdb *pTsdb) {
//   if (pTsdb->mem == NULL) return 0;
H
Hongze Cheng 已提交
223

H
refact  
Hongze Cheng 已提交
224
//   ASSERT(pTsdb->imem == NULL);
H
Hongze Cheng 已提交
225

H
refact  
Hongze Cheng 已提交
226 227 228 229
//   pTsdb->imem = pTsdb->mem;
//   pTsdb->mem = NULL;
//   return 0;
// }
H
Hongze Cheng 已提交
230

H
Hongze Cheng 已提交
231
/**
 * Take a retention snapshot relative to the current time.
 *
 * Three key boundaries are derived from the keep2 / keep1 / keep0 settings (scaled by
 * tsTickPerMin for the configured precision) and converted to file ids.
 *
 * @param pRepo TSDB instance providing the keep configuration.
 * @param pRtn  output: minKey, minFid, midFid and maxFid are written.
 */
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
  STsdbKeepCfg *pKeep = REPO_KEEP_CFG(pRepo);
  TSKEY         now = taosGetTimestamp(pKeep->precision);

  TSKEY minKey = now - pKeep->keep2 * tsTickPerMin[pKeep->precision];
  TSKEY midKey = now - pKeep->keep1 * tsTickPerMin[pKeep->precision];
  TSKEY maxKey = now - pKeep->keep0 * tsTickPerMin[pKeep->precision];

  pRtn->minKey = minKey;
  pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pKeep->days, pKeep->precision));
  pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pKeep->days, pKeep->precision));
  pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pKeep->days, pKeep->precision));

  tsdbDebug("vgId:%d, now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey,
            pRtn->minFid, pRtn->midFid, pRtn->maxFid);
}

H
more  
Hongze Cheng 已提交
248 249 250
// Log the start of a commit and open a file-system transaction on the repo.
static void tsdbStartCommit(STsdb *pRepo) {
  tsdbInfo("vgId:%d, start to commit", REPO_ID(pRepo));

  tsdbStartFSTxn(pRepo, 0, 0);
}

// Finish a commit: end the file-system transaction, destroy the immutable memtable and
// log the outcome. `eno` only selects the word printed in the log line.
static void tsdbEndCommit(STsdb *pTsdb, int eno) {
  const char *outcome = (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed";

  tsdbEndFSTxn(pTsdb);

  tsdbMemTableDestroy(pTsdb->imem);
  pTsdb->imem = NULL;

  tsdbInfo("vgId:%d, commit over, %s", REPO_ID(pTsdb), outcome);
}

H
refact  
Hongze Cheng 已提交
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
/**
 * Initialize a commit handle for one commit run.
 *
 * The handle is zeroed, the retention snapshot is taken, the write FSET is marked closed,
 * and the read handle, FSET iterator, per-table memory iterators, block arrays and row
 * buffer are created. If any step after the read handle fails, the handle is torn down
 * with tsdbDestroyCommitH() before returning.
 *
 * @param pCommith handle to initialize.
 * @param pRepo    TSDB instance being committed.
 * @return 0 on success, -1 on failure.
 */
static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo) {
  STsdbCfg *pCfg = REPO_CFG(pRepo);

  memset(pCommith, 0, sizeof(*pCommith));
  tsdbGetRtnSnap(pRepo, &(pCommith->rtn));

  TSDB_FSET_SET_CLOSED(TSDB_COMMIT_WRITE_FSET(pCommith));

  // Init read handle; nothing else has been set up yet, so no teardown is done on failure
  if (tsdbInitReadH(&(pCommith->readh), pRepo) < 0) {
    return -1;
  }

  // Init file iterator
  tsdbFSIterInit(&(pCommith->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);

  // terrno is left as set by the callee on this path
  if (tsdbCreateCommitIters(pCommith) < 0) goto _fail;

  pCommith->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
  if (pCommith->aBlkIdx == NULL) goto _oom;

  pCommith->aSupBlk = taosArrayInit(1024, sizeof(SBlock));
  if (pCommith->aSupBlk == NULL) goto _oom;

  pCommith->aSubBlk = taosArrayInit(1024, sizeof(SBlock));
  if (pCommith->aSubBlk == NULL) goto _oom;

  pCommith->pDataCols = tdNewDataCols(0, pCfg->maxRows);
  if (pCommith->pDataCols == NULL) goto _oom;

  return 0;

_oom:
  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
_fail:
  tsdbDestroyCommitH(pCommith);
  return -1;
}

// Advance every table iterator past all keys smaller than `key` (key itself is kept).
// Entries without a table or without an iterator are left untouched.
static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
  for (int idx = 0; idx < pCommith->niters; ++idx) {
    SCommitIter *pCur = &pCommith->iters[idx];

    if (pCur->pTable != NULL && pCur->pIter != NULL) {
      tsdbLoadDataFromCache(TSDB_COMMIT_REPO(pCommith), pCur->pTable, pCur->pIter, key - 1, INT32_MAX, NULL, NULL, 0,
                            true, NULL);
    }
  }
}

H
more  
Hongze Cheng 已提交
326
/**
 * Find the smallest file id that still has memory data waiting to be committed.
 *
 * @param pCommith commit handle.
 * @return the lowest fid over the next keys of all table iterators, or TSDB_IVLD_FID
 *         when no iterator reports a next key.
 */
static int tsdbNextCommitFid(SCommitH *pCommith) {
  STsdb        *pRepo = TSDB_COMMIT_REPO(pCommith);
  STsdbKeepCfg *pKeepCfg = REPO_KEEP_CFG(pRepo);
  int           minFid = TSDB_IVLD_FID;

  for (int idx = 0; idx < pCommith->niters; ++idx) {
    // Entries with a NULL table/iterator are deliberately not filtered here; the iterator
    // is handed to tsdbNextIterKey() as is.
    TSKEY nextKey = tsdbNextIterKey(pCommith->iters[idx].pIter);
    if (nextKey == TSDB_DATA_TIMESTAMP_NULL) continue;

    int candidate = (int)(TSDB_KEY_FID(nextKey, pKeepCfg->days, pKeepCfg->precision));
    if (minFid == TSDB_IVLD_FID || candidate < minFid) {
      minFid = candidate;
    }
  }

  return minFid;
}
H
refact  
Hongze Cheng 已提交
348 349 350 351 352 353 354 355 356 357 358

// Release everything a commit handle holds: the row buffer, the three block arrays, the
// per-table memory iterators, the read handle and finally the write FSET (closed).
// The value returned by each free/destroy call is stored back into the corresponding field.
static void tsdbDestroyCommitH(SCommitH *pCommith) {
  pCommith->pDataCols = tdFreeDataCols(pCommith->pDataCols);
  pCommith->aSubBlk = taosArrayDestroy(pCommith->aSubBlk);
  pCommith->aSupBlk = taosArrayDestroy(pCommith->aSupBlk);
  pCommith->aBlkIdx = taosArrayDestroy(pCommith->aBlkIdx);
  tsdbDestroyCommitIters(pCommith);
  tsdbDestroyReadH(&(pCommith->readh));
  tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
}

H
more  
Hongze Cheng 已提交
359
/**
 * Commit all memory data that belongs to one file id into its FSET.
 *
 * The in-memory table iterators and the block index of the existing FSET are walked
 * together, compared by table uid: tables with memory data are committed with
 * tsdbCommitToTable(), block-index entries that are only on disk are carried over with
 * tsdbMoveBlkIdx(), and iterator slots whose table is NULL are skipped. Afterwards the
 * block index and the FSET header are written and the FSET is registered with the file
 * system.
 *
 * @param pCommith commit handle.
 * @param pSet     existing FSET for this fid, or NULL to create a new one.
 * @param fid      file id to commit.
 * @return 0 on success, -1 on failure. On failures between opening the commit files and
 *         closing them, the commit files are closed with the error flag and the file
 *         change is reverted.
 */
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
  STsdb        *pRepo = TSDB_COMMIT_REPO(pCommith);
  STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);

  ASSERT(pSet == NULL || pSet->fid == fid);

  tsdbResetCommitFile(pCommith);
  tsdbGetFidKeyRange(pCfg->days, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey));

  // Set and open files
  if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) {
    return -1;
  }
#if 0
  // Loop to commit each table data
  for (int tid = 0; tid < pCommith->niters; tid++) {
    SCommitIter *pIter = pCommith->iters + tid;

    if (pIter->pTable == NULL) continue;

    if (tsdbCommitToTable(pCommith, tid) < 0) {
      tsdbCloseCommitFile(pCommith, true);
      // revert the file change
      tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
      return -1;
    }
  }
#endif
  // Loop to commit each table data in mem and file
  int mIter = 0, fIter = 0;
  int nBlkIdx = taosArrayGetSize(pCommith->readh.aBlkIdx);

  while (mIter < pCommith->niters || fIter < nBlkIdx) {
    SCommitIter *pIter = (mIter < pCommith->niters) ? (pCommith->iters + mIter) : NULL;
    SBlockIdx   *pIdx = (fIter < nBlkIdx) ? (SBlockIdx *)taosArrayGet(pCommith->readh.aBlkIdx, fIter) : NULL;

    if (pIter != NULL && pIter->pTable == NULL) {
      // When table already dropped during commit, pIter is not NULL but pIter->pTable is NULL.
      ++mIter;  // skip the table and do nothing
      continue;
    }

    if (pIter != NULL && (pIdx == NULL || pIter->pTable->uid <= pIdx->uid)) {
      // Memory side is not behind the file side: commit this table
      if (tsdbCommitToTable(pCommith, mIter) < 0) goto _err;

      if (pIdx != NULL && pIter->pTable->uid == pIdx->uid) {
        ++fIter;  // the same table also has a block index entry on disk: consume it too
      }
      ++mIter;
    } else if (pIdx != NULL) {
      // Table only present in the existing FSET: carry its block index over
      if (tsdbMoveBlkIdx(pCommith, pIdx) < 0) goto _err;
      ++fIter;
    }
  }

  if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) <
      0) {
    tsdbError("vgId:%d, failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
    goto _err;
  }

  if (tsdbUpdateDFileSetHeader(&(pCommith->wSet)) < 0) {
    tsdbError("vgId:%d, failed to update FSET %d header since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
    goto _err;
  }

  // Close commit file
  tsdbCloseCommitFile(pCommith, false);

  if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) {
    return -1;
  }

  return 0;

_err:
  tsdbCloseCommitFile(pCommith, true);
  // revert the file change
  tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
  return -1;
}
H
refact  
Hongze Cheng 已提交
457

H
Hongze Cheng 已提交
458
/**
 * Build one SCommitIter per table found in the immutable memtable's skiplist index.
 *
 * For every table whose schema can be fetched from the meta store, a row iterator
 * (positioned on its first element) and an STable descriptor are created. Tables without
 * a schema keep a zeroed entry (pTable == NULL, pIter == NULL).
 *
 * On failure the already created entries stay attached to pCommith->iters; the caller is
 * expected to release them (tsdbInitCommitH() does so through tsdbDestroyCommitH()).
 *
 * @param pCommith commit handle; niters and iters are written.
 * @return 0 on success, -1 with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY on allocation failure.
 */
static int tsdbCreateCommitIters(SCommitH *pCommith) {
  STsdb             *pRepo = TSDB_COMMIT_REPO(pCommith);
  STsdbMemTable     *pMem = pRepo->imem;
  SSkipListIterator *pSlIter;

  pCommith->niters = SL_SIZE(pMem->pSlIdx);
  pCommith->iters = (SCommitIter *)taosMemoryCalloc(pCommith->niters, sizeof(SCommitIter));
  if (pCommith->iters == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  // Loop to create iters for each skiplist
  pSlIter = tSkipListCreateIter(pMem->pSlIdx);
  if (pSlIter == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  for (int i = 0; i < pCommith->niters; i++) {
    tSkipListIterNext(pSlIter);
    SSkipListNode *pNode = tSkipListIterGet(pSlIter);
    STbData       *pTbData = (STbData *)pNode->pData;
    SCommitIter   *pCommitIter = pCommith->iters + i;

    STSchema *pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, -1);
    if (pTSchema == NULL) continue;  // no schema: leave the zeroed entry in place

    STable *pTable = (STable *)taosMemoryMalloc(sizeof(STable));
    if (pTable == NULL) {
      // Not attached to any entry yet, so the schema must be released here
      tdFreeSchema(pTSchema);
      tSkipListDestroyIter(pSlIter);
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return -1;
    }
    pTable->uid = pTbData->uid;
    pTable->tid = pTbData->uid;
    pTable->pSchema = pTSchema;
    pTable->pCacheSchema = NULL;
    pCommitIter->pTable = pTable;  // from here on the entry owns the table and its schema

    pCommitIter->pIter = tSkipListCreateIter(pTbData->pData);
    if (pCommitIter->pIter == NULL) {
      tSkipListDestroyIter(pSlIter);
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return -1;
    }
    tSkipListIterNext(pCommitIter->pIter);
  }
  tSkipListDestroyIter(pSlIter);

  return 0;
}

H
Hongze Cheng 已提交
504 505
/**
 * Release all per-table commit iterators of a commit handle.
 *
 * Every entry, starting at index 0 (tsdbCreateCommitIters() fills the array from index 0),
 * has its row iterator destroyed and, when a table descriptor is attached, both schemas
 * and the descriptor freed. The array itself is then freed and the handle reset to
 * "no iterators". Calling it with iters == NULL is a no-op.
 *
 * @param pCommith commit handle.
 */
static void tsdbDestroyCommitIters(SCommitH *pCommith) {
  if (pCommith->iters == NULL) return;

  for (int i = 0; i < pCommith->niters; i++) {
    tSkipListDestroyIter(pCommith->iters[i].pIter);
    if (pCommith->iters[i].pTable) {
      tdFreeSchema(pCommith->iters[i].pTable->pSchema);
      tdFreeSchema(pCommith->iters[i].pTable->pCacheSchema);
      taosMemoryFreeClear(pCommith->iters[i].pTable);
    }
  }

  taosMemoryFree(pCommith->iters);
  pCommith->iters = NULL;
  pCommith->niters = 0;
}

H
more  
Hongze Cheng 已提交
521 522 523 524 525 526
// Reset the per-FSET state of the commit handle before a new file id is processed:
// clear the three file flags and empty the collected block-index array.
static void tsdbResetCommitFile(SCommitH *pCommith) {
  pCommith->isRFileSet = false;
  pCommith->isDFileSame = false;
  pCommith->isLFileSame = false;
  taosArrayClear(pCommith->aBlkIdx);
}
H
Hongze Cheng 已提交
527

H
more  
Hongze Cheng 已提交
528 529
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
  SDiskID    did;
L
Liu Jicong 已提交
530
  STsdb     *pRepo = TSDB_COMMIT_REPO(pCommith);
H
more  
Hongze Cheng 已提交
531
  SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
H
Hongze Cheng 已提交
532

H
Hongze Cheng 已提交
533
  if (tfsAllocDisk(REPO_TFS(pRepo), tsdbGetFidLevel(fid, &(pCommith->rtn)), &did) < 0) {
H
more  
Hongze Cheng 已提交
534 535 536
    terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
    return -1;
  }
H
Hongze Cheng 已提交
537

H
more  
Hongze Cheng 已提交
538 539 540 541 542
  // Open read FSET
  if (pSet) {
    if (tsdbSetAndOpenReadFSet(&(pCommith->readh), pSet) < 0) {
      return -1;
    }
H
Hongze Cheng 已提交
543

H
more  
Hongze Cheng 已提交
544
    pCommith->isRFileSet = true;
H
Hongze Cheng 已提交
545

H
more  
Hongze Cheng 已提交
546 547 548 549
    if (tsdbLoadBlockIdx(&(pCommith->readh)) < 0) {
      tsdbCloseAndUnsetFSet(&(pCommith->readh));
      return -1;
    }
H
Hongze Cheng 已提交
550

H
Hongze Cheng 已提交
551 552
    tsdbDebug("vgId:%d, FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo),
              TSDB_FSET_FID(pSet), TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
H
more  
Hongze Cheng 已提交
553 554 555
  } else {
    pCommith->isRFileSet = false;
  }
H
Hongze Cheng 已提交
556

H
more  
Hongze Cheng 已提交
557 558 559
  // Set and open commit FSET
  if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) {
    // Create a new FSET to write data
S
Shengliang Guan 已提交
560
    tsdbInitDFileSet(pRepo, pWSet, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)));
H
Hongze Cheng 已提交
561

S
Shengliang Guan 已提交
562
    if (tsdbCreateDFileSet(pRepo, pWSet, true) < 0) {
S
Shengliang Guan 已提交
563
      tsdbError("vgId:%d, failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo),
H
more  
Hongze Cheng 已提交
564 565 566 567 568 569
                TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet), tstrerror(terrno));
      if (pCommith->isRFileSet) {
        tsdbCloseAndUnsetFSet(&(pCommith->readh));
      }
      return -1;
    }
H
Hongze Cheng 已提交
570

H
more  
Hongze Cheng 已提交
571 572
    pCommith->isDFileSame = false;
    pCommith->isLFileSame = false;
H
Hongze Cheng 已提交
573

S
Shengliang Guan 已提交
574
    tsdbDebug("vgId:%d, FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet),
H
more  
Hongze Cheng 已提交
575 576 577 578
              TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet));
  } else {
    did.level = TSDB_FSET_LEVEL(pSet);
    did.id = TSDB_FSET_ID(pSet);
H
Hongze Cheng 已提交
579

H
more  
Hongze Cheng 已提交
580 581
    pCommith->wSet.fid = fid;
    pCommith->wSet.state = 0;
H
Hongze Cheng 已提交
582

H
more  
Hongze Cheng 已提交
583 584
    // TSDB_FILE_HEAD
    SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
S
Shengliang Guan 已提交
585
    tsdbInitDFile(pRepo, pWHeadf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
586
    if (tsdbCreateDFile(pRepo, pWHeadf, true, TSDB_FILE_HEAD) < 0) {
S
Shengliang Guan 已提交
587
      tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf),
H
more  
Hongze Cheng 已提交
588
                tstrerror(terrno));
H
Hongze Cheng 已提交
589

H
more  
Hongze Cheng 已提交
590 591 592
      if (pCommith->isRFileSet) {
        tsdbCloseAndUnsetFSet(&(pCommith->readh));
        return -1;
H
Hongze Cheng 已提交
593 594 595
      }
    }

H
more  
Hongze Cheng 已提交
596 597 598 599
    // TSDB_FILE_DATA
    SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh));
    SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith);
    tsdbInitDFileEx(pWDataf, pRDataf);
600 601
    // if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) {
    if (tsdbOpenDFile(pWDataf, TD_FILE_WRITE) < 0) {
S
Shengliang Guan 已提交
602
      tsdbError("vgId:%d, failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWDataf),
H
more  
Hongze Cheng 已提交
603
                tstrerror(terrno));
H
Hongze Cheng 已提交
604

H
more  
Hongze Cheng 已提交
605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620
      tsdbCloseDFileSet(pWSet);
      tsdbRemoveDFile(pWHeadf);
      if (pCommith->isRFileSet) {
        tsdbCloseAndUnsetFSet(&(pCommith->readh));
        return -1;
      }
    }
    pCommith->isDFileSame = true;

    // TSDB_FILE_LAST
    SDFile *pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh));
    SDFile *pWLastf = TSDB_COMMIT_LAST_FILE(pCommith);
    if (pRLastf->info.size < 32 * 1024) {
      tsdbInitDFileEx(pWLastf, pRLastf);
      pCommith->isLFileSame = true;

621 622
      // if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) {
      if (tsdbOpenDFile(pWLastf, TD_FILE_WRITE) < 0) {
S
Shengliang Guan 已提交
623
        tsdbError("vgId:%d, failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
H
more  
Hongze Cheng 已提交
624
                  tstrerror(terrno));
H
Hongze Cheng 已提交
625

H
more  
Hongze Cheng 已提交
626 627 628 629 630 631 632 633
        tsdbCloseDFileSet(pWSet);
        tsdbRemoveDFile(pWHeadf);
        if (pCommith->isRFileSet) {
          tsdbCloseAndUnsetFSet(&(pCommith->readh));
          return -1;
        }
      }
    } else {
S
Shengliang Guan 已提交
634
      tsdbInitDFile(pRepo, pWLastf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
H
more  
Hongze Cheng 已提交
635
      pCommith->isLFileSame = false;
H
Hongze Cheng 已提交
636

637
      if (tsdbCreateDFile(pRepo, pWLastf, true, TSDB_FILE_LAST) < 0) {
S
Shengliang Guan 已提交
638
        tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
H
more  
Hongze Cheng 已提交
639
                  tstrerror(terrno));
H
Hongze Cheng 已提交
640

H
more  
Hongze Cheng 已提交
641 642 643 644 645 646 647 648
        tsdbCloseDFileSet(pWSet);
        (void)tsdbRemoveDFile(pWHeadf);
        if (pCommith->isRFileSet) {
          tsdbCloseAndUnsetFSet(&(pCommith->readh));
          return -1;
        }
      }
    }
649 650 651 652 653

    // TSDB_FILE_SMAD
    SDFile *pRSmadF = TSDB_READ_SMAD_FILE(&(pCommith->readh));
    SDFile *pWSmadF = TSDB_COMMIT_SMAD_FILE(pCommith);

654
    if (!taosCheckExistFile(TSDB_FILE_FULL_NAME(pRSmadF))) {
S
Shengliang Guan 已提交
655
      tsdbDebug("vgId:%d, create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmadF));
656 657 658
      tsdbInitDFile(pRepo, pWSmadF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAD);

      if (tsdbCreateDFile(pRepo, pWSmadF, true, TSDB_FILE_SMAD) < 0) {
S
Shengliang Guan 已提交
659
        tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF),
660 661 662 663 664 665 666 667 668 669 670 671
                  tstrerror(terrno));

        tsdbCloseDFileSet(pWSet);
        (void)tsdbRemoveDFile(pWHeadf);
        if (pCommith->isRFileSet) {
          tsdbCloseAndUnsetFSet(&(pCommith->readh));
          return -1;
        }
      }
    } else {
      tsdbInitDFileEx(pWSmadF, pRSmadF);
      if (tsdbOpenDFile(pWSmadF, O_RDWR) < 0) {
S
Shengliang Guan 已提交
672
        tsdbError("vgId:%d, failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF),
673 674 675 676 677 678 679 680 681 682 683 684 685 686 687
                  tstrerror(terrno));

        tsdbCloseDFileSet(pWSet);
        tsdbRemoveDFile(pWHeadf);
        if (pCommith->isRFileSet) {
          tsdbCloseAndUnsetFSet(&(pCommith->readh));
          return -1;
        }
      }
    }

    // TSDB_FILE_SMAL
    SDFile *pRSmalF = TSDB_READ_SMAL_FILE(&(pCommith->readh));
    SDFile *pWSmalF = TSDB_COMMIT_SMAL_FILE(pCommith);

688
    if ((pCommith->isLFileSame) && taosCheckExistFile(TSDB_FILE_FULL_NAME(pRSmalF))) {
689 690
      tsdbInitDFileEx(pWSmalF, pRSmalF);
      if (tsdbOpenDFile(pWSmalF, O_RDWR) < 0) {
S
Shengliang Guan 已提交
691
        tsdbError("vgId:%d, failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF),
692 693 694 695 696 697 698 699 700 701
                  tstrerror(terrno));

        tsdbCloseDFileSet(pWSet);
        tsdbRemoveDFile(pWHeadf);
        if (pCommith->isRFileSet) {
          tsdbCloseAndUnsetFSet(&(pCommith->readh));
          return -1;
        }
      }
    } else {
S
Shengliang Guan 已提交
702
      tsdbDebug("vgId:%d, create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmalF));
703 704 705
      tsdbInitDFile(pRepo, pWSmalF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAL);

      if (tsdbCreateDFile(pRepo, pWSmalF, true, TSDB_FILE_SMAL) < 0) {
S
Shengliang Guan 已提交
706
        tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF),
707 708 709 710 711 712 713 714 715 716
                  tstrerror(terrno));

        tsdbCloseDFileSet(pWSet);
        (void)tsdbRemoveDFile(pWHeadf);
        if (pCommith->isRFileSet) {
          tsdbCloseAndUnsetFSet(&(pCommith->readh));
          return -1;
        }
      }
    }
H
more  
Hongze Cheng 已提交
717
  }
H
Hongze Cheng 已提交
718 719 720 721

  return 0;
}

H
more  
Hongze Cheng 已提交
722
// extern int32_t tsTsdbMetaCompactRatio;
H
Hongze Cheng 已提交
723

H
more  
Hongze Cheng 已提交
724 725
/**
 * @brief Serialize one table's block descriptors (SBlockInfo) and append them to the head file.
 *
 * The record is laid out as: SBlockInfo header, all super blocks, all sub blocks, checksum.
 *
 * @param pHeadf  Head file that receives the record.
 * @param pTable  Table whose TID/UID are stamped into the record.
 * @param pSupA   Array of super blocks (SBlock).
 * @param pSubA   Array of sub blocks (SBlock); may be NULL.
 * @param ppBuf   Scratch buffer, grown through tsdbMakeRoom().
 * @param pIdx    Output index entry. It is zeroed first, so numOfBlocks stays 0 when pSupA is empty.
 * @return 0 on success (including the "nothing to write" case), -1 on failure.
 */
static int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf,
                                  SBlockIdx *pIdx) {
  memset(pIdx, 0, sizeof(*pIdx));

  size_t nSup = taosArrayGetSize(pSupA);
  size_t nSub = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA);

  // No data (data all deleted)
  if (nSup <= 0) {
    return 0;
  }

  uint32_t recLen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSup + nSub) + sizeof(TSCKSUM));
  if (tsdbMakeRoom(ppBuf, recLen) < 0) {
    return -1;
  }

  SBlockInfo *pInfo = *ppBuf;
  pInfo->delimiter = TSDB_FILE_DELIMITER;
  pInfo->tid = TABLE_TID(pTable);
  pInfo->uid = TABLE_UID(pTable);

  memcpy((void *)(pInfo->blocks), taosArrayGet(pSupA, 0), nSup * sizeof(SBlock));
  if (nSub > 0) {
    memcpy((void *)(pInfo->blocks + nSup), taosArrayGet(pSubA, 0), nSub * sizeof(SBlock));

    // Super blocks with more than one sub block carry an offset relative to the sub-block array;
    // shift it by the header and the super-block area that precede the sub blocks in the record.
    for (size_t iSup = 0; iSup < nSup; iSup++) {
      SBlock *pSup = pInfo->blocks + iSup;
      if (pSup->numOfSubBlocks > 1) {
        pSup->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSup);
      }
    }
  }

  taosCalcChecksumAppend(0, (uint8_t *)pInfo, recLen);

  int64_t fOffset;
  if (tsdbAppendDFile(pHeadf, (void *)pInfo, recLen, &fOffset) < 0) {
    return -1;
  }

  tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pInfo, recLen - sizeof(TSCKSUM)));

  // Fill the index entry from the last super block and the location just written
  SBlock *pLastSup = taosArrayGetLast(pSupA);

  pIdx->uid = TABLE_UID(pTable);
  pIdx->hasLast = pLastSup->last ? 1 : 0;
  pIdx->maxKey = pLastSup->maxKey;
  pIdx->numOfBlocks = (uint32_t)nSup;
  pIdx->len = recLen;
  pIdx->offset = (uint32_t)fOffset;

  return 0;
}
H
Hongze Cheng 已提交
784

H
more  
Hongze Cheng 已提交
785
/**
 * @brief Encode every SBlockIdx in pIdxA, append a checksum and write the result to the head file.
 *
 * On success the head file's info.offset/info.len describe the written index section.
 * When pIdxA is empty both are set to 0 and nothing is written.
 *
 * @param pHeadf Head file to append to.
 * @param pIdxA  Array of SBlockIdx entries.
 * @param ppBuf  Scratch buffer, grown through tsdbMakeRoom().
 * @return 0 on success, -1 on failure.
 */
static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
  size_t nEntries = taosArrayGetSize(pIdxA);

  // All data are deleted
  if (nEntries <= 0) {
    pHeadf->info.offset = 0;
    pHeadf->info.len = 0;
    return 0;
  }

  int encLen = 0;
  for (size_t iEntry = 0; iEntry < nEntries; iEntry++) {
    SBlockIdx *pEntry = (SBlockIdx *)taosArrayGet(pIdxA, iEntry);

    // First pass with a NULL buffer only measures the encoded size
    int entryLen = tsdbEncodeSBlockIdx(NULL, pEntry);
    if (tsdbMakeRoom(ppBuf, encLen + entryLen) < 0) {
      return -1;
    }

    void *pWrite = POINTER_SHIFT(*ppBuf, encLen);
    tsdbEncodeSBlockIdx(&pWrite, pEntry);

    encLen += entryLen;
  }

  encLen += sizeof(TSCKSUM);
  if (tsdbMakeRoom(ppBuf, encLen) < 0) {
    return -1;
  }
  taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), encLen);

  int64_t fOffset;
  if (tsdbAppendDFile(pHeadf, *ppBuf, encLen, &fOffset) < encLen) {
    return -1;
  }

  tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, encLen - sizeof(TSCKSUM)));
  pHeadf->info.offset = (uint32_t)fOffset;
  pHeadf->info.len = encLen;

  return 0;
}
H
Hongze Cheng 已提交
824

H
Hongze Cheng 已提交
825
// =================== Commit Time-Series Data
H
Hongze Cheng 已提交
826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878
/**
 * @brief Commit one table: walk its existing disk blocks and its in-memory rows together and
 *        emit moved, merged or freshly written blocks, then write the table's SBlockInfo.
 *
 * @param pCommith Commit handle.
 * @param tid      Index into pCommith->iters selecting the table's memory iterator.
 * @return 0 on success, -1 on failure.
 */
static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
  SCommitIter *pCommitIter = pCommith->iters + tid;
  TSKEY        nextKey = tsdbNextIterKey(pCommitIter->pIter);

  tsdbResetCommitTable(pCommith);

  // Set commit table
  if (tsdbSetCommitTable(pCommith, pCommitIter->pTable) < 0) {
    return -1;
  }

  // No disk data and no memory data, just return
  if (pCommith->readh.pBlkIdx == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) {
    return 0;
  }

  // From here on there is disk data, memory data, or both
  int nBlocks = 0;
  if (pCommith->readh.pBlkIdx) {
    if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) {
      return -1;
    }
    nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
  }

  int     bidx = 0;
  SBlock *pBlock = (bidx < nBlocks) ? (pCommith->readh.pBlkInfo->blocks + bidx) : NULL;

  for (;;) {
    // True when the memory iterator has nothing left inside this file's key range
    bool memDone = (nextKey == TSDB_DATA_TIMESTAMP_NULL) || (nextKey > pCommith->maxKey);

    if (pBlock == NULL && memDone) {
      break;
    }

    if (memDone || (pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) {
      // Next memory key lies beyond this block (or memory is exhausted): carry the block over as is
      if (tsdbMoveBlock(pCommith, bidx) < 0) {
        return -1;
      }

      bidx++;
      pBlock = (bidx < nBlocks) ? (pCommith->readh.pBlkInfo->blocks + bidx) : NULL;
    } else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) {
      // merge pBlock data and memory data
      if (tsdbMergeMemData(pCommith, pCommitIter, bidx) < 0) {
        return -1;
      }

      bidx++;
      pBlock = (bidx < nBlocks) ? (pCommith->readh.pBlkInfo->blocks + bidx) : NULL;
      nextKey = tsdbNextIterKey(pCommitIter->pIter);
    } else {
      // Only commit memory data: up to maxKey when no block remains, otherwise up to just
      // before the pending block's first key
      int rc = (pBlock == NULL) ? tsdbCommitMemData(pCommith, pCommitIter, pCommith->maxKey, false)
                                : tsdbCommitMemData(pCommith, pCommitIter, pBlock->minKey.ts - 1, true);
      if (rc < 0) {
        return -1;
      }
      nextKey = tsdbNextIterKey(pCommitIter->pIter);
    }
  }

  if (tsdbWriteBlockInfo(pCommith) < 0) {
    tsdbError("vgId:%d, failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
              TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
    return -1;
  }

  return 0;
}

/**
 * @brief Carry over every block referenced by an existing SBlockIdx into the file set being
 *        committed, then write the table's SBlockInfo.
 *
 * A temporary STable built from pIdx->uid is used as the commit table. Its schema is fetched
 * lazily, only once a block is met whose target file differs from the file being read
 * (tsdbCommitIsSameFile() returns false).
 *
 * The temporary STable lives on this function's stack, so pCommith->pTable is reset to NULL on
 * every exit path after it has been set; otherwise the handle would keep a dangling pointer.
 * NOTE(review): tsdbSetCommitTable() also hands the same pointer to tsdbSetReadTable(); whether
 * the read handle retains it is not visible here — confirm.
 *
 * @param pCommith Commit handle.
 * @param pIdx     Block index entry of the table to move.
 * @return 0 on success, -1 on failure.
 */
static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
  SReadH   *pReadh = &pCommith->readh;
  STsdb    *pTsdb = TSDB_READ_REPO(pReadh);
  STSchema *pTSchema = NULL;
  int       nBlocks = pIdx->numOfBlocks;
  int       code = -1;

  tsdbResetCommitTable(pCommith);

  pReadh->pBlkIdx = pIdx;

  if (tsdbLoadBlockInfo(pReadh, NULL) < 0) {
    return -1;
  }

  STable table = {.tid = pIdx->uid, .uid = pIdx->uid, .pSchema = NULL};
  pCommith->pTable = &table;

  for (int bidx = 0; bidx < nBlocks; ++bidx) {
    if (!pTSchema && !tsdbCommitIsSameFile(pCommith, bidx)) {
      // Set commit table
      pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, -1);  // TODO: schema version
      if (!pTSchema) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        goto _exit;
      }
      table.pSchema = pTSchema;
      if (tsdbSetCommitTable(pCommith, &table) < 0) {
        goto _exit;
      }
    }

    if (tsdbMoveBlock(pCommith, bidx) < 0) {
      tsdbError("vgId:%d, failed to move block into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
                TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
      goto _exit;
    }
  }

  if (tsdbWriteBlockInfo(pCommith) < 0) {
    tsdbError("vgId:%d, failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
              TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
    goto _exit;
  }

  code = 0;

_exit:
  taosMemoryFreeClear(pTSchema);
  // `table` goes out of scope on return; do not leave its address in the commit handle
  pCommith->pTable = NULL;
  return code;
}

/**
 * @brief Make pTable the current commit table: record it in the handle, initialise the commit
 *        data-columns buffer from its schema, and point the read handle at it when an existing
 *        file set is being read.
 *
 * @param pCommith Commit handle.
 * @param pTable   Table to commit.
 * @return 0 on success, -1 on failure.
 */
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) {
  STSchema *pTblSchema = tsdbGetTableSchemaImpl(TSDB_COMMIT_REPO(pCommith), pTable, false, false, -1);

  pCommith->pTable = pTable;

  if (tdInitDataCols(pCommith->pDataCols, pTblSchema) < 0) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  if (!pCommith->isRFileSet) {
    // No file set to read from, hence no on-disk block index for this table
    pCommith->readh.pBlkIdx = NULL;
    return 0;
  }

  return (tsdbSetReadTable(&(pCommith->readh), pTable) < 0) ? -1 : 0;
}

static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
  TSKEY   key = *(TSKEY *)arg1;
  SBlock *pBlock = (SBlock *)arg2;

H
Hongze Cheng 已提交
993
  if (key < pBlock->minKey.ts) {
H
Hongze Cheng 已提交
994
    return -1;
H
Hongze Cheng 已提交
995
  } else if (key > pBlock->maxKey.ts) {
H
Hongze Cheng 已提交
996 997 998 999 1000 1001
    return 1;
  } else {
    return 0;
  }
}

C
Cary Xu 已提交
1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018
/**
 * @brief Write SDataCols to data file.
 *
 * @param pRepo
 * @param pTable
 * @param pDFile
 * @param pDFileAggr
 * @param pDataCols The pDataCols would be generated from mem/imem directly with 2 bits bitmap or from tsdbRead
 * interface with 1 bit bitmap.
 * @param pBlock
 * @param isLast
 * @param isSuper
 * @param ppBuf
 * @param ppCBuf
 * @param ppExBuf
 * @return int
 */
H
more  
Hongze Cheng 已提交
1019 1020
static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
                              SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) {
C
Cary Xu 已提交
1021 1022
  STsdbCfg     *pCfg = REPO_CFG(pRepo);
  SBlockData   *pBlockData = NULL;
1023
  SAggrBlkData *pAggrBlkData = NULL;
C
Cary Xu 已提交
1024
  STSchema     *pSchema = pTable->pSchema;
1025 1026
  int64_t       offset = 0, offsetAggr = 0;
  int           rowsToWrite = pDataCols->numOfRows;
H
Hongze Cheng 已提交
1027

H
refact  
Hongze Cheng 已提交
1028 1029
  ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRows);
  ASSERT((!isLast) || rowsToWrite < pCfg->minRows);
H
Hongze Cheng 已提交
1030 1031

  // Make buffer space
1032
  if (tsdbMakeRoom(ppBuf, tsdbBlockStatisSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) {
H
Hongze Cheng 已提交
1033 1034 1035 1036
    return -1;
  }
  pBlockData = (SBlockData *)(*ppBuf);

1037 1038 1039 1040 1041
  if (tsdbMakeRoom(ppExBuf, tsdbBlockAggrSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) {
    return -1;
  }
  pAggrBlkData = (SAggrBlkData *)(*ppExBuf);

H
Hongze Cheng 已提交
1042
  // Get # of cols not all NULL(not including key column)
C
Cary Xu 已提交
1043 1044
  col_id_t nColsNotAllNull = 0;
  col_id_t nColsOfBlockSma = 0;
1045
  for (int ncol = 1; ncol < pDataCols->numOfCols; ++ncol) {  // ncol from 1, we skip the timestamp column
C
Cary Xu 已提交
1046 1047 1048
    STColumn    *pColumn = pSchema->columns + ncol;
    SDataCol    *pDataCol = pDataCols->cols + ncol;
    SBlockCol   *pBlockCol = pBlockData->cols + nColsNotAllNull;
C
Cary Xu 已提交
1049
    SAggrBlkCol *pAggrBlkCol = (SAggrBlkCol *)pAggrBlkData + nColsOfBlockSma;
H
Hongze Cheng 已提交
1050 1051 1052 1053 1054 1055

    if (isAllRowsNull(pDataCol)) {  // all data to commit are NULL, just ignore it
      continue;
    }

    memset(pBlockCol, 0, sizeof(*pBlockCol));
1056
    memset(pAggrBlkCol, 0, sizeof(*pAggrBlkCol));
H
Hongze Cheng 已提交
1057 1058 1059

    pBlockCol->colId = pDataCol->colId;
    pBlockCol->type = pDataCol->type;
1060 1061
    pAggrBlkCol->colId = pDataCol->colId;

C
Cary Xu 已提交
1062
    if (isSuper && IS_BSMA_ON(pColumn) && tDataTypes[pDataCol->type].statisFunc) {
1063
#if 0
H
Hongze Cheng 已提交
1064 1065 1066
      (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max),
                                               &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex),
                                               &(pBlockCol->numOfNull));
1067
#endif
C
Cary Xu 已提交
1068 1069 1070
      (*tDataTypes[pDataCol->type].statisFunc)(pDataCols->bitmapMode, pDataCol->pBitmap, pDataCol->pData, rowsToWrite,
                                               &(pAggrBlkCol->min), &(pAggrBlkCol->max), &(pAggrBlkCol->sum),
                                               &(pAggrBlkCol->minIndex), &(pAggrBlkCol->maxIndex),
1071 1072 1073
                                               &(pAggrBlkCol->numOfNull));

      if (pAggrBlkCol->numOfNull == 0) {
C
Cary Xu 已提交
1074
        pBlockCol->blen = 0;
C
Cary Xu 已提交
1075
      } else {
C
Cary Xu 已提交
1076
        pBlockCol->blen = 1;
C
Cary Xu 已提交
1077
      }
C
Cary Xu 已提交
1078
      ++nColsOfBlockSma;
C
Cary Xu 已提交
1079 1080
    } else if (tdIsBitmapBlkNorm(pDataCol->pBitmap, rowsToWrite, pDataCols->bitmapMode)) {
      // check if all rows normal
C
Cary Xu 已提交
1081
      pBlockCol->blen = 0;
C
Cary Xu 已提交
1082
    } else {
C
Cary Xu 已提交
1083
      pBlockCol->blen = 1;
H
Hongze Cheng 已提交
1084
    }
C
Cary Xu 已提交
1085 1086

    ++nColsNotAllNull;
H
Hongze Cheng 已提交
1087 1088 1089 1090 1091 1092 1093
  }

  ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols);

  // Compress the data if neccessary
  int      tcol = 0;  // counter of not all NULL and written columns
  uint32_t toffset = 0;
1094
  int32_t  tsize = (int32_t)tsdbBlockStatisSize(nColsNotAllNull, SBlockVerLatest);
H
Hongze Cheng 已提交
1095
  int32_t  lsize = tsize;
C
Cary Xu 已提交
1096
  uint32_t tsizeAggr = (uint32_t)tsdbBlockAggrSize(nColsOfBlockSma, SBlockVerLatest);
H
Hongze Cheng 已提交
1097
  int32_t  keyLen = 0;
C
Cary Xu 已提交
1098
  int32_t  nBitmaps = (int32_t)TD_BITMAP_BYTES(rowsToWrite);
C
Cary Xu 已提交
1099
  int32_t  sBitmaps = isSuper ? (int32_t)TD_BITMAP_BYTES_I(rowsToWrite) : nBitmaps;
C
Cary Xu 已提交
1100

1101
  for (int ncol = 0; ncol < pDataCols->numOfCols; ++ncol) {
H
Hongze Cheng 已提交
1102 1103 1104
    // All not NULL columns finish
    if (ncol != 0 && tcol >= nColsNotAllNull) break;

L
Liu Jicong 已提交
1105
    SDataCol  *pDataCol = pDataCols->cols + ncol;
H
Hongze Cheng 已提交
1106 1107 1108 1109 1110
    SBlockCol *pBlockCol = pBlockData->cols + tcol;

    if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue;

    int32_t flen;  // final length
C
Cary Xu 已提交
1111
    int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite, pDataCols->bitmapMode);
C
Cary Xu 已提交
1112

C
Cary Xu 已提交
1113
#ifdef TD_SUPPORT_BITMAP
C
Cary Xu 已提交
1114
    int32_t tBitmaps = 0;
1115
    int32_t tBitmapsLen = 0;
C
Cary Xu 已提交
1116
    if ((ncol != 0) && (pBlockCol->blen > 0)) {
C
Cary Xu 已提交
1117
      tBitmaps = isSuper ? sBitmaps : nBitmaps;
C
Cary Xu 已提交
1118
    }
C
Cary Xu 已提交
1119
#endif
C
Cary Xu 已提交
1120

C
bug fix  
Cary Xu 已提交
1121
    void *tptr, *bptr;
H
Hongze Cheng 已提交
1122 1123

    // Make room
1124
    if (tsdbMakeRoom(ppBuf, lsize + tlen + tBitmaps + 2 * COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) {
H
Hongze Cheng 已提交
1125 1126 1127 1128 1129 1130
      return -1;
    }
    pBlockData = (SBlockData *)(*ppBuf);
    pBlockCol = pBlockData->cols + tcol;
    tptr = POINTER_SHIFT(pBlockData, lsize);

H
more  
Hongze Cheng 已提交
1131
    if (pCfg->compression == TWO_STAGE_COMP && tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) {
H
Hongze Cheng 已提交
1132 1133 1134 1135 1136
      return -1;
    }

    // Compress or just copy
    if (pCfg->compression) {
1137
#if 0
C
Cary Xu 已提交
1138
      flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite + tBitmaps, tptr,
H
Hongze Cheng 已提交
1139 1140
                                                      tlen + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf,
                                                      tlen + COMP_OVERFLOW_BYTES);
1141 1142 1143 1144 1145
#endif
      flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr,
                                                      tlen + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf,
                                                      tlen + COMP_OVERFLOW_BYTES);
      if (tBitmaps > 0) {
C
bug fix  
Cary Xu 已提交
1146
        bptr = POINTER_SHIFT(pBlockData, lsize + flen);
C
Cary Xu 已提交
1147
        if (isSuper && !tdDataColsIsBitmapI(pDataCols)) {
C
Cary Xu 已提交
1148
          tdMergeBitmap((uint8_t *)pDataCol->pBitmap, rowsToWrite, (uint8_t *)pDataCol->pBitmap);
C
Cary Xu 已提交
1149
        }
1150
        tBitmapsLen =
C
bug fix  
Cary Xu 已提交
1151
            tsCompressTinyint((char *)pDataCol->pBitmap, tBitmaps, tBitmaps, bptr, tBitmaps + COMP_OVERFLOW_BYTES,
1152 1153 1154 1155
                              pCfg->compression, *ppCBuf, tBitmaps + COMP_OVERFLOW_BYTES);
        TASSERT((tBitmapsLen > 0) && (tBitmapsLen <= (tBitmaps + COMP_OVERFLOW_BYTES)));
        flen += tBitmapsLen;
      }
H
Hongze Cheng 已提交
1156 1157 1158
    } else {
      flen = tlen;
      memcpy(tptr, pDataCol->pData, flen);
1159
      if (tBitmaps > 0) {
C
bug fix  
Cary Xu 已提交
1160
        bptr = POINTER_SHIFT(pBlockData, lsize + flen);
1161 1162 1163
        if (isSuper && !tdDataColsIsBitmapI(pDataCols)) {
          tdMergeBitmap((uint8_t *)pDataCol->pBitmap, rowsToWrite, (uint8_t *)pDataCol->pBitmap);
        }
C
bug fix  
Cary Xu 已提交
1164
        memcpy(bptr, pDataCol->pBitmap, tBitmaps);
1165 1166 1167
        tBitmapsLen = tBitmaps;
        flen += tBitmapsLen;
      }
H
Hongze Cheng 已提交
1168 1169 1170 1171
    }

    // Add checksum
    ASSERT(flen > 0);
1172
    ASSERT(tBitmapsLen <= 1024);
H
Hongze Cheng 已提交
1173 1174 1175 1176 1177 1178
    flen += sizeof(TSCKSUM);
    taosCalcChecksumAppend(0, (uint8_t *)tptr, flen);
    tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)));

    if (ncol != 0) {
      tsdbSetBlockColOffset(pBlockCol, toffset);
1179 1180
      pBlockCol->len = flen;  // data + bitmaps
      pBlockCol->blen = tBitmapsLen;
1181
      ++tcol;
H
Hongze Cheng 已提交
1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201
    } else {
      keyLen = flen;
    }

    toffset += flen;
    lsize += flen;
  }

  pBlockData->delimiter = TSDB_FILE_DELIMITER;
  pBlockData->uid = TABLE_UID(pTable);
  pBlockData->numOfCols = nColsNotAllNull;

  taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize);
  tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM)));

  // Write the whole block to file
  if (tsdbAppendDFile(pDFile, (void *)pBlockData, lsize, &offset) < lsize) {
    return -1;
  }

C
Cary Xu 已提交
1202
  uint32_t aggrStatus = nColsOfBlockSma > 0 ? 1 : 0;
1203 1204 1205 1206 1207 1208 1209 1210 1211 1212
  if (aggrStatus > 0) {
    taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr);
    tsdbUpdateDFileMagic(pDFileAggr, POINTER_SHIFT(pAggrBlkData, tsizeAggr - sizeof(TSCKSUM)));

    // Write the whole block to file
    if (tsdbAppendDFile(pDFileAggr, (void *)pAggrBlkData, tsizeAggr, &offsetAggr) < tsizeAggr) {
      return -1;
    }
  }

1213
  // Update pBlock membership variables
H
Hongze Cheng 已提交
1214 1215 1216 1217 1218 1219 1220 1221
  pBlock->last = isLast;
  pBlock->offset = offset;
  pBlock->algorithm = pCfg->compression;
  pBlock->numOfRows = rowsToWrite;
  pBlock->len = lsize;
  pBlock->keyLen = keyLen;
  pBlock->numOfSubBlocks = isSuper ? 1 : 0;
  pBlock->numOfCols = nColsNotAllNull;
C
Cary Xu 已提交
1222
  pBlock->numOfBSma = nColsOfBlockSma;
H
Hongze Cheng 已提交
1223 1224
  pBlock->minKey.ts = dataColsKeyFirst(pDataCols);
  pBlock->maxKey.ts = dataColsKeyLast(pDataCols);
1225 1226 1227
  pBlock->aggrStat = aggrStatus;
  pBlock->blkVer = SBlockVerLatest;
  pBlock->aggrOffset = (uint64_t)offsetAggr;
H
Hongze Cheng 已提交
1228

S
Shengliang Guan 已提交
1229
  tsdbDebug("vgId:%d, uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64
H
Hongze Cheng 已提交
1230
            " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
L
Liu Jicong 已提交
1231
            REPO_ID(pRepo), TABLE_UID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
H
Hongze Cheng 已提交
1232
            pBlock->numOfCols, pBlock->minKey.ts, pBlock->maxKey.ts);
H
Hongze Cheng 已提交
1233 1234 1235 1236 1237 1238

  return 0;
}

/**
 * @brief Convenience wrapper around tsdbWriteBlockImpl() that pulls the repository, table,
 *        aggregate file and scratch buffers out of the commit handle.
 *
 * The aggregate file is the SMAL file when isLast is set, otherwise the SMAD file.
 *
 * @return Whatever tsdbWriteBlockImpl() returns.
 */
static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
                          bool isSuper) {
  SDFile *pAggrFile = isLast ? TSDB_COMMIT_SMAL_FILE(pCommith) : TSDB_COMMIT_SMAD_FILE(pCommith);
  void  **ppDataBuf = (void **)(&(TSDB_COMMIT_BUF(pCommith)));
  void  **ppCompBuf = (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith)));
  void  **ppAggrBuf = (void **)(&(TSDB_COMMIT_EXBUF(pCommith)));

  return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, pAggrFile, pDataCols,
                            pBlock, isLast, isSuper, ppDataBuf, ppCompBuf, ppAggrBuf);
}

H
more  
Hongze Cheng 已提交
1245
/**
 * @brief Write the current commit table's SBlockInfo to the head file and, when it contains at
 *        least one block, remember the resulting SBlockIdx in pCommih->aBlkIdx.
 *
 * @param pCommih Commit handle.
 * @return 0 on success, -1 on failure.
 */
static int tsdbWriteBlockInfo(SCommitH *pCommih) {
  SBlockIdx idxEntry;
  int       rc = tsdbWriteBlockInfoImpl(TSDB_COMMIT_HEAD_FILE(pCommih), TSDB_COMMIT_TABLE(pCommih), pCommih->aSupBlk,
                                        pCommih->aSubBlk, (void **)(&(TSDB_COMMIT_BUF(pCommih))), &idxEntry);
  if (rc < 0) {
    return -1;
  }

  // Only tables that still own blocks get an index entry
  if (idxEntry.numOfBlocks != 0 && taosArrayPush(pCommih->aBlkIdx, (void *)(&idxEntry)) == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}
H
Hongze Cheng 已提交
1266

H
more  
Hongze Cheng 已提交
1267
/**
 * @brief Drain in-memory rows up to keyLimit into new super blocks, at most
 *        TSDB_COMMIT_DEFAULT_ROWS rows per block.
 *
 * A batch goes to the data file when toData is set or when it holds at least minRows rows;
 * otherwise it goes to the last file and is flagged as a "last" block.
 *
 * @param pCommith Commit handle.
 * @param pIter    Memory iterator of the table being committed.
 * @param keyLimit Largest key to commit in this call.
 * @param toData   Force every block into the data file.
 * @return 0 on success, -1 on failure.
 */
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
  STsdb     *pRepo = TSDB_COMMIT_REPO(pCommith);
  STsdbCfg  *pCfg = REPO_CFG(pRepo);
  int32_t    rowsPerBlock = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
  SMergeInfo mergeInfo;

  for (;;) {
    tsdbLoadDataFromCache(pRepo, pIter->pTable, pIter->pIter, keyLimit, rowsPerBlock, pCommith->pDataCols, NULL, 0,
                          pCfg->update, &mergeInfo);

    if (pCommith->pDataCols->numOfRows <= 0) {
      break;
    }

    bool    isLast = !toData && (pCommith->pDataCols->numOfRows < pCfg->minRows);
    SDFile *pTarget = isLast ? TSDB_COMMIT_LAST_FILE(pCommith) : TSDB_COMMIT_DATA_FILE(pCommith);
    SBlock  newBlock;

    if (tsdbWriteBlock(pCommith, pTarget, pCommith->pDataCols, &newBlock, isLast, true) < 0) {
      return -1;
    }

    if (tsdbCommitAddBlock(pCommith, &newBlock, NULL, 0) < 0) {
      return -1;
    }
  }

  return 0;
}
H
Hongze Cheng 已提交
1299

H
more  
Hongze Cheng 已提交
1300
/**
 * @brief Merge in-memory rows that overlap disk block `bidx` with that block.
 *
 * A dry run over a copy of the memory iterator first measures what the merge would do
 * (SMergeInfo). Depending on the outcome the block is moved unchanged, extended with a new
 * sub block, or fully loaded and rewritten through tsdbMergeBlockData().
 *
 * @param pCommith Commit handle.
 * @param pIter    Memory iterator of the table being committed.
 * @param bidx     Index of the disk block inside the loaded SBlockInfo.
 * @return 0 on success, -1 on failure.
 */
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
  STsdb     *pRepo = TSDB_COMMIT_REPO(pCommith);
  STsdbCfg  *pCfg = REPO_CFG(pRepo);
  int        nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
  SBlock    *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
  bool       isFinalBlock = (bidx == nBlocks - 1);
  int16_t    tsColId = PRIMARYKEY_TIMESTAMP_COL_ID;
  SMergeInfo mInfo;

  // Memory rows may be merged up to the file's max key for the final block, otherwise only up
  // to just before the following block's first key.
  TSKEY keyLimit = isFinalBlock ? pCommith->maxKey : (pBlock[1].minKey.ts - 1);

  // Dry run on a copy of the iterator, with only the timestamp column of the block loaded
  SSkipListIterator probeIter = *(pIter->pIter);
  if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &tsColId, 1, false) < 0) {
    return -1;
  }

  tsdbLoadDataFromCache(TSDB_COMMIT_REPO(pCommith), pIter->pTable, &probeIter, keyLimit, INT32_MAX, NULL,
                        pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update,
                        &mInfo);

  if (mInfo.nOperations == 0) {
    // no new data to insert (all updates denied)
    if (tsdbMoveBlock(pCommith, bidx) < 0) {
      return -1;
    }
    *(pIter->pIter) = probeIter;
    return 0;
  }

  if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) {
    // Ignore the block
    ASSERT(0);
    *(pIter->pIter) = probeIter;
    return 0;
  }

  if (!tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) {
    // Full rewrite: load the whole block and merge it row by row with memory data
    if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) {
      return -1;
    }
    if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, isFinalBlock) < 0) {
      return -1;
    }
    return 0;
  }

  // Add a sub-block: this time consume the real iterator and materialise the rows
  tsdbLoadDataFromCache(TSDB_COMMIT_REPO(pCommith), pIter->pTable, pIter->pIter, keyLimit, INT32_MAX,
                        pCommith->pDataCols, pCommith->readh.pDCols[0]->cols[0].pData,
                        pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo);

  SDFile *pDFile = pBlock->last ? TSDB_COMMIT_LAST_FILE(pCommith) : TSDB_COMMIT_DATA_FILE(pCommith);
  SBlock  newSub;
  if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &newSub, pBlock->last, false) < 0) {
    return -1;
  }

  // Rebuild the sub-block list: the existing ones first, then the one just written
  SBlock subBlocks[TSDB_MAX_SUBBLOCKS];
  if (pBlock->numOfSubBlocks == 1) {
    // The block was its own single sub block so far; demote a copy of it
    subBlocks[0] = *pBlock;
    subBlocks[0].numOfSubBlocks = 0;
  } else {
    memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset),
           sizeof(SBlock) * pBlock->numOfSubBlocks);
  }
  subBlocks[pBlock->numOfSubBlocks] = newSub;

  SBlock supBlock = *pBlock;
  supBlock.minKey.ts = mInfo.keyFirst;
  supBlock.maxKey.ts = mInfo.keyLast;
  supBlock.numOfSubBlocks++;
  supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed;
  supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock);

  if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) {
    return -1;
  }

  return 0;
}
H
Hongze Cheng 已提交
1371

1372 1373 1374 1375 1376 1377 1378 1379
/**
 * @brief Tell whether block `bidx` lives in a file that the commit reuses as is.
 *
 * Blocks flagged `last` consult isLFileSame; all others consult isDFileSame.
 */
static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx) {
  const SBlock *pBlk = pCommith->readh.pBlkInfo->blocks + bidx;
  return pBlk->last ? pCommith->isLFileSame : pCommith->isDFileSame;
}

H
Hongze Cheng 已提交
1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432
/**
 * @brief Carry disk block `bidx` over into the commit's block arrays.
 *
 * When the block's file is reused, only its descriptor (and its sub-block descriptors, if
 * any) is copied. Otherwise the block data is loaded and rewritten into the new file as a
 * single super block.
 *
 * @param pCommith Commit handle.
 * @param bidx     Index of the block inside the loaded SBlockInfo.
 * @return 0 on success, -1 on failure.
 */
static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
  SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;

  ASSERT(pBlock->numOfSubBlocks > 0);

  bool    sameFile = pBlock->last ? pCommith->isLFileSame : pCommith->isDFileSame;
  SDFile *pTarget = pBlock->last ? TSDB_COMMIT_LAST_FILE(pCommith) : TSDB_COMMIT_DATA_FILE(pCommith);
  SBlock  moved;

  if (!sameFile) {
    // Data has to be physically rewritten into the new file
    if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1;
    if (tsdbWriteBlock(pCommith, pTarget, pCommith->readh.pDCols[0], &moved, pBlock->last, true) < 0) return -1;
    if (tsdbCommitAddBlock(pCommith, &moved, NULL, 0) < 0) return -1;
    return 0;
  }

  if (pBlock->numOfSubBlocks == 1) {
    if (tsdbCommitAddBlock(pCommith, pBlock, NULL, 0) < 0) {
      return -1;
    }
    return 0;
  }

  // Re-base the sub-block offset onto the current end of the commit's sub-block array
  moved = *pBlock;
  moved.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSubBlk);

  if (tsdbCommitAddBlock(pCommith, &moved, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset),
                         pBlock->numOfSubBlocks) < 0) {
    return -1;
  }

  return 0;
}

/**
 * @brief Append a super block, and optionally a batch of sub blocks, to the commit's arrays.
 *
 * @param pCommith   Commit handle.
 * @param pSupBlock  Super block pushed onto aSupBlk.
 * @param pSubBlocks Sub blocks appended to aSubBlk; NULL to append none.
 * @param nSubBlocks Number of entries in pSubBlocks.
 * @return 0 on success; -1 with terrno set to TSDB_CODE_TDB_OUT_OF_MEMORY when an append fails.
 */
static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) {
  if (taosArrayPush(pCommith->aSupBlk, pSupBlock) == NULL) {
    goto _oom;
  }

  if (pSubBlocks != NULL && taosArrayAddBatch(pCommith->aSubBlk, pSubBlocks, nSubBlocks) == NULL) {
    goto _oom;
  }

  return 0;

_oom:
  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
  return -1;
}

H
more  
Hongze Cheng 已提交
1433 1434
/*
 * Repeatedly merge the block currently held in pCommith->readh.pDCols[0] with rows from the memory
 * iterator `pIter` (keys up to `keyLimit`) and write every non-empty merge result as a new block.
 *
 * Each round fills pCommith->pDataCols via tsdbLoadAndMergeFromCache; the loop ends when a round
 * produces zero rows. A result goes to the "last" file only when `isLastOneBlock` is set and the
 * result has fewer rows than the configured minRows; in every other case it goes to the data file.
 *
 * NOTE: the `pDataCols` parameter is not referenced by this function body.
 *
 * Returns 0 on success, -1 if writing or registering a block fails.
 */
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
                              bool isLastOneBlock) {
  STsdb    *pTsdb = TSDB_COMMIT_REPO(pCommith);
  STsdbCfg *pConf = REPO_CFG(pTsdb);
  int32_t   rowsPerBlock = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
  int       diskPos = 0;  // read cursor into the on-disk block, advanced by the merge routine

  for (;;) {
    tsdbLoadAndMergeFromCache(TSDB_COMMIT_REPO(pCommith), pCommith->readh.pDCols[0], &diskPos, pIter,
                              pCommith->pDataCols, keyLimit, rowsPerBlock, pConf->update);

    if (pCommith->pDataCols->numOfRows == 0) {
      break;
    }

    bool    toLastFile = isLastOneBlock && (pCommith->pDataCols->numOfRows < pConf->minRows);
    SDFile *pOutF = toLastFile ? TSDB_COMMIT_LAST_FILE(pCommith) : TSDB_COMMIT_DATA_FILE(pCommith);
    SBlock  written;

    if (tsdbWriteBlock(pCommith, pOutF, pCommith->pDataCols, &written, toLastFile, true) < 0) {
      return -1;
    }
    if (tsdbCommitAddBlock(pCommith, &written, NULL, 0) < 0) {
      return -1;
    }
  }

  return 0;
}
H
more  
Hongze Cheng 已提交
1468

H
Hongze Cheng 已提交
1469 1470
/*
 * Copy row `rowIdx` of `pSrc`, column by column, into row slot pTarget->numOfRows of `pTarget`.
 * A failure to fetch a cell trips TASSERT(0).
 */
static void tsdbCopyDiskRowToTarget(SDataCols *pSrc, int rowIdx, SDataCols *pTarget) {
  for (int c = 0; c < pSrc->numOfCols; ++c) {
    SCellVal cell = {0};
    // no duplicated TS keys in pSrc when it comes from file
    if (tdGetColDataOfRow(&cell, pSrc->cols + c, rowIdx, pSrc->bitmapMode) < 0) {
      TASSERT(0);
    }
    // TODO: tdAppendValToDataCol may fail
    tdAppendValToDataCol(pTarget->cols + c, cell.valType, cell.val, pTarget->numOfRows, pTarget->maxPoints,
                         pTarget->bitmapMode, false);
  }
}

/*
 * Merge rows of the on-disk block `pDataCols` (starting at row *iter) with rows delivered by the
 * memory iterator in `pCommitIter` (only rows whose key is <= maxKey) into `pTarget`.
 *
 * `pTarget` is reset first and inherits pDataCols->bitmapMode. On every step the smaller of the two
 * current keys is consumed:
 *   - disk key smaller : the disk row is copied and *iter advances;
 *   - memory key smaller: the memory row is appended; if its key equals the key written last, it is
 *     appended with the trailing flag set to true only when TD_SUPPORT_UPDATE(update) holds, and is
 *     otherwise dropped (presumably that flag means "merge into the current row" -- TODO confirm);
 *   - keys equal        : the disk row is copied, then, if updates are supported, the memory row is
 *     appended on top with the flag set to true; both sources advance.
 *
 * pTarget->numOfRows is bumped lazily: it is incremented when a new key starts (except for the very
 * first one) and once more after the loop if anything was written. The loop stops when both sources
 * are exhausted or when pTarget->numOfRows reaches maxRows - 1 before that final increment.
 */
static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter,
                                      SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update) {
  TSKEY     diskKey = INT64_MAX;
  TSKEY     memKey = INT64_MAX;
  TSKEY     prevKey = TSKEY_INITIAL_VAL;
  STSchema *pRowSchema = NULL;

  ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey);
  tdResetDataCols(pTarget);

  pTarget->bitmapMode = pDataCols->bitmapMode;
  // TODO: filter Multi-Version
  // TODO: support delete function
  for (;;) {
    diskKey = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);

    STSRow *row = tsdbNextIterRow(pCommitIter->pIter);
    memKey = (row != NULL && TD_ROW_KEY(row) <= maxKey) ? TD_ROW_KEY(row) : INT64_MAX;

    if (diskKey == INT64_MAX && memKey == INT64_MAX) {
      break;
    }

    if (diskKey < memKey) {
      // Only the disk side has this key.
      if (prevKey != TSKEY_INITIAL_VAL) {
        ++pTarget->numOfRows;
      }
      tsdbCopyDiskRowToTarget(pDataCols, *iter, pTarget);

      prevKey = diskKey;
      ++(*iter);
    } else if (diskKey > memKey) {
      // Only the memory side has this key (possibly in several versions).
      if (pRowSchema == NULL || schemaVersion(pRowSchema) != TD_ROW_SVER(row)) {
        pRowSchema = tsdbGetTableSchemaImpl(pTsdb, pCommitIter->pTable, false, false, TD_ROW_SVER(row));
        ASSERT(pRowSchema != NULL);
      }

      if (memKey == prevKey) {
        if (TD_SUPPORT_UPDATE(update)) {
          tdAppendSTSRowToDataCol(row, pRowSchema, pTarget, true);
        }
      } else {
        if (prevKey != TSKEY_INITIAL_VAL) {
          ++pTarget->numOfRows;
        }
        tdAppendSTSRowToDataCol(row, pRowSchema, pTarget, false);
        prevKey = memKey;
      }

      tSkipListIterNext(pCommitIter->pIter);
    } else {
      // Both sides carry the same key.
      if (prevKey != diskKey) {
        if (prevKey != TSKEY_INITIAL_VAL) {
          ++pTarget->numOfRows;
        }
        prevKey = diskKey;
      }

      // copy disk data
      tsdbCopyDiskRowToTarget(pDataCols, *iter, pTarget);

      if (TD_SUPPORT_UPDATE(update)) {
        // copy mem data(Multi-Version)
        if (pRowSchema == NULL || schemaVersion(pRowSchema) != TD_ROW_SVER(row)) {
          pRowSchema = tsdbGetTableSchemaImpl(pTsdb, pCommitIter->pTable, false, false, TD_ROW_SVER(row));
          ASSERT(pRowSchema != NULL);
        }

        // TODO: merge with Multi-Version
        tdAppendSTSRowToDataCol(row, pRowSchema, pTarget, true);
      }
      ++(*iter);
      tSkipListIterNext(pCommitIter->pIter);
    }

    if (pTarget->numOfRows >= (maxRows - 1)) {
      break;
    }
  }

  // Account for the row that was being filled when the loop ended.
  if (prevKey != TSKEY_INITIAL_VAL) {
    ++pTarget->numOfRows;
  }
}
H
Hongze Cheng 已提交
1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586

/*
 * Drop the per-table commit state: detach the current table pointer and clear both the super-block
 * and the sub-block arrays of the commit handle.
 */
static void tsdbResetCommitTable(SCommitH *pCommith) {
  pCommith->pTable = NULL;
  taosArrayClear(pCommith->aSupBlk);
  taosArrayClear(pCommith->aSubBlk);
}

/*
 * Finish work on the file set of the current commit round.
 *
 * If a read file set is attached (isRFileSet), it is closed and unset through the read handle.
 * The write file set is fsync'ed only when `hasError` is false, and is closed in either case.
 */
static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) {
  SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);

  if (pCommith->isRFileSet) {
    tsdbCloseAndUnsetFSet(&(pCommith->readh));
  }

  if (hasError == false) {
    TSDB_FSET_FSYNC(pWSet);
  }

  tsdbCloseDFileSet(pWSet);
}

H
more  
Hongze Cheng 已提交
1587
/*
 * Decide whether the pending merge described by `pInfo` may be attached to `pBlock` as an extra
 * sub-block.
 *
 * Returns true only when all of the following hold:
 *   - the block has fewer than TSDB_MAX_SUBBLOCKS sub-blocks;
 *   - pInfo->nOperations does not exceed the configured maxRows;
 *   - for a block in the "last" file: isLFileSame is set and the row count after the merge stays
 *     below minRows;
 *     for a block in the data file: isDFileSame is set and the row count after the merge does not
 *     exceed maxRows.
 *
 * The row count after the merge (existing rows + inserted - successfully deleted) is asserted to be
 * positive.
 */
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) {
  STsdb    *pTsdb = TSDB_COMMIT_REPO(pCommith);
  STsdbCfg *pConf = REPO_CFG(pTsdb);
  int       rowsAfterMerge = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed;

  ASSERT(rowsAfterMerge > 0);

  if (pBlock->numOfSubBlocks >= TSDB_MAX_SUBBLOCKS || pInfo->nOperations > pConf->maxRows) {
    return false;
  }

  if (pBlock->last) {
    return pCommith->isLFileSame && rowsAfterMerge < pConf->minRows;
  }

  return pCommith->isDFileSame && rowsAfterMerge <= pConf->maxRows;
}