tsdbCommit.c 58.8 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
more  
Hongze Cheng 已提交
18 19
#define TSDB_MAX_SUBBLOCKS 8

H
more  
Hongze Cheng 已提交
20
typedef struct {
H
Hongze Cheng 已提交
21 22
  STable      *pTable;
  STbDataIter *pIter;
H
more  
Hongze Cheng 已提交
23 24 25 26 27 28 29 30
} SCommitIter;

typedef struct {
  SRtn         rtn;     // retention snapshot
  SFSIter      fsIter;  // tsdb file iterator
  int          niters;  // memory iterators
  SCommitIter *iters;
  bool         isRFileSet;  // read and commit FSET
H
Hongze Cheng 已提交
31 32
  int32_t      fid;
  SDFileSet   *pSet;
H
more  
Hongze Cheng 已提交
33 34 35 36 37 38
  SReadH       readh;
  SDFileSet    wSet;
  bool         isDFileSame;
  bool         isLFileSame;
  TSKEY        minKey;
  TSKEY        maxKey;
L
Liu Jicong 已提交
39 40 41 42 43
  SArray      *aBlkIdx;  // SBlockIdx array
  STable      *pTable;
  SArray      *aSupBlk;  // Table super-block array
  SArray      *aSubBlk;  // table sub-block array
  SDataCols   *pDataCols;
H
more  
Hongze Cheng 已提交
44 45
} SCommitH;

H
more  
Hongze Cheng 已提交
46 47
// Default number of rows per block: 4/5 of the configured maximum.
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)

// Accessors on a commit handle pointer `ch` (SCommitH *).
// The argument is always parenthesised so that expressions such as `p + 1` expand correctly.
#define TSDB_COMMIT_REPO(ch)         TSDB_READ_REPO(&((ch)->readh))
#define TSDB_COMMIT_REPO_ID(ch)      REPO_ID(TSDB_READ_REPO(&((ch)->readh)))
#define TSDB_COMMIT_WRITE_FSET(ch)   (&((ch)->wSet))
#define TSDB_COMMIT_TABLE(ch)        ((ch)->pTable)
#define TSDB_COMMIT_HEAD_FILE(ch)    TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD)
#define TSDB_COMMIT_DATA_FILE(ch)    TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA)
#define TSDB_COMMIT_LAST_FILE(ch)    TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
#define TSDB_COMMIT_SMAD_FILE(ch)    TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAD)
#define TSDB_COMMIT_SMAL_FILE(ch)    TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAL)
#define TSDB_COMMIT_BUF(ch)          TSDB_READ_BUF(&((ch)->readh))
#define TSDB_COMMIT_COMP_BUF(ch)     TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_EXBUF(ch)        TSDB_READ_EXBUF(&((ch)->readh))
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->pVnode->config.tsdbCfg.maxRows)
#define TSDB_COMMIT_TXN_VERSION(ch)  FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
H
refact  
Hongze Cheng 已提交
62

H
refact  
Hongze Cheng 已提交
63 64 65
static int32_t tsdbCommitData(SCommitH *pCommith);
static int32_t tsdbCommitDel(SCommitH *pCommith);
static int32_t tsdbCommitCache(SCommitH *pCommith);
H
more  
Hongze Cheng 已提交
66 67 68
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitH *pCHandle);
static int32_t tsdbEndCommit(SCommitH *pCHandle, int eno);

H
Hongze Cheng 已提交
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
static int     tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo);
static void    tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key);
static int     tsdbNextCommitFid(SCommitH *pCommith);
static void    tsdbDestroyCommitH(SCommitH *pCommith);
static int32_t tsdbCreateCommitIters(SCommitH *pCommith);
static void    tsdbDestroyCommitIters(SCommitH *pCommith);
static int     tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static int     tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static int     tsdbCommitToTable(SCommitH *pCommith, int tid);
static bool    tsdbCommitIsSameFile(SCommitH *pCommith, int bidx);
static int     tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx);
static int     tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
static int     tsdbComparKeyBlock(const void *arg1, const void *arg2);
static int     tsdbWriteBlockInfo(SCommitH *pCommih);
static int     tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
static int     tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx);
static int     tsdbMoveBlock(SCommitH *pCommith, int bidx);
H
more  
Hongze Cheng 已提交
86 87 88 89 90
static int  tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks);
static int  tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
                               bool isLastOneBlock);
static void tsdbResetCommitTable(SCommitH *pCommith);
static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
H
more  
Hongze Cheng 已提交
91
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
H
Hongze Cheng 已提交
92 93
static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter,
                                      SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update);
H
more  
Hongze Cheng 已提交
94
static int  tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
H
Hongze Cheng 已提交
95
static int  tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn);
H
Hongze Cheng 已提交
96 97 98
static int  tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead,
                                  SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup,
                                  SMergeInfo *pMergeInfo);
H
refact  
Hongze Cheng 已提交
99

H
refact  
Hongze Cheng 已提交
100
/**
 * Begin a write cycle by creating the mutable memtable of `pTsdb`.
 *
 * @param pTsdb  tsdb instance; a NULL pointer is accepted and treated as a no-op.
 * @return 0 on success (or when pTsdb is NULL), -1 when tsdbMemTableCreate() fails.
 */
int32_t tsdbBegin(STsdb *pTsdb) {
  if (!pTsdb) return 0;

  if (tsdbMemTableCreate(pTsdb, &pTsdb->mem) < 0) {
    return -1;
  }

  return 0;
}

H
more  
Hongze Cheng 已提交
112
/**
 * Commit the current memtable of `pTsdb`.
 *
 * The mutable memtable (`mem`) is handed over to `imem`, then the commit runs in phases:
 * start, data, delete, cache, end.
 *
 * @param pTsdb  tsdb instance; a NULL pointer is accepted and treated as a no-op.
 * @return 0 on success, otherwise the non-zero code returned by the failing phase.
 */
int32_t tsdbCommit(STsdb *pTsdb) {
  if (!pTsdb) return 0;

  int32_t  code = 0;
  SCommitH commith = {0};

  // Hand the memtable over: exactly one memtable may be in the committing slot at a time.
  ASSERT(pTsdb->imem == NULL && pTsdb->mem);
  pTsdb->imem = pTsdb->mem;
  pTsdb->mem = NULL;

  // start commit
  code = tsdbStartCommit(pTsdb, &commith);
  if (code) {
    goto _err;
  }

  // commit impl
  code = tsdbCommitData(&commith);
  if (code) {
    goto _err;
  }

  code = tsdbCommitDel(&commith);
  if (code) {
    goto _err;
  }

  code = tsdbCommitCache(&commith);
  if (code) {
    goto _err;
  }

  // end commit
  code = tsdbEndCommit(&commith, 0);
  if (code) {
    goto _err;
  }

  return code;

_err:
  // NOTE(review): this path only logs; tsdbEndCommit() is not called, so pTsdb->imem stays set
  // when a phase fails. Confirm that callers handle this.
  tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
  return code;
}

/**
 * Commit in-memory rows into file sets, walking on-disk file sets and memory data in fid order.
 *
 * @return 0 on success; -1 on failure, in which case the commit handle has already been destroyed.
 */
static int32_t tsdbCommitData(SCommitH *pCommith) {
  STsdb     *pTsdb = TSDB_COMMIT_REPO(pCommith);
  SDFileSet *pFSet;
  int32_t    nextFid;

  // Skip expired memory data and expired FSET
  tsdbSeekCommitIter(pCommith, pCommith->rtn.minKey);
  for (pFSet = tsdbFSIterNext(&(pCommith->fsIter)); pFSet != NULL; pFSet = tsdbFSIterNext(&(pCommith->fsIter))) {
    if (pFSet->fid >= pCommith->rtn.minFid) break;

    tsdbInfo("vgId:%d, FSET %d on level %d disk id %d expires, remove it", REPO_ID(pTsdb), pFSet->fid,
             TSDB_FSET_LEVEL(pFSet), TSDB_FSET_ID(pFSet));
  }

  // Merge-walk the remaining on-disk file sets and the memory data until both are exhausted
  nextFid = tsdbNextCommitFid(pCommith);
  while (pFSet != NULL || nextFid != TSDB_IVLD_FID) {
    if (pFSet != NULL && (nextFid == TSDB_IVLD_FID || pFSet->fid < nextFid)) {
      // Existing FSET without memory data for it: only re-check its retention level
      if (tsdbApplyRtnOnFSet(TSDB_COMMIT_REPO(pCommith), pFSet, &(pCommith->rtn)) < 0) {
        tsdbDestroyCommitH(pCommith);
        return -1;
      }

      pFSet = tsdbFSIterNext(&(pCommith->fsIter));
      continue;
    }

    // Memory data exists for nextFid: target a new FSET by default
    SDFileSet *pTarget = NULL;
    int        targetFid = nextFid;

    if (pFSet != NULL && pFSet->fid <= nextFid) {
      // An FSET with this fid already exists: commit into it and advance the file iterator
      pTarget = pFSet;
      targetFid = pFSet->fid;
      pFSet = tsdbFSIterNext(&(pCommith->fsIter));
    }

    if (tsdbCommitToFile(pCommith, pTarget, targetFid) < 0) {
      tsdbDestroyCommitH(pCommith);
      return -1;
    }

    nextFid = tsdbNextCommitFid(pCommith);
  }

  return 0;
}

// Delete-commit phase. Not implemented yet: always reports success.
static int32_t tsdbCommitDel(SCommitH *pCommith) {
  // TODO
  (void)pCommith;
  return 0;
}
H
more  
Hongze Cheng 已提交
224

H
refact  
Hongze Cheng 已提交
225 226 227
// Cache-commit phase. Not implemented yet: always reports success.
static int32_t tsdbCommitCache(SCommitH *pCommith) {
  // TODO
  (void)pCommith;
  return 0;
}

H
Hongze Cheng 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
/**
 * Apply the retention snapshot to an existing file set that receives no new data.
 *
 * A disk is allocated on the level computed for the set's fid. If that level is higher than
 * the set's current level, the set is copied there and the copy is registered; otherwise the
 * set is registered unchanged.
 *
 * @return 0 on success, -1 on failure.
 */
static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
  STsdbFS *pfs = REPO_FS(pRepo);
  SDiskID  did;

  ASSERT(pSet->fid >= pRtn->minFid);

  int level = tsdbGetFidLevel(pSet->fid, pRtn);
  if (tfsAllocDisk(pRepo->pVnode->pTfs, level, &did) < 0) {
    terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
    return -1;
  }

  if (did.level <= TSDB_FSET_LEVEL(pSet)) {
    // On a correct level
    return (tsdbUpdateDFileSet(pfs, pSet) < 0) ? -1 : 0;
  }

  // Need to move the FSET to higher level
  SDFileSet nSet = {0};
  tsdbInitDFileSet(pRepo, &nSet, did, pSet->fid, FS_TXN_VERSION(pfs));

  if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
    tsdbError("vgId:%d, failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
              TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno));
    return -1;
  }

  if (tsdbUpdateDFileSet(pfs, &nSet) < 0) {
    return -1;
  }

  tsdbInfo("vgId:%d, FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid,
           TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id);
  return 0;
}

H
Hongze Cheng 已提交
272
/**
 * Take a retention snapshot relative to the current time.
 *
 * Three cut-off keys are derived from the keep2 / keep1 / keep0 settings and converted to file
 * ids. The result is stored in pRtn as minKey, minFid (keep2), midFid (keep1) and maxFid (keep0).
 */
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
  STsdbKeepCfg *pKeep = REPO_KEEP_CFG(pRepo);

  TSKEY now = taosGetTimestamp(pKeep->precision);
  TSKEY minKey = now - pKeep->keep2 * tsTickPerMin[pKeep->precision];
  TSKEY midKey = now - pKeep->keep1 * tsTickPerMin[pKeep->precision];
  TSKEY maxKey = now - pKeep->keep0 * tsTickPerMin[pKeep->precision];

  pRtn->minKey = minKey;
  pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pKeep->days, pKeep->precision));
  pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pKeep->days, pKeep->precision));
  pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pKeep->days, pKeep->precision));

  tsdbDebug("vgId:%d, now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey,
            pRtn->minFid, pRtn->midFid, pRtn->maxFid);
}

H
more  
Hongze Cheng 已提交
289 290
/**
 * Start a commit: initialise the commit handle and open a file-system transaction.
 *
 * @return 0 on success, -1 when the commit handle cannot be initialised.
 */
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitH *pCHandle) {
  tsdbInfo("vgId:%d, start to commit", REPO_ID(pTsdb));

  if (tsdbInitCommitH(pCHandle, pTsdb) < 0) return -1;

  tsdbStartFSTxn(pTsdb, 0, 0);
  return 0;
}

H
more  
Hongze Cheng 已提交
303 304 305 306 307
/**
 * Finish a commit: release the commit handle, end the file-system transaction and destroy the
 * committed memtable.
 *
 * @param eno  outcome of the commit; only used to pick the word logged ("succeed" / "failed").
 * @return always 0.
 */
static int32_t tsdbEndCommit(SCommitH *pCHandle, int eno) {
  // Fetch the repo first: it is read through the handle that is destroyed just below.
  STsdb      *pTsdb = TSDB_COMMIT_REPO(pCHandle);
  const char *outcome = (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed";

  tsdbDestroyCommitH(pCHandle);
  tsdbEndFSTxn(pTsdb);

  tsdbMemTableDestroy(pTsdb->imem);
  pTsdb->imem = NULL;

  tsdbInfo("vgId:%d, commit over, %s", REPO_ID(pTsdb), outcome);
  return 0;
}

H
refact  
Hongze Cheng 已提交
317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358
/**
 * Initialise a commit handle for `pRepo`.
 *
 * Zeroes the handle and takes the retention snapshot. It then sets up the read handle, the file
 * iterator, the per-table memory iterators, the block arrays and the column buffer.
 *
 * @return 0 on success, -1 on failure. Once the read handle has been initialised, every failure
 *         path destroys the partially built handle before returning.
 */
static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo) {
  STsdbCfg *pCfg = REPO_CFG(pRepo);

  memset(pCommith, 0, sizeof(*pCommith));
  tsdbGetRtnSnap(pRepo, &(pCommith->rtn));

  TSDB_FSET_SET_CLOSED(TSDB_COMMIT_WRITE_FSET(pCommith));

  // Read handle: nothing else is set up yet, so fail without further cleanup
  if (tsdbInitReadH(&(pCommith->readh), pRepo) < 0) return -1;

  // File iterator
  tsdbFSIterInit(&(pCommith->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);

  // Memory iterators
  if (tsdbCreateCommitIters(pCommith) < 0) goto _fail;

  // Block index / super-block / sub-block arrays
  if ((pCommith->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx))) == NULL) goto _oom;
  if ((pCommith->aSupBlk = taosArrayInit(1024, sizeof(SBlock))) == NULL) goto _oom;
  if ((pCommith->aSubBlk = taosArrayInit(1024, sizeof(SBlock))) == NULL) goto _oom;

  // Column buffer
  if ((pCommith->pDataCols = tdNewDataCols(0, pCfg->maxRows)) == NULL) goto _oom;

  return 0;

_oom:
  terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
_fail:
  tsdbDestroyCommitH(pCommith);
  return -1;
}

// Advance every memory iterator past all rows whose key is smaller than `key`
// (rows with exactly `key` are kept). Slots without a table or iterator are ignored.
static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
  for (int idx = 0; idx < pCommith->niters; ++idx) {
    SCommitIter *pCur = &pCommith->iters[idx];

    if (pCur->pTable != NULL && pCur->pIter != NULL) {
      // No target columns are passed (NULL), so the call is used only to move the iterator up to key - 1
      tsdbLoadDataFromCache(TSDB_COMMIT_REPO(pCommith), pCur->pTable, pCur->pIter, key - 1, INT32_MAX, NULL, NULL, 0,
                            true, NULL);
    }
  }
}

H
more  
Hongze Cheng 已提交
380
/**
 * Find the smallest file id that still has memory data to commit.
 *
 * @return the minimum fid over the next keys of all memory iterators, or TSDB_IVLD_FID when no
 *         iterator has data left.
 */
static int tsdbNextCommitFid(SCommitH *pCommith) {
  STsdb        *pRepo = TSDB_COMMIT_REPO(pCommith);
  STsdbKeepCfg *pKeep = REPO_KEEP_CFG(pRepo);
  int           minFid = TSDB_IVLD_FID;

  for (int idx = 0; idx < pCommith->niters; ++idx) {
    // NOTE(review): slots with a NULL pIter are not filtered out here, so tsdbNextIterKey()
    // presumably tolerates a NULL iterator - confirm.
    TSKEY nextKey = tsdbNextIterKey(pCommith->iters[idx].pIter);
    if (nextKey == TSDB_DATA_TIMESTAMP_NULL) continue;

    int keyFid = (int)(TSDB_KEY_FID(nextKey, pKeep->days, pKeep->precision));
    if (minFid == TSDB_IVLD_FID || keyFid < minFid) {
      minFid = keyFid;
    }
  }

  return minFid;
}
H
refact  
Hongze Cheng 已提交
402 403 404 405 406 407 408 409 410 411 412

// Release everything owned by the commit handle: the column buffer, the block arrays, the
// memory iterators, the read handle and the write file set. Freed pointer members are
// overwritten with the value returned by their destroy function.
static void tsdbDestroyCommitH(SCommitH *pCommith) {
  // Buffers and arrays
  pCommith->pDataCols = tdFreeDataCols(pCommith->pDataCols);
  pCommith->aSubBlk = taosArrayDestroy(pCommith->aSubBlk);
  pCommith->aSupBlk = taosArrayDestroy(pCommith->aSupBlk);
  pCommith->aBlkIdx = taosArrayDestroy(pCommith->aBlkIdx);

  // Iterators, then the read side, then the write side
  tsdbDestroyCommitIters(pCommith);
  tsdbDestroyReadH(&pCommith->readh);
  tsdbCloseDFileSet(&pCommith->wSet);
}

H
Hongze Cheng 已提交
413 414 415
/**
 * Prepare the commit handle for committing into file set `fid`.
 *
 * Resets the per-file state, computes the key range covered by `fid` and opens the read and
 * write file sets.
 *
 * @param pSet  existing file set with this fid, or NULL when a new one has to be created.
 * @return the result of tsdbSetAndOpenCommitFile().
 */
static int32_t tsdbCommitToFileStart(SCommitH *pCHandle, SDFileSet *pSet, int32_t fid) {
  STsdbKeepCfg *pKeep = REPO_KEEP_CFG(TSDB_COMMIT_REPO(pCHandle));

  ASSERT(pSet == NULL || pSet->fid == fid);

  // Per-file state
  pCHandle->fid = fid;
  pCHandle->pSet = pSet;
  pCHandle->isRFileSet = false;
  pCHandle->isDFileSame = false;
  pCHandle->isLFileSame = false;
  taosArrayClear(pCHandle->aBlkIdx);

  tsdbGetFidKeyRange(pKeep->days, pKeep->precision, fid, &(pCHandle->minKey), &(pCHandle->maxKey));

  return tsdbSetAndOpenCommitFile(pCHandle, pSet, fid);
}
// Per-file commit body. Not implemented yet: always reports success.
static int32_t tsdbCommitToFileImpl(SCommitH *pCHandle) {
  // TODO
  (void)pCHandle;
  return 0;
}
/**
 * Finish committing into the current file set.
 *
 * Writes the SBlockIdx part to the head file, updates the file set header, closes the commit
 * files and registers the written file set with the file system.
 *
 * @return 0 on success, -1 on failure. When the index or header write fails, the commit files
 *         are closed with the error flag and the file change is reverted.
 */
static int32_t tsdbCommitToFileEnd(SCommitH *pCommith) {
  STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);

  if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) <
      0) {
    tsdbError("vgId:%d, failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), pCommith->fid,
              tstrerror(terrno));
    goto _revert;
  }

  if (tsdbUpdateDFileSetHeader(&(pCommith->wSet)) < 0) {
    tsdbError("vgId:%d, failed to update FSET %d header since %s", REPO_ID(pRepo), pCommith->fid, tstrerror(terrno));
    goto _revert;
  }

  // Close commit file
  tsdbCloseCommitFile(pCommith, false);

  return (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) ? -1 : 0;

_revert:
  tsdbCloseCommitFile(pCommith, true);
  // revert the file change
  tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pCommith->pSet);
  return -1;
}
/**
 * Commit all data belonging to file set `fid`.
 *
 * Tables are visited by walking two sequences side by side: the memory iterators of the commit
 * handle and the block indexes loaded from the existing file set. Tables that have a memory
 * iterator are committed via tsdbCommitToTable(); tables that only exist in the file have their
 * block index moved over unchanged.
 *
 * @param pSet  existing file set with this fid, or NULL to commit into a new file set.
 * @return 0 on success; a non-zero code from the start/end step, or -1 when committing a table
 *         or moving a block index fails (the file change is reverted in that case).
 */
static int32_t tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
  int32_t code = 0;

  // commit to file start
  code = tsdbCommitToFileStart(pCommith, pSet, fid);
  if (code) {
    return code;
  }

  // Loop to commit each table data in mem and file
  int mIter = 0, fIter = 0;
  int nBlkIdx = taosArrayGetSize(pCommith->readh.aBlkIdx);

  while (mIter < pCommith->niters || fIter < nBlkIdx) {
    // Current candidate from each side; NULL when that side is exhausted
    SCommitIter *pIter = (mIter < pCommith->niters) ? (pCommith->iters + mIter) : NULL;
    SBlockIdx   *pIdx = (fIter < nBlkIdx) ? taosArrayGet(pCommith->readh.aBlkIdx, fIter) : NULL;

    if (pIter && pIter->pTable && (!pIdx || (pIter->pTable->suid <= pIdx->suid || pIter->pTable->uid <= pIdx->uid))) {
      if (tsdbCommitToTable(pCommith, mIter) < 0) {
        tsdbCloseCommitFile(pCommith, true);
        // revert the file change
        tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
        return -1;
      }

      // The same table is present on the file side: it has been handled together with the memory data
      if (pIdx && (pIter->pTable->uid == pIdx->uid)) {
        ++fIter;
      }
      ++mIter;
    } else if (pIter && !pIter->pTable) {
      // When table already dropped during commit, pIter is not NULL but pIter->pTable is NULL.
      ++mIter;  // skip the table and do nothing
    } else if (pIdx) {
      if (tsdbMoveBlkIdx(pCommith, pIdx) < 0) {
        tsdbCloseCommitFile(pCommith, true);
        // revert the file change
        tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
        return -1;
      }
      ++fIter;
    }
  }

  // commit to file end
  code = tsdbCommitToFileEnd(pCommith);

  return code;
}
H
refact  
Hongze Cheng 已提交
535

H
Hongze Cheng 已提交
536 537 538 539
/**
 * Create one commit iterator slot per table found in the memtable being committed (`imem`).
 *
 * For every table whose schema can be fetched, a table descriptor and a row iterator are
 * created. Slots of tables without a schema stay zeroed (pTable == NULL, pIter == NULL).
 *
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY when an allocation fails. On failure, slots that
 *         were already filled are left in place; the visible caller (tsdbInitCommitH) releases
 *         them through tsdbDestroyCommitH().
 */
static int32_t tsdbCreateCommitIters(SCommitH *pCommith) {
  int32_t      code = 0;
  STsdb       *pRepo = TSDB_COMMIT_REPO(pCommith);
  SMemTable   *pMem = pRepo->imem;
  STbData     *pTbData;
  SCommitIter *pCommitIter;
  STSchema    *pTSchema = NULL;

  pCommith->niters = taosArrayGetSize(pMem->aTbData);
  pCommith->iters = (SCommitIter *)taosMemoryCalloc(pCommith->niters, sizeof(SCommitIter));
  if (pCommith->iters == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  for (int32_t iIter = 0; iIter < pCommith->niters; iIter++) {
    pTbData = (STbData *)taosArrayGetP(pMem->aTbData, iIter);
    pCommitIter = &pCommith->iters[iIter];

    pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, -1);
    if (pTSchema == NULL) {
      continue;  // no schema: leave the slot empty
    }

    // Allocate the table descriptor first so that a failure leaves this slot untouched.
    STable *pTable = (STable *)taosMemoryMalloc(sizeof(*pTable));
    if (pTable == NULL) {
      tdFreeSchema(pTSchema);  // not yet owned by any slot
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }

    pTable->uid = pTbData->uid;
    pTable->suid = pTbData->suid;
    pTable->pSchema = pTSchema;
    pTable->pCacheSchema = NULL;
    pCommitIter->pTable = pTable;

    tsdbTbDataIterCreate(pTbData, NULL, 0, &pCommitIter->pIter);
  }

  return code;

_err:
  return code;
}

H
Hongze Cheng 已提交
573 574
/**
 * Destroy all commit iterator slots and the slot array itself.
 *
 * Every slot from index 0 on is released (row iterator, both schemas and the table descriptor),
 * matching tsdbCreateCommitIters(), which fills slots starting at index 0. Safe to call when
 * the array was never allocated.
 */
static void tsdbDestroyCommitIters(SCommitH *pCommith) {
  if (pCommith->iters == NULL) return;

  for (int i = 0; i < pCommith->niters; i++) {
    tsdbTbDataIterDestroy(pCommith->iters[i].pIter);
    if (pCommith->iters[i].pTable) {
      tdFreeSchema(pCommith->iters[i].pTable->pSchema);
      tdFreeSchema(pCommith->iters[i].pTable->pCacheSchema);
      taosMemoryFreeClear(pCommith->iters[i].pTable);
    }
  }

  taosMemoryFree(pCommith->iters);
  pCommith->iters = NULL;
  pCommith->niters = 0;
}

H
more  
Hongze Cheng 已提交
590 591
/**
 * Open the read file set (if any) and set up the write file set for committing into `fid`.
 *
 * When there is no existing file set, or the allocated disk is on a higher level than the
 * existing one, a complete new file set is created. Otherwise the existing set is reused: a
 * new head file is always created, the data file is reopened for writing, and the last file is
 * reopened only while it is smaller than 32 KiB (else a new one is created). The smad file is
 * reopened if it exists on disk (else created); the smal file is reopened only when the last
 * file is reused and it exists on disk (else created).
 *
 * All failure paths share the cleanup at the end of the function and always return -1.
 *
 * @param pSet  existing file set with this fid, or NULL.
 * @return 0 on success, -1 on failure.
 */
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
  SDiskID    did;
  STsdb     *pRepo = TSDB_COMMIT_REPO(pCommith);
  SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
  // Declared up front so that no goto jumps over an initialisation
  SDFile    *pWHeadf = NULL;
  SDFile    *pRDataf = NULL;
  SDFile    *pWDataf = NULL;
  SDFile    *pRLastf = NULL;
  SDFile    *pWLastf = NULL;
  SDFile    *pRSmadF = NULL;
  SDFile    *pWSmadF = NULL;
  SDFile    *pRSmalF = NULL;
  SDFile    *pWSmalF = NULL;

  if (tfsAllocDisk(REPO_TFS(pRepo), tsdbGetFidLevel(fid, &(pCommith->rtn)), &did) < 0) {
    terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
    return -1;
  }

  // Open read FSET
  if (pSet) {
    if (tsdbSetAndOpenReadFSet(&(pCommith->readh), pSet) < 0) {
      return -1;
    }

    pCommith->isRFileSet = true;

    if (tsdbLoadBlockIdx(&(pCommith->readh)) < 0) {
      goto _err_close_rset;
    }

    tsdbDebug("vgId:%d, FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo),
              TSDB_FSET_FID(pSet), TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
  } else {
    pCommith->isRFileSet = false;
  }

  // Set and open commit FSET
  if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) {
    // Create a new FSET to write data
    tsdbInitDFileSet(pRepo, pWSet, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)));

    if (tsdbCreateDFileSet(pRepo, pWSet, true) < 0) {
      tsdbError("vgId:%d, failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo),
                TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet), tstrerror(terrno));
      goto _err_close_rset;
    }

    pCommith->isDFileSame = false;
    pCommith->isLFileSame = false;

    tsdbDebug("vgId:%d, FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet),
              TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet));
    return 0;
  }

  // Reuse the existing FSET: write on the disk it already lives on
  did.level = TSDB_FSET_LEVEL(pSet);
  did.id = TSDB_FSET_ID(pSet);

  pCommith->wSet.fid = fid;
  pCommith->wSet.state = 0;

  // TSDB_FILE_HEAD
  pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
  tsdbInitDFile(pRepo, pWHeadf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
  if (tsdbCreateDFile(pRepo, pWHeadf, true, TSDB_FILE_HEAD) < 0) {
    tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf),
              tstrerror(terrno));
    // Nothing was created on the write side yet: only the read side needs closing
    goto _err_close_rset;
  }

  // TSDB_FILE_DATA
  pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh));
  pWDataf = TSDB_COMMIT_DATA_FILE(pCommith);
  tsdbInitDFileEx(pWDataf, pRDataf);
  if (tsdbOpenDFile(pWDataf, TD_FILE_WRITE) < 0) {
    tsdbError("vgId:%d, failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWDataf),
              tstrerror(terrno));
    goto _err_close_wset;
  }
  pCommith->isDFileSame = true;

  // TSDB_FILE_LAST
  pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh));
  pWLastf = TSDB_COMMIT_LAST_FILE(pCommith);
  if (pRLastf->info.size < 32 * 1024) {
    // Small last file: keep appending to it
    tsdbInitDFileEx(pWLastf, pRLastf);
    pCommith->isLFileSame = true;

    if (tsdbOpenDFile(pWLastf, TD_FILE_WRITE) < 0) {
      tsdbError("vgId:%d, failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
                tstrerror(terrno));
      goto _err_close_wset;
    }
  } else {
    tsdbInitDFile(pRepo, pWLastf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
    pCommith->isLFileSame = false;

    if (tsdbCreateDFile(pRepo, pWLastf, true, TSDB_FILE_LAST) < 0) {
      tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
                tstrerror(terrno));
      goto _err_close_wset;
    }
  }

  // TSDB_FILE_SMAD
  pRSmadF = TSDB_READ_SMAD_FILE(&(pCommith->readh));
  pWSmadF = TSDB_COMMIT_SMAD_FILE(pCommith);

  if (!taosCheckExistFile(TSDB_FILE_FULL_NAME(pRSmadF))) {
    tsdbDebug("vgId:%d, create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmadF));
    tsdbInitDFile(pRepo, pWSmadF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAD);

    if (tsdbCreateDFile(pRepo, pWSmadF, true, TSDB_FILE_SMAD) < 0) {
      tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF),
                tstrerror(terrno));
      goto _err_close_wset;
    }
  } else {
    tsdbInitDFileEx(pWSmadF, pRSmadF);
    // NOTE(review): the data/last files are opened with TD_FILE_WRITE but this call passes the
    // POSIX flag O_RDWR - confirm which flag set tsdbOpenDFile() expects.
    if (tsdbOpenDFile(pWSmadF, O_RDWR) < 0) {
      tsdbError("vgId:%d, failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF),
                tstrerror(terrno));
      goto _err_close_wset;
    }
  }

  // TSDB_FILE_SMAL
  pRSmalF = TSDB_READ_SMAL_FILE(&(pCommith->readh));
  pWSmalF = TSDB_COMMIT_SMAL_FILE(pCommith);

  if ((pCommith->isLFileSame) && taosCheckExistFile(TSDB_FILE_FULL_NAME(pRSmalF))) {
    tsdbInitDFileEx(pWSmalF, pRSmalF);
    // NOTE(review): same O_RDWR vs TD_FILE_WRITE question as for the smad file above.
    if (tsdbOpenDFile(pWSmalF, O_RDWR) < 0) {
      tsdbError("vgId:%d, failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF),
                tstrerror(terrno));
      goto _err_close_wset;
    }
  } else {
    tsdbDebug("vgId:%d, create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmalF));
    tsdbInitDFile(pRepo, pWSmalF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAL);

    if (tsdbCreateDFile(pRepo, pWSmalF, true, TSDB_FILE_SMAL) < 0) {
      tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF),
                tstrerror(terrno));
      goto _err_close_wset;
    }
  }

  return 0;

_err_close_wset:
  // Close whatever was opened on the write side and drop the freshly created head file
  tsdbCloseDFileSet(pWSet);
  (void)tsdbRemoveDFile(pWHeadf);
_err_close_rset:
  if (pCommith->isRFileSet) {
    tsdbCloseAndUnsetFSet(&(pCommith->readh));
  }
  return -1;
}

H
more  
Hongze Cheng 已提交
784
// extern int32_t tsTsdbMetaCompactRatio;
H
Hongze Cheng 已提交
785

H
more  
Hongze Cheng 已提交
786 787
/**
 * @brief Serialize one table's block descriptors into the head file and fill in its index entry.
 *
 * The record laid out in *ppBuf is an SBlockInfo header, then every super block from pSupA, then
 * every sub block from pSubA, then a trailing checksum.
 *
 * @param pHeadf Head file the record is appended to.
 * @param pTable Table whose suid/uid are stamped into both the record and pIdx.
 * @param pSupA  Array of super blocks (SBlock).
 * @param pSubA  Array of sub blocks (SBlock); NULL is treated as an empty array.
 * @param ppBuf  Working buffer, grown through tsdbMakeRoom() to hold the whole record.
 * @param pIdx   Output index entry. It is zeroed first and stays zeroed when pSupA is empty.
 * @return 0 on success (including the empty case), -1 when the buffer cannot be grown or the append fails.
 */
static int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf,
                                  SBlockIdx *pIdx) {
  memset(pIdx, 0, sizeof(*pIdx));

  const size_t nSup = taosArrayGetSize(pSupA);
  const size_t nSub = (pSubA != NULL) ? taosArrayGetSize(pSubA) : 0;

  // No data (data all deleted)
  if (nSup == 0) {
    return 0;
  }

  // Byte distance from the start of the record to the first sub block.
  const size_t   subBase = sizeof(SBlockInfo) + sizeof(SBlock) * nSup;
  const uint32_t recLen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSup + nSub) + sizeof(TSCKSUM));

  if (tsdbMakeRoom(ppBuf, recLen) < 0) {
    return -1;
  }

  SBlockInfo *pInfo = *ppBuf;
  pInfo->delimiter = TSDB_FILE_DELIMITER;
  pInfo->suid = pTable->suid;
  pInfo->uid = pTable->uid;

  memcpy((void *)(pInfo->blocks), taosArrayGet(pSupA, 0), nSup * sizeof(SBlock));

  if (nSub > 0) {
    memcpy((void *)(pInfo->blocks + nSup), taosArrayGet(pSubA, 0), nSub * sizeof(SBlock));

    // Super blocks that own several sub blocks carry an offset into the sub-block array;
    // shift it so it is measured from the start of the record instead.
    for (size_t iSup = 0; iSup < nSup; ++iSup) {
      SBlock *pSup = pInfo->blocks + iSup;
      if (pSup->numOfSubBlocks > 1) {
        pSup->offset += subBase;
      }
    }
  }

  taosCalcChecksumAppend(0, (uint8_t *)pInfo, recLen);

  int64_t fileOffset;
  if (tsdbAppendDFile(pHeadf, (void *)pInfo, recLen, &fileOffset) < 0) {
    return -1;
  }

  tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pInfo, recLen - sizeof(TSCKSUM)));

  // Describe the record just written; key range and "last" flag come from the final super block.
  SBlock *pTail = taosArrayGetLast(pSupA);

  pIdx->suid = pTable->suid;
  pIdx->uid = pTable->uid;
  pIdx->hasLast = pTail->last ? 1 : 0;
  pIdx->maxKey = pTail->maxKey;
  pIdx->numOfBlocks = (uint32_t)nSup;
  pIdx->len = recLen;
  pIdx->offset = (uint32_t)fileOffset;

  return 0;
}
H
Hongze Cheng 已提交
847

H
more  
Hongze Cheng 已提交
848
/**
 * @brief Encode every SBlockIdx in pIdxA and append the encoded run, plus a checksum, to the head file.
 *
 * On success pHeadf->info.offset / pHeadf->info.len describe where the index section landed.
 * When pIdxA is empty nothing is written and both fields are set to 0.
 *
 * @param pHeadf Head file to append to; its info.offset and info.len are updated.
 * @param pIdxA  Array of SBlockIdx entries.
 * @param ppBuf  Working buffer, grown through tsdbMakeRoom() as entries are encoded.
 * @return 0 on success, -1 when the buffer cannot be grown or fewer bytes than requested were appended.
 */
static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
  const size_t nEntries = taosArrayGetSize(pIdxA);

  // All data are deleted
  if (nEntries == 0) {
    pHeadf->info.offset = 0;
    pHeadf->info.len = 0;
    return 0;
  }

  int total = 0;  // bytes encoded so far
  for (size_t iEntry = 0; iEntry < nEntries; ++iEntry) {
    SBlockIdx *pEntry = (SBlockIdx *)taosArrayGet(pIdxA, iEntry);

    // First call with a NULL buffer only measures the encoded size.
    int encLen = tsdbEncodeSBlockIdx(NULL, pEntry);
    if (tsdbMakeRoom(ppBuf, total + encLen) < 0) {
      return -1;
    }

    void *pWrite = POINTER_SHIFT(*ppBuf, total);
    tsdbEncodeSBlockIdx(&pWrite, pEntry);

    total += encLen;
  }

  total += sizeof(TSCKSUM);
  if (tsdbMakeRoom(ppBuf, total) < 0) {
    return -1;
  }
  taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), total);

  int64_t fileOffset;
  if (tsdbAppendDFile(pHeadf, *ppBuf, total, &fileOffset) < total) {
    return -1;
  }

  tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, total - sizeof(TSCKSUM)));
  pHeadf->info.offset = (uint32_t)fileOffset;
  pHeadf->info.len = total;

  return 0;
}
H
Hongze Cheng 已提交
887

H
Hongze Cheng 已提交
888
// =================== Commit Time-Series Data
H
Hongze Cheng 已提交
889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941
/**
 * @brief Commit one table: walk its on-disk blocks and its in-memory iterator side by side.
 *
 * For each step one of three things happens:
 *   - the current disk block is carried over unchanged (tsdbMoveBlock),
 *   - the current disk block is merged with memory rows (tsdbMergeMemData),
 *   - memory rows alone are written out (tsdbCommitMemData).
 * When both sources are exhausted the collected block descriptors are written with tsdbWriteBlockInfo().
 *
 * @param pCommith Commit handle.
 * @param tid      Index into pCommith->iters selecting the table to commit.
 * @return 0 on success, -1 on failure of any step.
 */
static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
  SCommitIter *pCommitIter = &pCommith->iters[tid];
  TSKEY        memKey = tsdbNextIterKey(pCommitIter->pIter);

  tsdbResetCommitTable(pCommith);

  // Set commit table
  if (tsdbSetCommitTable(pCommith, pCommitIter->pTable) < 0) {
    return -1;
  }

  // No disk data and no memory data, just return
  if (pCommith->readh.pBlkIdx == NULL && (memKey == TSDB_DATA_TIMESTAMP_NULL || memKey > pCommith->maxKey)) {
    return 0;
  }

  // Must have disk data or memory data from here on
  int nDiskBlocks = 0;
  if (pCommith->readh.pBlkIdx) {
    if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) {
      return -1;
    }
    nDiskBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
  }

  int     cur = 0;
  SBlock *pDisk = (cur < nDiskBlocks) ? (pCommith->readh.pBlkInfo->blocks + cur) : NULL;

  for (;;) {
    // True when the memory iterator has nothing left that belongs to this file set.
    const bool memDone = (memKey == TSDB_DATA_TIMESTAMP_NULL || memKey > pCommith->maxKey);

    if (pDisk == NULL && memDone) {
      break;
    }

    if (memDone || (pDisk && (!pDisk->last) && tsdbComparKeyBlock((void *)(&memKey), pDisk) > 0)) {
      // The next memory key lies past this block (or there is none): keep the block as is
      if (tsdbMoveBlock(pCommith, cur) < 0) {
        return -1;
      }

      ++cur;
      pDisk = (cur < nDiskBlocks) ? (pCommith->readh.pBlkInfo->blocks + cur) : NULL;
    } else if (pDisk && (pDisk->last || tsdbComparKeyBlock((void *)(&memKey), pDisk) == 0)) {
      // merge pBlock data and memory data
      if (tsdbMergeMemData(pCommith, pCommitIter, cur) < 0) {
        return -1;
      }

      ++cur;
      pDisk = (cur < nDiskBlocks) ? (pCommith->readh.pBlkInfo->blocks + cur) : NULL;
      memKey = tsdbNextIterKey(pCommitIter->pIter);
    } else {
      // Only commit memory data: up to maxKey when no disk block remains,
      // otherwise up to just before the current disk block's first key.
      int rc;
      if (pDisk == NULL) {
        rc = tsdbCommitMemData(pCommith, pCommitIter, pCommith->maxKey, false);
      } else {
        rc = tsdbCommitMemData(pCommith, pCommitIter, pDisk->minKey.ts - 1, true);
      }
      if (rc < 0) {
        return -1;
      }

      memKey = tsdbNextIterKey(pCommitIter->pIter);
    }
  }

  if (tsdbWriteBlockInfo(pCommith) < 0) {
    tsdbError("vgId:%d, failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
              TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
    return -1;
  }

  return 0;
}

/**
 * @brief Carry every block referenced by one on-disk index entry over to the commit file set.
 *
 * A temporary STable built from pIdx is used as the commit table. The table schema is fetched lazily,
 * the first time a block is found that does not live in a file shared with the new file set
 * (tsdbCommitIsSameFile() returns false), and is released before returning.
 *
 * @param pCommith Commit handle.
 * @param pIdx     Index entry whose blocks are moved; installed as pCommith->readh.pBlkIdx.
 * @return 0 on success, -1 on failure.
 */
static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
  SReadH   *pReadh = &pCommith->readh;
  STsdb    *pTsdb = TSDB_READ_REPO(pReadh);
  STSchema *pTSchema = NULL;
  int       nBlocks = pIdx->numOfBlocks;
  int       code = -1;

  tsdbResetCommitTable(pCommith);

  pReadh->pBlkIdx = pIdx;

  if (tsdbLoadBlockInfo(pReadh, NULL) < 0) {
    return -1;
  }

  STable table = {.suid = pIdx->suid, .uid = pIdx->uid, .pSchema = NULL};
  pCommith->pTable = &table;

  for (int bidx = 0; bidx < nBlocks; ++bidx) {
    if (!pTSchema && !tsdbCommitIsSameFile(pCommith, bidx)) {
      // Set commit table
      pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, -1);  // TODO: schema version
      if (!pTSchema) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        goto cleanup;
      }
      table.pSchema = pTSchema;
      if (tsdbSetCommitTable(pCommith, &table) < 0) {
        goto cleanup;
      }
    }

    if (tsdbMoveBlock(pCommith, bidx) < 0) {
      tsdbError("vgId:%d, failed to move block into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
                TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
      goto cleanup;
    }
  }

  if (tsdbWriteBlockInfo(pCommith) < 0) {
    tsdbError("vgId:%d, failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
              TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
    goto cleanup;
  }

  code = 0;

cleanup:
  taosMemoryFreeClear(pTSchema);
  // `table` lives on this stack frame; do not leave the commit handle pointing at it.
  // NOTE(review): tsdbSetCommitTable() also hands &table to tsdbSetReadTable(); if the read handle
  // retains that pointer it needs the same treatment - confirm against SReadH.
  pCommith->pTable = NULL;
  return code;
}

/**
 * @brief Make pTable the table currently being committed.
 *
 * Stores pTable in the commit handle, re-initialises the commit data-column buffer from the table
 * schema and, when an existing file set is being read, points the read handle at the same table.
 * Without a read file set, pCommith->readh.pBlkIdx is cleared instead.
 *
 * @param pCommith Commit handle.
 * @param pTable   Table to commit.
 * @return 0 on success, -1 on failure (terrno is set here when tdInitDataCols() fails).
 */
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) {
  STSchema *pTbSchema = tsdbGetTableSchemaImpl(TSDB_COMMIT_REPO(pCommith), pTable, false, false, -1);

  pCommith->pTable = pTable;

  if (tdInitDataCols(pCommith->pDataCols, pTbSchema) < 0) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  if (!pCommith->isRFileSet) {
    // Nothing on disk to read for this file set
    pCommith->readh.pBlkIdx = NULL;
    return 0;
  }

  if (tsdbSetReadTable(&(pCommith->readh), pTable) < 0) {
    return -1;
  }

  return 0;
}

static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
  TSKEY   key = *(TSKEY *)arg1;
  SBlock *pBlock = (SBlock *)arg2;

H
Hongze Cheng 已提交
1056
  if (key < pBlock->minKey.ts) {
H
Hongze Cheng 已提交
1057
    return -1;
H
Hongze Cheng 已提交
1058
  } else if (key > pBlock->maxKey.ts) {
H
Hongze Cheng 已提交
1059 1060 1061 1062 1063 1064
    return 1;
  } else {
    return 0;
  }
}

C
Cary Xu 已提交
1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081
/**
 * @brief Write SDataCols to data file.
 *
 * @param pRepo
 * @param pTable
 * @param pDFile
 * @param pDFileAggr
 * @param pDataCols The pDataCols would be generated from mem/imem directly with 2 bits bitmap or from tsdbRead
 * interface with 1 bit bitmap.
 * @param pBlock
 * @param isLast
 * @param isSuper
 * @param ppBuf
 * @param ppCBuf
 * @param ppExBuf
 * @return int
 */
H
more  
Hongze Cheng 已提交
1082 1083
static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
                              SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) {
C
Cary Xu 已提交
1084 1085
  STsdbCfg     *pCfg = REPO_CFG(pRepo);
  SBlockData   *pBlockData = NULL;
1086
  SAggrBlkData *pAggrBlkData = NULL;
C
Cary Xu 已提交
1087
  STSchema     *pSchema = pTable->pSchema;
1088 1089
  int64_t       offset = 0, offsetAggr = 0;
  int           rowsToWrite = pDataCols->numOfRows;
H
Hongze Cheng 已提交
1090

H
refact  
Hongze Cheng 已提交
1091 1092
  ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRows);
  ASSERT((!isLast) || rowsToWrite < pCfg->minRows);
H
Hongze Cheng 已提交
1093 1094

  // Make buffer space
1095
  if (tsdbMakeRoom(ppBuf, tsdbBlockStatisSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) {
H
Hongze Cheng 已提交
1096 1097 1098 1099
    return -1;
  }
  pBlockData = (SBlockData *)(*ppBuf);

1100 1101 1102 1103 1104
  if (tsdbMakeRoom(ppExBuf, tsdbBlockAggrSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) {
    return -1;
  }
  pAggrBlkData = (SAggrBlkData *)(*ppExBuf);

H
Hongze Cheng 已提交
1105
  // Get # of cols not all NULL(not including key column)
C
Cary Xu 已提交
1106 1107
  col_id_t nColsNotAllNull = 0;
  col_id_t nColsOfBlockSma = 0;
1108
  for (int ncol = 1; ncol < pDataCols->numOfCols; ++ncol) {  // ncol from 1, we skip the timestamp column
C
Cary Xu 已提交
1109 1110 1111
    STColumn    *pColumn = pSchema->columns + ncol;
    SDataCol    *pDataCol = pDataCols->cols + ncol;
    SBlockCol   *pBlockCol = pBlockData->cols + nColsNotAllNull;
C
Cary Xu 已提交
1112
    SAggrBlkCol *pAggrBlkCol = (SAggrBlkCol *)pAggrBlkData + nColsOfBlockSma;
H
Hongze Cheng 已提交
1113 1114 1115 1116 1117 1118

    if (isAllRowsNull(pDataCol)) {  // all data to commit are NULL, just ignore it
      continue;
    }

    memset(pBlockCol, 0, sizeof(*pBlockCol));
1119
    memset(pAggrBlkCol, 0, sizeof(*pAggrBlkCol));
H
Hongze Cheng 已提交
1120 1121 1122

    pBlockCol->colId = pDataCol->colId;
    pBlockCol->type = pDataCol->type;
1123 1124
    pAggrBlkCol->colId = pDataCol->colId;

C
Cary Xu 已提交
1125
    if (isSuper && IS_BSMA_ON(pColumn) && tDataTypes[pDataCol->type].statisFunc) {
1126
#if 0
H
Hongze Cheng 已提交
1127 1128 1129
      (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max),
                                               &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex),
                                               &(pBlockCol->numOfNull));
1130
#endif
C
Cary Xu 已提交
1131 1132 1133
      (*tDataTypes[pDataCol->type].statisFunc)(pDataCols->bitmapMode, pDataCol->pBitmap, pDataCol->pData, rowsToWrite,
                                               &(pAggrBlkCol->min), &(pAggrBlkCol->max), &(pAggrBlkCol->sum),
                                               &(pAggrBlkCol->minIndex), &(pAggrBlkCol->maxIndex),
1134 1135 1136
                                               &(pAggrBlkCol->numOfNull));

      if (pAggrBlkCol->numOfNull == 0) {
C
Cary Xu 已提交
1137
        pBlockCol->blen = 0;
C
Cary Xu 已提交
1138
      } else {
C
Cary Xu 已提交
1139
        pBlockCol->blen = 1;
C
Cary Xu 已提交
1140
      }
C
Cary Xu 已提交
1141
      ++nColsOfBlockSma;
C
Cary Xu 已提交
1142 1143
    } else if (tdIsBitmapBlkNorm(pDataCol->pBitmap, rowsToWrite, pDataCols->bitmapMode)) {
      // check if all rows normal
C
Cary Xu 已提交
1144
      pBlockCol->blen = 0;
C
Cary Xu 已提交
1145
    } else {
C
Cary Xu 已提交
1146
      pBlockCol->blen = 1;
H
Hongze Cheng 已提交
1147
    }
C
Cary Xu 已提交
1148 1149

    ++nColsNotAllNull;
H
Hongze Cheng 已提交
1150 1151 1152 1153 1154 1155 1156
  }

  ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols);

  // Compress the data if neccessary
  int      tcol = 0;  // counter of not all NULL and written columns
  uint32_t toffset = 0;
1157
  int32_t  tsize = (int32_t)tsdbBlockStatisSize(nColsNotAllNull, SBlockVerLatest);
H
Hongze Cheng 已提交
1158
  int32_t  lsize = tsize;
C
Cary Xu 已提交
1159
  uint32_t tsizeAggr = (uint32_t)tsdbBlockAggrSize(nColsOfBlockSma, SBlockVerLatest);
H
Hongze Cheng 已提交
1160
  int32_t  keyLen = 0;
C
Cary Xu 已提交
1161
  int32_t  nBitmaps = (int32_t)TD_BITMAP_BYTES(rowsToWrite);
C
Cary Xu 已提交
1162
  int32_t  sBitmaps = isSuper ? (int32_t)TD_BITMAP_BYTES_I(rowsToWrite) : nBitmaps;
C
Cary Xu 已提交
1163

1164
  for (int ncol = 0; ncol < pDataCols->numOfCols; ++ncol) {
H
Hongze Cheng 已提交
1165 1166 1167
    // All not NULL columns finish
    if (ncol != 0 && tcol >= nColsNotAllNull) break;

L
Liu Jicong 已提交
1168
    SDataCol  *pDataCol = pDataCols->cols + ncol;
H
Hongze Cheng 已提交
1169 1170 1171 1172 1173
    SBlockCol *pBlockCol = pBlockData->cols + tcol;

    if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue;

    int32_t flen;  // final length
C
Cary Xu 已提交
1174
    int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite, pDataCols->bitmapMode);
C
Cary Xu 已提交
1175

C
Cary Xu 已提交
1176
#ifdef TD_SUPPORT_BITMAP
C
Cary Xu 已提交
1177
    int32_t tBitmaps = 0;
1178
    int32_t tBitmapsLen = 0;
C
Cary Xu 已提交
1179
    if ((ncol != 0) && (pBlockCol->blen > 0)) {
C
Cary Xu 已提交
1180
      tBitmaps = isSuper ? sBitmaps : nBitmaps;
C
Cary Xu 已提交
1181
    }
C
Cary Xu 已提交
1182
#endif
C
Cary Xu 已提交
1183

C
bug fix  
Cary Xu 已提交
1184
    void *tptr, *bptr;
H
Hongze Cheng 已提交
1185 1186

    // Make room
1187
    if (tsdbMakeRoom(ppBuf, lsize + tlen + tBitmaps + 2 * COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) {
H
Hongze Cheng 已提交
1188 1189 1190 1191 1192 1193
      return -1;
    }
    pBlockData = (SBlockData *)(*ppBuf);
    pBlockCol = pBlockData->cols + tcol;
    tptr = POINTER_SHIFT(pBlockData, lsize);

H
more  
Hongze Cheng 已提交
1194
    if (pCfg->compression == TWO_STAGE_COMP && tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) {
H
Hongze Cheng 已提交
1195 1196 1197 1198 1199
      return -1;
    }

    // Compress or just copy
    if (pCfg->compression) {
1200
#if 0
C
Cary Xu 已提交
1201
      flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite + tBitmaps, tptr,
H
Hongze Cheng 已提交
1202 1203
                                                      tlen + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf,
                                                      tlen + COMP_OVERFLOW_BYTES);
1204 1205 1206 1207 1208
#endif
      flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr,
                                                      tlen + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf,
                                                      tlen + COMP_OVERFLOW_BYTES);
      if (tBitmaps > 0) {
C
bug fix  
Cary Xu 已提交
1209
        bptr = POINTER_SHIFT(pBlockData, lsize + flen);
C
Cary Xu 已提交
1210
        if (isSuper && !tdDataColsIsBitmapI(pDataCols)) {
C
Cary Xu 已提交
1211
          tdMergeBitmap((uint8_t *)pDataCol->pBitmap, rowsToWrite, (uint8_t *)pDataCol->pBitmap);
C
Cary Xu 已提交
1212
        }
1213
        tBitmapsLen =
C
bug fix  
Cary Xu 已提交
1214
            tsCompressTinyint((char *)pDataCol->pBitmap, tBitmaps, tBitmaps, bptr, tBitmaps + COMP_OVERFLOW_BYTES,
1215 1216 1217 1218
                              pCfg->compression, *ppCBuf, tBitmaps + COMP_OVERFLOW_BYTES);
        TASSERT((tBitmapsLen > 0) && (tBitmapsLen <= (tBitmaps + COMP_OVERFLOW_BYTES)));
        flen += tBitmapsLen;
      }
H
Hongze Cheng 已提交
1219 1220 1221
    } else {
      flen = tlen;
      memcpy(tptr, pDataCol->pData, flen);
1222
      if (tBitmaps > 0) {
C
bug fix  
Cary Xu 已提交
1223
        bptr = POINTER_SHIFT(pBlockData, lsize + flen);
1224 1225 1226
        if (isSuper && !tdDataColsIsBitmapI(pDataCols)) {
          tdMergeBitmap((uint8_t *)pDataCol->pBitmap, rowsToWrite, (uint8_t *)pDataCol->pBitmap);
        }
C
bug fix  
Cary Xu 已提交
1227
        memcpy(bptr, pDataCol->pBitmap, tBitmaps);
1228 1229 1230
        tBitmapsLen = tBitmaps;
        flen += tBitmapsLen;
      }
H
Hongze Cheng 已提交
1231 1232 1233 1234
    }

    // Add checksum
    ASSERT(flen > 0);
1235
    ASSERT(tBitmapsLen <= 1024);
H
Hongze Cheng 已提交
1236 1237 1238 1239 1240
    flen += sizeof(TSCKSUM);
    taosCalcChecksumAppend(0, (uint8_t *)tptr, flen);
    tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)));

    if (ncol != 0) {
H
Hongze Cheng 已提交
1241
      pBlockCol->offset = toffset;
1242 1243
      pBlockCol->len = flen;  // data + bitmaps
      pBlockCol->blen = tBitmapsLen;
1244
      ++tcol;
H
Hongze Cheng 已提交
1245 1246 1247 1248 1249 1250 1251 1252 1253
    } else {
      keyLen = flen;
    }

    toffset += flen;
    lsize += flen;
  }

  pBlockData->delimiter = TSDB_FILE_DELIMITER;
H
Hongze Cheng 已提交
1254
  pBlockData->uid = pTable->uid;
H
Hongze Cheng 已提交
1255 1256 1257 1258 1259 1260 1261 1262 1263 1264
  pBlockData->numOfCols = nColsNotAllNull;

  taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize);
  tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM)));

  // Write the whole block to file
  if (tsdbAppendDFile(pDFile, (void *)pBlockData, lsize, &offset) < lsize) {
    return -1;
  }

C
Cary Xu 已提交
1265
  uint32_t aggrStatus = nColsOfBlockSma > 0 ? 1 : 0;
1266 1267 1268 1269 1270 1271 1272 1273 1274 1275
  if (aggrStatus > 0) {
    taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr);
    tsdbUpdateDFileMagic(pDFileAggr, POINTER_SHIFT(pAggrBlkData, tsizeAggr - sizeof(TSCKSUM)));

    // Write the whole block to file
    if (tsdbAppendDFile(pDFileAggr, (void *)pAggrBlkData, tsizeAggr, &offsetAggr) < tsizeAggr) {
      return -1;
    }
  }

1276
  // Update pBlock membership variables
H
Hongze Cheng 已提交
1277 1278 1279 1280 1281 1282 1283 1284
  pBlock->last = isLast;
  pBlock->offset = offset;
  pBlock->algorithm = pCfg->compression;
  pBlock->numOfRows = rowsToWrite;
  pBlock->len = lsize;
  pBlock->keyLen = keyLen;
  pBlock->numOfSubBlocks = isSuper ? 1 : 0;
  pBlock->numOfCols = nColsNotAllNull;
C
Cary Xu 已提交
1285
  pBlock->numOfBSma = nColsOfBlockSma;
H
Hongze Cheng 已提交
1286 1287
  pBlock->minKey.ts = dataColsKeyFirst(pDataCols);
  pBlock->maxKey.ts = dataColsKeyLast(pDataCols);
1288 1289 1290
  pBlock->aggrStat = aggrStatus;
  pBlock->blkVer = SBlockVerLatest;
  pBlock->aggrOffset = (uint64_t)offsetAggr;
H
Hongze Cheng 已提交
1291

S
Shengliang Guan 已提交
1292
  tsdbDebug("vgId:%d, uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64
H
Hongze Cheng 已提交
1293
            " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
H
Hongze Cheng 已提交
1294
            REPO_ID(pRepo), pTable->uid, TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
H
Hongze Cheng 已提交
1295
            pBlock->numOfCols, pBlock->minKey.ts, pBlock->maxKey.ts);
H
Hongze Cheng 已提交
1296 1297 1298 1299 1300 1301

  return 0;
}

/**
 * @brief Write one block for the table currently being committed.
 *
 * Thin wrapper around tsdbWriteBlockImpl() that supplies the repository, table and working buffers
 * from the commit handle. The aggregate file is the SMAL file for a "last" block and the SMAD file
 * otherwise.
 *
 * @param pCommith  Commit handle.
 * @param pDFile    File the data block is appended to.
 * @param pDataCols Rows to write.
 * @param pBlock    Output block descriptor.
 * @param isLast    Whether the block is written as a "last" block.
 * @param isSuper   Whether the block is a super block.
 * @return Result of tsdbWriteBlockImpl().
 */
static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
                          bool isSuper) {
  SDFile *pAggrFile;
  if (isLast) {
    pAggrFile = TSDB_COMMIT_SMAL_FILE(pCommith);
  } else {
    pAggrFile = TSDB_COMMIT_SMAD_FILE(pCommith);
  }

  void **ppDataBuf = (void **)(&(TSDB_COMMIT_BUF(pCommith)));
  void **ppCompBuf = (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith)));
  void **ppAggrBuf = (void **)(&(TSDB_COMMIT_EXBUF(pCommith)));

  return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, pAggrFile, pDataCols,
                            pBlock, isLast, isSuper, ppDataBuf, ppCompBuf, ppAggrBuf);
}

H
more  
Hongze Cheng 已提交
1308
/**
 * @brief Flush the block descriptors collected for the current table and remember their index entry.
 *
 * Delegates to tsdbWriteBlockInfoImpl() with the handle's super/sub block arrays. If that produced
 * an index entry with at least one block, the entry is appended to pCommih->aBlkIdx.
 *
 * @param pCommih Commit handle.
 * @return 0 on success, -1 on write failure or when the index entry cannot be stored (terrno set).
 */
static int tsdbWriteBlockInfo(SCommitH *pCommih) {
  SBlockIdx idxEntry;
  SDFile   *pHead = TSDB_COMMIT_HEAD_FILE(pCommih);
  STable   *pTbl = TSDB_COMMIT_TABLE(pCommih);
  void    **ppBuf = (void **)(&(TSDB_COMMIT_BUF(pCommih)));

  int rc = tsdbWriteBlockInfoImpl(pHead, pTbl, pCommih->aSupBlk, pCommih->aSubBlk, ppBuf, &idxEntry);
  if (rc < 0) {
    return -1;
  }

  // Nothing was written for this table, so there is nothing to index.
  if (idxEntry.numOfBlocks == 0) {
    return 0;
  }

  if (taosArrayPush(pCommih->aBlkIdx, (void *)(&idxEntry)) == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}
H
Hongze Cheng 已提交
1329

H
more  
Hongze Cheng 已提交
1330
/**
 * @brief Write in-memory rows of one table to disk as new super blocks, batch by batch.
 *
 * Each round loads rows from the memory iterator into pCommith->pDataCols through
 * tsdbLoadDataFromCache(), passing keyLimit and the handle's default row count, and stops when a
 * round yields no rows. A batch goes to the data file when toData is set or it holds at least
 * minRows rows; otherwise it goes to the last file.
 *
 * @param pCommith Commit handle.
 * @param pIter    Commit iterator of the table.
 * @param keyLimit Key bound handed to tsdbLoadDataFromCache().
 * @param toData   Force every batch into the data file.
 * @return 0 on success, -1 on failure.
 */
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
  STsdb        *pRepo = TSDB_COMMIT_REPO(pCommith);
  STsdbCfg     *pCfg = REPO_CFG(pRepo);
  const int32_t batchRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
  SMergeInfo    mergeInfo;
  SBlock        written;

  for (;;) {
    tsdbLoadDataFromCache(pRepo, pIter->pTable, pIter->pIter, keyLimit, batchRows, pCommith->pDataCols, NULL, 0,
                          pCfg->update, &mergeInfo);

    if (pCommith->pDataCols->numOfRows <= 0) {
      break;
    }

    const bool toLast = !(toData || pCommith->pDataCols->numOfRows >= pCfg->minRows);
    SDFile    *pTarget = toLast ? TSDB_COMMIT_LAST_FILE(pCommith) : TSDB_COMMIT_DATA_FILE(pCommith);

    if (tsdbWriteBlock(pCommith, pTarget, pCommith->pDataCols, &written, toLast, true) < 0) {
      return -1;
    }

    if (tsdbCommitAddBlock(pCommith, &written, NULL, 0) < 0) {
      return -1;
    }
  }

  return 0;
}
H
Hongze Cheng 已提交
1362

H
more  
Hongze Cheng 已提交
1363
/**
 * @brief Combine in-memory rows with one existing disk block.
 *
 * A dry run over a copy of the memory iterator, against the block's key column, fills an SMergeInfo.
 * Depending on it the block is:
 *   - carried over unchanged when no operation would be applied,
 *   - extended with a new sub block when tsdbCanAddSubBlock() allows it,
 *   - otherwise fully loaded and rewritten through tsdbMergeBlockData().
 * Memory rows are considered up to the handle's maxKey for the final block, or up to one before the
 * next block's first key for any other block.
 *
 * @param pCommith Commit handle.
 * @param pIter    Commit iterator of the table.
 * @param bidx     Index of the disk block within the loaded block info.
 * @return 0 on success, -1 on failure.
 */
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
  STsdb     *pRepo = TSDB_COMMIT_REPO(pCommith);
  STsdbCfg  *pCfg = REPO_CFG(pRepo);
  int        nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
  SBlock    *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
  int16_t    keyColId = PRIMARYKEY_TIMESTAMP_COL_ID;
  SMergeInfo mInfo;
  TSKEY      keyLimit;

  if (bidx != nBlocks - 1) {
    keyLimit = pBlock[1].minKey.ts - 1;
  } else {
    keyLimit = pCommith->maxKey;
  }

  // Work on a copy of the iterator so the dry run does not consume the real one.
  STbDataIter dryIter = *(pIter->pIter);
  if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &keyColId, 1, false) < 0) {
    return -1;
  }

  tsdbLoadDataFromCache(pRepo, pIter->pTable, &dryIter, keyLimit, INT32_MAX, NULL,
                        pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update,
                        &mInfo);

  if (mInfo.nOperations == 0) {
    // no new data to insert (all updates denied)
    if (tsdbMoveBlock(pCommith, bidx) < 0) {
      return -1;
    }
    *(pIter->pIter) = dryIter;
    return 0;
  }

  if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) {
    // Ignore the block
    ASSERT(0);
    *(pIter->pIter) = dryIter;
    return 0;
  }

  if (!tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) {
    // Full rewrite: load the whole block and merge it with the memory rows.
    if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) {
      return -1;
    }
    if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) {
      return -1;
    }
    return 0;
  }

  // Add a sub-block
  SBlock subBlocks[TSDB_MAX_SUBBLOCKS];
  SBlock newSub;
  SBlock supBlock;

  tsdbLoadDataFromCache(pRepo, pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols,
                        pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update,
                        &mInfo);

  SDFile *pDFile = pBlock->last ? TSDB_COMMIT_LAST_FILE(pCommith) : TSDB_COMMIT_DATA_FILE(pCommith);

  if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &newSub, pBlock->last, false) < 0) {
    return -1;
  }

  if (pBlock->numOfSubBlocks != 1) {
    // Existing sub blocks are stored at pBlock->offset inside the loaded block info.
    memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset),
           sizeof(SBlock) * pBlock->numOfSubBlocks);
  } else {
    // A lone super block becomes the first sub block of the new group.
    subBlocks[0] = *pBlock;
    subBlocks[0].numOfSubBlocks = 0;
  }
  subBlocks[pBlock->numOfSubBlocks] = newSub;

  supBlock = *pBlock;
  supBlock.minKey.ts = mInfo.keyFirst;
  supBlock.maxKey.ts = mInfo.keyLast;
  supBlock.numOfSubBlocks++;
  supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed;
  supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock);

  if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) {
    return -1;
  }

  return 0;
}
H
Hongze Cheng 已提交
1434

1435 1436 1437 1438 1439 1440 1441 1442
/**
 * @brief Tell whether the file holding a given disk block is reused by the commit file set.
 *
 * @param pCommith Commit handle.
 * @param bidx     Index of the block within the loaded block info.
 * @return pCommith->isLFileSame for a "last" block, pCommith->isDFileSame otherwise.
 */
static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx) {
  const SBlock *pBlk = &pCommith->readh.pBlkInfo->blocks[bidx];
  return pBlk->last ? pCommith->isLFileSame : pCommith->isDFileSame;
}

H
Hongze Cheng 已提交
1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495
/**
 * @brief Carry one existing disk block over to the commit file set without merging memory data.
 *
 * If the block's file is reused as is, only its descriptor (and its sub-block descriptors, if it has
 * more than one) is registered again. Otherwise the block data is loaded and rewritten into the new
 * file as a single super block.
 *
 * @param pCommith Commit handle.
 * @param bidx     Index of the block within the loaded block info.
 * @return 0 on success, -1 on failure.
 */
static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
  SBlock *pSrc = pCommith->readh.pBlkInfo->blocks + bidx;
  SBlock  moved;

  ASSERT(pSrc->numOfSubBlocks > 0);

  const bool sameFile = pSrc->last ? pCommith->isLFileSame : pCommith->isDFileSame;
  SDFile    *pTarget = pSrc->last ? TSDB_COMMIT_LAST_FILE(pCommith) : TSDB_COMMIT_DATA_FILE(pCommith);

  if (!sameFile) {
    // The file changes: read the block and write it out again.
    if (tsdbLoadBlockData(&(pCommith->readh), pSrc, NULL) < 0) return -1;
    if (tsdbWriteBlock(pCommith, pTarget, pCommith->readh.pDCols[0], &moved, pSrc->last, true) < 0) return -1;
    if (tsdbCommitAddBlock(pCommith, &moved, NULL, 0) < 0) return -1;
    return 0;
  }

  if (pSrc->numOfSubBlocks == 1) {
    if (tsdbCommitAddBlock(pCommith, pSrc, NULL, 0) < 0) {
      return -1;
    }
    return 0;
  }

  // Several sub blocks: re-register them and point the super block at their new position
  // in the pending sub-block array.
  moved = *pSrc;
  moved.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSubBlk);

  if (tsdbCommitAddBlock(pCommith, &moved, POINTER_SHIFT(pCommith->readh.pBlkInfo, pSrc->offset),
                         pSrc->numOfSubBlocks) < 0) {
    return -1;
  }

  return 0;
}

/**
 * @brief Register a written block with the commit handle.
 *
 * @param pCommith   Commit handle; pSupBlock goes to aSupBlk, pSubBlocks to aSubBlk.
 * @param pSupBlock  Super block descriptor to append.
 * @param pSubBlocks Sub block descriptors to append, or NULL when there are none.
 * @param nSubBlocks Number of entries in pSubBlocks.
 * @return 0 on success, -1 with terrno set when either array append fails.
 */
static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) {
  bool stored = (taosArrayPush(pCommith->aSupBlk, pSupBlock) != NULL);

  if (stored && pSubBlocks) {
    stored = (taosArrayAddBatch(pCommith->aSubBlk, pSubBlocks, nSubBlocks) != NULL);
  }

  if (!stored) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

H
more  
Hongze Cheng 已提交
1496 1497
/*
 * Merge the file block currently loaded in pCommith->readh.pDCols[0] with rows coming from the memory
 * iterator `pIter`, and write the merged rows out as one or more new blocks.
 *
 * Each round calls tsdbLoadAndMergeFromCache() to fill pCommith->pDataCols with up to
 * TSDB_COMMIT_DEFAULT_ROWS(pCommith) rows whose keys do not exceed `keyLimit`; the loop ends when a
 * round yields zero rows. A batch goes to the last file only when `isLastOneBlock` is set and the
 * batch holds fewer than pCfg->minRows rows; every other batch goes to the data file.
 *
 * NOTE: the `pDataCols` parameter is not read by this function.
 *
 * Returns 0 on success, -1 if writing or registering a block fails.
 */
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
                              bool isLastOneBlock) {
  STsdb    *pRepo = TSDB_COMMIT_REPO(pCommith);
  STsdbCfg *pCfg = REPO_CFG(pRepo);
  SBlock    block;
  SDFile   *pDFile;
  bool      isLast;
  int32_t   defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);

  int biter = 0;  // read position inside the loaded file block, advanced by the merge helper
  while (true) {
    tsdbLoadAndMergeFromCache(pRepo, pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit,
                              defaultRows, pCfg->update);

    if (pCommith->pDataCols->numOfRows == 0) break;

    // Only an undersized batch of the final block is stored in the last file.
    isLast = isLastOneBlock && (pCommith->pDataCols->numOfRows < pCfg->minRows);
    if (isLast) {
      pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
    } else {
      pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
    }

    if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1;
    if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1;
  }

  return 0;
}
H
more  
Hongze Cheng 已提交
1531

H
Hongze Cheng 已提交
1532 1533
/*
 * Copy row `rowIdx` of every column of `pDataCols` into `pTarget`, at the row position given by
 * pTarget->numOfRows. A failure to read a cell trips TASSERT.
 */
static void tsdbCopyFileRowToTarget(SDataCols *pDataCols, int rowIdx, SDataCols *pTarget) {
  for (int i = 0; i < pDataCols->numOfCols; ++i) {
    SCellVal sVal = {0};
    if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, rowIdx, pDataCols->bitmapMode) < 0) {
      TASSERT(0);
    }
    // TODO: tdAppendValToDataCol may fail
    tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints,
                         pTarget->bitmapMode, false);
  }
}

/*
 * Two-way merge of rows from a loaded file block (`pDataCols`, read from position `*iter`) and rows
 * from the memory iterator in `pCommitIter` into `pTarget`.
 *
 * - `pTarget` is reset first and inherits the bitmap mode of `pDataCols`.
 * - Memory rows with a key greater than `maxKey` are treated as end of input.
 * - The row with the smaller key is emitted first. On equal keys the file row is copied and, when
 *   TD_SUPPORT_UPDATE(update) holds, the memory row is merged on top of it.
 * - A memory row repeating the key of the row just emitted is merged into that row when updates are
 *   supported and is dropped otherwise.
 * - While the loop runs, pTarget->numOfRows is the index of the row being built; it is bumped when a
 *   row with a new key starts and once more after the loop, so it ends up as the row count.
 * - The loop stops when both inputs are exhausted or once that index reaches `maxRows - 1`.
 *
 * `*iter` and the memory iterator are advanced past every consumed row.
 */
static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter,
                                      SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update) {
  TSKEY     key1 = INT64_MAX;  // next key from the file block
  TSKEY     key2 = INT64_MAX;  // next key from the memory iterator
  TSKEY     lastKey = TSKEY_INITIAL_VAL;
  STSchema *pSchema = NULL;

  ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey);
  tdResetDataCols(pTarget);

  pTarget->bitmapMode = pDataCols->bitmapMode;
  // TODO: filter Multi-Version
  // TODO: support delete function
  while (true) {
    key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
    STSRow *row = tsdbNextIterRow(pCommitIter->pIter);
    if (row == NULL || TD_ROW_KEY(row) > maxKey) {
      key2 = INT64_MAX;
    } else {
      key2 = TD_ROW_KEY(row);
    }

    if (key1 == INT64_MAX && key2 == INT64_MAX) break;

    if (key1 < key2) {
      // File row comes first.
      if (lastKey != TSKEY_INITIAL_VAL) {
        ++pTarget->numOfRows;
      }
      tsdbCopyFileRowToTarget(pDataCols, *iter, pTarget);

      lastKey = key1;
      ++(*iter);
    } else if (key1 > key2) {
      // Memory row comes first.
      if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) {
        pSchema = tsdbGetTableSchemaImpl(pTsdb, pCommitIter->pTable, false, false, TD_ROW_SVER(row));
        ASSERT(pSchema != NULL);
      }

      if (key2 == lastKey) {
        // Same key as the row being built: merge into it only when updates are supported.
        if (TD_SUPPORT_UPDATE(update)) {
          tdAppendSTSRowToDataCol(row, pSchema, pTarget, true);
        }
      } else {
        if (lastKey != TSKEY_INITIAL_VAL) {
          ++pTarget->numOfRows;
        }
        tdAppendSTSRowToDataCol(row, pSchema, pTarget, false);
        lastKey = key2;
      }

      tsdbTbDataIterNext(pCommitIter->pIter);
    } else {
      // Same key on both sides.
      if (lastKey != key1) {
        if (lastKey != TSKEY_INITIAL_VAL) {
          ++pTarget->numOfRows;
        }
        lastKey = key1;
      }

      // copy disk data (no duplicated TS keys in pDataCols from file)
      tsdbCopyFileRowToTarget(pDataCols, *iter, pTarget);

      if (TD_SUPPORT_UPDATE(update)) {
        // copy mem data(Multi-Version)
        if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) {
          pSchema = tsdbGetTableSchemaImpl(pTsdb, pCommitIter->pTable, false, false, TD_ROW_SVER(row));
          ASSERT(pSchema != NULL);
        }

        // TODO: merge with Multi-Version
        tdAppendSTSRowToDataCol(row, pSchema, pTarget, true);
      }
      ++(*iter);
      tsdbTbDataIterNext(pCommitIter->pIter);
    }

    if (pTarget->numOfRows >= (maxRows - 1)) break;
  }

  // Count the row that was still being built when the loop ended.
  if (lastKey != TSKEY_INITIAL_VAL) {
    ++pTarget->numOfRows;
  }
}
H
Hongze Cheng 已提交
1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649

/*
 * Drop the per-table state held on the commit handle: the table pointer is cleared and both the
 * super-block and sub-block arrays are emptied.
 */
static void tsdbResetCommitTable(SCommitH *pCommith) {
  pCommith->pTable = NULL;
  taosArrayClear(pCommith->aSupBlk);
  taosArrayClear(pCommith->aSubBlk);
}

/*
 * Finish working with the file sets of the current commit round.
 *
 * The read-side file set is closed and unset only when isRFileSet is true. The write-side file set is
 * fsynced unless `hasError` is true, and is closed in either case.
 */
static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) {
  if (pCommith->isRFileSet) {
    tsdbCloseAndUnsetFSet(&pCommith->readh);
  }

  // No fsync is issued for a round that ended with an error.
  if (hasError == false) {
    TSDB_FSET_FSYNC(TSDB_COMMIT_WRITE_FSET(pCommith));
  }

  tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
}

H
more  
Hongze Cheng 已提交
1650
/*
 * Decide whether the changes described by `pInfo` can be attached to `pBlock` as one more sub-block.
 *
 * This returns true only when all of the following hold:
 *   - the block has fewer than TSDB_MAX_SUBBLOCKS sub-blocks;
 *   - pInfo->nOperations does not exceed pCfg->maxRows;
 *   - for a block in the last file: isLFileSame is set and the merged row count stays below
 *     pCfg->minRows;
 *   - for a block in the data file: isDFileSame is set and the merged row count does not exceed
 *     pCfg->maxRows.
 *
 * The merged row count is numOfRows + rowsInserted - rowsDeleteSucceed and is asserted to be positive.
 */
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) {
  STsdb    *pRepo = TSDB_COMMIT_REPO(pCommith);
  STsdbCfg *pCfg = REPO_CFG(pRepo);
  int       mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed;

  ASSERT(mergeRows > 0);

  if (pBlock->numOfSubBlocks >= TSDB_MAX_SUBBLOCKS) return false;
  if (pInfo->nOperations > pCfg->maxRows) return false;

  if (pBlock->last) {
    return pCommith->isLFileSame && mergeRows < pCfg->minRows;
  }

  return pCommith->isDFileSame && mergeRows <= pCfg->maxRows;
}

/*
 * Append (or, with `merge` set, merge) `row` into `pCols` using the schema that matches the row's
 * schema version.
 *
 * `*ppSchema` is a caller-owned cache slot: it is looked up again through tsdbGetTableSchemaImpl()
 * whenever it is NULL or its version differs from TD_ROW_SVER(row), and the result is stored back.
 * When `pCols` is NULL nothing is done.
 *
 * Returns 0 on success (including the no-op case), -1 if no schema could be obtained.
 */
static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row,
                                    bool merge) {
  if (pCols == NULL) return 0;

  STSchema *pCached = *ppSchema;
  if (pCached == NULL || schemaVersion(pCached) != TD_ROW_SVER(row)) {
    pCached = tsdbGetTableSchemaImpl(pTsdb, pTable, false, false, TD_ROW_SVER(row));
    *ppSchema = pCached;
    if (pCached == NULL) {
      ASSERT(false);
      return -1;
    }
  }

  tdAppendSTSRowToDataCol(row, pCached, pCols, merge);
  return 0;
}

/*
 * Walk the memory iterator `pIter` against an array of filter keys, collecting merge statistics and
 * optionally materialising the visited memory rows into `pCols`.
 *
 * Parameters:
 *   pTsdb, pTable  - used to resolve the schema of each appended row.
 *   pIter          - memory row iterator; when NULL the function returns 0 without touching anything.
 *   maxKey         - memory rows with a larger key are treated as end of input.
 *   maxRowsToRead  - stop once rowsInserted - rowsDeleteSucceed reaches this value (must be > 0).
 *   pCols          - optional output; reset on entry. Its numOfRows is the index of the row being
 *                    built during the loop and is bumped once more at the end.
 *   filterKeys     - `nFilterKeys` keys to compare against (presumably the keys of an existing file
 *                    block in ascending order - confirm against callers).
 *   keepDup        - when true, rows whose key equals a filter key are counted as updates and rows
 *                    repeating the previous memory key are merged into it; when false such rows are
 *                    discarded.
 *   pMergeInfo     - optional statistics output; zeroed on entry, with keyFirst/keyLast initialised to
 *                    INT64_MAX/INT64_MIN. A local scratch struct is used when NULL.
 *
 * Always returns 0. The result of tsdbAppendTableRowToCols() is not checked.
 */
static int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead,
                                 SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup,
                                 SMergeInfo *pMergeInfo) {
  ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
  if (pIter == NULL) return 0;
  STSchema *pSchema = NULL;
  TSKEY     rowKey = 0;
  TSKEY     fKey = 0;
  // only fetch lastKey from mem data as file data not used in this function actually
  TSKEY      lastKey = TSKEY_INITIAL_VAL;
  bool       isRowDel = false;
  int        filterIter = 0;
  STSRow    *row = NULL;
  SMergeInfo mInfo;

  // TODO: support Multi-Version (rows with the same TS key in memory can't be merged if their version is
  // referred to by a query handle)

  if (pMergeInfo == NULL) pMergeInfo = &mInfo;

  memset(pMergeInfo, 0, sizeof(*pMergeInfo));
  pMergeInfo->keyFirst = INT64_MAX;
  pMergeInfo->keyLast = INT64_MIN;
  if (pCols) tdResetDataCols(pCols);

  // Prime both inputs; INT64_MAX marks an exhausted side.
  row = tsdbNextIterRow(pIter);
  if (row == NULL || TD_ROW_KEY(row) > maxKey) {
    rowKey = INT64_MAX;
    isRowDel = false;
  } else {
    rowKey = TD_ROW_KEY(row);
    isRowDel = TD_ROW_IS_DELETED(row);
  }

  if (filterIter >= nFilterKeys) {
    fKey = INT64_MAX;
  } else {
    fKey = tdGetKey(filterKeys[filterIter]);
  }
  // 1. fKey - no dup since merged up to maxVersion of each query handle by tsdbLoadBlockDataCols
  // 2. rowKey - may dup since Multi-Version is supported
  while (true) {
    if (fKey == INT64_MAX && rowKey == INT64_MAX) break;

    if (fKey < rowKey) {
      // Filter key only: it just widens the key range.
      pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
      pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);

      filterIter++;
      if (filterIter >= nFilterKeys) {
        fKey = INT64_MAX;
      } else {
        fKey = tdGetKey(filterKeys[filterIter]);
      }
    } else if (fKey > rowKey) {
      // Memory row with a key that is not among the filter keys.
      if (isRowDel) {
        // TODO: support delete function
        pMergeInfo->rowsDeleteFailed++;
      } else {
        if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break;
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;

        if (lastKey != rowKey) {
          pMergeInfo->rowsInserted++;
          pMergeInfo->nOperations++;
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
          if (pCols) {
            if (lastKey != TSKEY_INITIAL_VAL) {
              ++pCols->numOfRows;
            }
            tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
          }
          lastKey = rowKey;
        } else {
          if (keepDup) {
            tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true);
          } else {
            // discard
          }
        }
      }

      tsdbTbDataIterNext(pIter);
      row = tsdbNextIterRow(pIter);
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
      }
    } else {           // fKey == rowKey
      if (isRowDel) {  // TODO: support delete function (how is a delete represented in file? rowVersion = -1?)
        ASSERT(!keepDup);
        if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
        pMergeInfo->rowsDeleteSucceed++;
        pMergeInfo->nOperations++;
        tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
      } else {
        if (keepDup) {
          if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break;
          if (lastKey != rowKey) {
            pMergeInfo->rowsUpdated++;
            pMergeInfo->nOperations++;
            pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
            pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
            if (pCols) {
              if (lastKey != TSKEY_INITIAL_VAL) {
                ++pCols->numOfRows;
              }
              tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false);
            }
            lastKey = rowKey;
          } else {
            tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true);
          }
        } else {
          pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey);
          pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey);
        }
      }

      // Both sides were consumed: advance the memory iterator and the filter cursor.
      tsdbTbDataIterNext(pIter);
      row = tsdbNextIterRow(pIter);
      if (row == NULL || TD_ROW_KEY(row) > maxKey) {
        rowKey = INT64_MAX;
        isRowDel = false;
      } else {
        rowKey = TD_ROW_KEY(row);
        isRowDel = TD_ROW_IS_DELETED(row);
      }

      filterIter++;
      if (filterIter >= nFilterKeys) {
        fKey = INT64_MAX;
      } else {
        fKey = tdGetKey(filterKeys[filterIter]);
      }
    }
  }

  // Count the row that was still being built when the loop ended.
  if (pCols && (lastKey != TSKEY_INITIAL_VAL)) {
    ++pCols->numOfRows;
  }

  return 0;
}