tsdbCompact.c 17.0 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
Hongze Cheng 已提交
14
 */
H
Hongze Cheng 已提交
15
#if 0
16
#include "tsdbint.h"
H
Hongze Cheng 已提交
17

18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
typedef struct {
  STable *    pTable;
  SBlockIdx * pBlkIdx;
  SBlockIdx   bindex;
  SBlockInfo *pInfo;
} STableCompactH;

typedef struct {
  SRtn       rtn;
  SFSIter    fsIter;
  SArray *   tbArray;  // table array to cache table obj and block indexes
  SReadH     readh;
  SDFileSet  wSet;
  SArray *   aBlkIdx;
  SArray *   aSupBlk;
  SDataCols *pDataCols;
} SCompactH;

#define TSDB_COMPACT_WSET(pComph) (&((pComph)->wSet))
#define TSDB_COMPACT_REPO(pComph) TSDB_READ_REPO(&((pComph)->readh))
#define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD)
#define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA)
#define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST)
#define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh))
#define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh))

static int  tsdbAsyncCompact(STsdbRepo *pRepo);
static void tsdbStartCompact(STsdbRepo *pRepo);
static void tsdbEndCompact(STsdbRepo *pRepo, int eno);
static int  tsdbCompactMeta(STsdbRepo *pRepo);
static int  tsdbCompactTSData(STsdbRepo *pRepo);
static int  tsdbCompactFSet(SCompactH *pComph, SDFileSet *pSet);
static bool tsdbShouldCompact(SCompactH *pComph);
static int  tsdbInitCompactH(SCompactH *pComph, STsdbRepo *pRepo);
static void tsdbDestroyCompactH(SCompactH *pComph);
static int  tsdbInitCompTbArray(SCompactH *pComph);
static void tsdbDestroyCompTbArray(SCompactH *pComph);
static int  tsdbCacheFSetIndex(SCompactH *pComph);
static int  tsdbCompactFSetInit(SCompactH *pComph, SDFileSet *pSet);
static void tsdbCompactFSetEnd(SCompactH *pComph);
static int  tsdbCompactFSetImpl(SCompactH *pComph);
static int  tsdbWriteBlockToRightFile(SCompactH *pComph, STable *pTable, SDataCols *pDataCols, void **ppBuf,
                                      void **ppCBuf);

62
enum { TSDB_NO_COMPACT, TSDB_IN_COMPACT, TSDB_WAITING_COMPACT};
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
int tsdbCompact(STsdbRepo *pRepo) { return tsdbAsyncCompact(pRepo); }

void *tsdbCompactImpl(STsdbRepo *pRepo) {
  // Check if there are files in TSDB FS to compact
  if (REPO_FS(pRepo)->cstatus->pmf == NULL) {
    tsdbInfo("vgId:%d no file to compact in FS", REPO_ID(pRepo));
    return NULL;
  }

  tsdbStartCompact(pRepo);

  if (tsdbCompactMeta(pRepo) < 0) {
    tsdbError("vgId:%d failed to compact META data since %s", REPO_ID(pRepo), tstrerror(terrno));
    goto _err;
  }

  if (tsdbCompactTSData(pRepo) < 0) {
    tsdbError("vgId:%d failed to compact TS data since %s", REPO_ID(pRepo), tstrerror(terrno));
    goto _err;
  }

  tsdbEndCompact(pRepo, TSDB_CODE_SUCCESS);
  return NULL;

_err:
  pRepo->code = terrno;
  tsdbEndCompact(pRepo, terrno);
  return NULL;
}

static int tsdbAsyncCompact(STsdbRepo *pRepo) {
94
  if (pRepo->compactState != TSDB_NO_COMPACT) {
95
    tsdbInfo("vgId:%d not compact tsdb again ", REPO_ID(pRepo));
96 97
    return 0; 
  } 
98
  pRepo->compactState = TSDB_WAITING_COMPACT;   
99 100 101 102 103
  tsem_wait(&(pRepo->readyToCommit));
  return tsdbScheduleCommit(pRepo, COMPACT_REQ);
}

static void tsdbStartCompact(STsdbRepo *pRepo) {
104
  assert(pRepo->compactState != TSDB_IN_COMPACT);
105 106 107
  tsdbInfo("vgId:%d start to compact!", REPO_ID(pRepo));
  tsdbStartFSTxn(pRepo, 0, 0);
  pRepo->code = TSDB_CODE_SUCCESS;
108
  pRepo->compactState = TSDB_IN_COMPACT;
109 110 111 112 113 114 115 116
}

static void tsdbEndCompact(STsdbRepo *pRepo, int eno) {
  if (eno != TSDB_CODE_SUCCESS) {
    tsdbEndFSTxnWithError(REPO_FS(pRepo));
  } else {
    tsdbEndFSTxn(pRepo);
  }
117
  pRepo->compactState = TSDB_NO_COMPACT;
118 119 120 121 122 123 124 125 126 127
  tsdbInfo("vgId:%d compact over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed");
  tsem_post(&(pRepo->readyToCommit));
}

static int tsdbCompactMeta(STsdbRepo *pRepo) {
  STsdbFS *pfs = REPO_FS(pRepo);
  tsdbUpdateMFile(pfs, pfs->cstatus->pmf);
  return 0;
}

Y
yihaoDeng 已提交
128 129 130
  static int tsdbCompactTSData(STsdbRepo *pRepo) {
    SCompactH  compactH;
    SDFileSet *pSet = NULL;
131

Y
yihaoDeng 已提交
132
    tsdbDebug("vgId:%d start to compact TS data", REPO_ID(pRepo));
133

Y
yihaoDeng 已提交
134 135 136 137
    // If no file, just return 0;
    if (taosArrayGetSize(REPO_FS(pRepo)->cstatus->df) <= 0) {
      tsdbDebug("vgId:%d no TS data file to compact, compact over", REPO_ID(pRepo));
      return 0;
138 139
    }

Y
yihaoDeng 已提交
140
    if (tsdbInitCompactH(&compactH, pRepo) < 0) {
141 142 143
      return -1;
    }

Y
yihaoDeng 已提交
144 145 146 147 148 149 150
    while ((pSet = tsdbFSIterNext(&(compactH.fsIter)))) {
      // Remove those expired files
      if (pSet->fid < compactH.rtn.minFid) {
        tsdbInfo("vgId:%d FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid,
                TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
        continue;
      }
151

Y
yihaoDeng 已提交
152 153 154 155 156
      if (TSDB_FSET_LEVEL(pSet) == TFS_MAX_LEVEL) {
        tsdbDebug("vgId:%d FSET %d on level %d, should not compact", REPO_ID(pRepo), pSet->fid, TFS_MAX_LEVEL);
        tsdbUpdateDFileSet(REPO_FS(pRepo), pSet);
        continue;
      }
157

Y
yihaoDeng 已提交
158 159 160 161 162 163
      if (tsdbCompactFSet(&compactH, pSet) < 0) {
        tsdbDestroyCompactH(&compactH);
        tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
        return -1;
      }
    }
164

Y
yihaoDeng 已提交
165 166 167
    tsdbDestroyCompactH(&compactH);
    tsdbDebug("vgId:%d compact TS data over", REPO_ID(pRepo));
    return 0;
168 169
  }

Y
yihaoDeng 已提交
170 171 172 173 174 175
  static int tsdbCompactFSet(SCompactH *pComph, SDFileSet *pSet) {
    STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
    SDiskID    did;

    tsdbDebug("vgId:%d start to compact FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet),
              TSDB_FSET_ID(pSet));
176

Y
yihaoDeng 已提交
177
    if (tsdbCompactFSetInit(pComph, pSet) < 0) {
178 179 180
      return -1;
    }

Y
yihaoDeng 已提交
181 182 183 184 185 186 187 188
    if (!tsdbShouldCompact(pComph)) {
      tsdbDebug("vgId:%d no need to compact FSET %d", REPO_ID(pRepo), pSet->fid);
      if (tsdbApplyRtnOnFSet(TSDB_COMPACT_REPO(pComph), pSet, &(pComph->rtn)) < 0) {
        tsdbCompactFSetEnd(pComph);
        return -1;
      }
    } else {
      // Create new fset as compacted fset
S
Shengliang Guan 已提交
189
      if (tfsAllocDisk(pRepo->pTfs, tsdbGetFidLevel(pSet->fid, &(pComph->rtn)), &did) < 0) {
Y
yihaoDeng 已提交
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
        terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
        tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
        tsdbCompactFSetEnd(pComph);
        return -1;
      }

      tsdbInitDFileSet(TSDB_COMPACT_WSET(pComph), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet),
                      FS_TXN_VERSION(REPO_FS(pRepo)));
      if (tsdbCreateDFileSet(TSDB_COMPACT_WSET(pComph), true) < 0) {
        tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
        tsdbCompactFSetEnd(pComph);
        return -1;
      }

      if (tsdbCompactFSetImpl(pComph) < 0) {
        tsdbCloseDFileSet(TSDB_COMPACT_WSET(pComph));
        tsdbRemoveDFileSet(TSDB_COMPACT_WSET(pComph));
        tsdbCompactFSetEnd(pComph);
        return -1;
      }

211
      tsdbCloseDFileSet(TSDB_COMPACT_WSET(pComph));
Y
yihaoDeng 已提交
212 213
      tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_COMPACT_WSET(pComph));
      tsdbDebug("vgId:%d FSET %d compact over", REPO_ID(pRepo), pSet->fid);
214 215
    }

Y
yihaoDeng 已提交
216 217
    tsdbCompactFSetEnd(pComph);
    return 0;
218 219
  }

Y
yihaoDeng 已提交
220 221 222 223 224 225 226 227 228
  static bool tsdbShouldCompact(SCompactH *pComph) {
    STsdbRepo *     pRepo = TSDB_COMPACT_REPO(pComph);
    STsdbCfg *      pCfg = REPO_CFG(pRepo);
    SReadH *        pReadh = &(pComph->readh);
    STableCompactH *pTh;
    SBlock *        pBlock;
    int             defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
    SDFile *        pDataF = TSDB_READ_DATA_FILE(pReadh);
    SDFile *        pLastF = TSDB_READ_LAST_FILE(pReadh);
229

Y
yihaoDeng 已提交
230 231 232 233
    int     tblocks = 0;       // total blocks
    int     nSubBlocks = 0;    // # of blocks with sub-blocks
    int     nSmallBlocks = 0;  // # of blocks with rows < defaultRows
    int64_t tsize = 0;
234

Y
yihaoDeng 已提交
235 236
    for (size_t i = 0; i < taosArrayGetSize(pComph->tbArray); i++) {
      pTh = (STableCompactH *)taosArrayGet(pComph->tbArray, i);
237

Y
yihaoDeng 已提交
238
      if (pTh->pTable == NULL || pTh->pBlkIdx == NULL) continue;
239

Y
yihaoDeng 已提交
240 241 242
      for (size_t bidx = 0; bidx < pTh->pBlkIdx->numOfBlocks; bidx++) {
        tblocks++;
        pBlock = pTh->pInfo->blocks + bidx;
243

Y
yihaoDeng 已提交
244 245 246
        if (pBlock->numOfRows < defaultRows) {
          nSmallBlocks++;
        }
247

Y
yihaoDeng 已提交
248 249 250 251 252 253 254 255 256 257
        if (pBlock->numOfSubBlocks > 1) {
          nSubBlocks++;
          for (int k = 0; k < pBlock->numOfSubBlocks; k++) {
            SBlock *iBlock = ((SBlock *)POINTER_SHIFT(pTh->pInfo, pBlock->offset)) + k;
            tsize = tsize + iBlock->len;
          }
        } else if (pBlock->numOfSubBlocks == 1) {
          tsize += pBlock->len;
        } else {
          ASSERT(0);
258 259 260
        }
      }
    }
Y
yihaoDeng 已提交
261 262 263

    return (((nSubBlocks * 1.0 / tblocks) > 0.33) || ((nSmallBlocks * 1.0 / tblocks) > 0.33) ||
            (tsize * 1.0 / (pDataF->info.size + pLastF->info.size - 2 * TSDB_FILE_HEAD_SIZE) < 0.85));
264 265
  }

Y
yihaoDeng 已提交
266 267
  static int tsdbInitCompactH(SCompactH *pComph, STsdbRepo *pRepo) {
    STsdbCfg *pCfg = REPO_CFG(pRepo);
268

Y
yihaoDeng 已提交
269
    memset(pComph, 0, sizeof(*pComph));
270

Y
yihaoDeng 已提交
271
    TSDB_FSET_SET_CLOSED(TSDB_COMPACT_WSET(pComph));
272

Y
yihaoDeng 已提交
273 274
    tsdbGetRtnSnap(pRepo, &(pComph->rtn));
    tsdbFSIterInit(&(pComph->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);
275

Y
yihaoDeng 已提交
276 277 278
    if (tsdbInitReadH(&(pComph->readh), pRepo) < 0) {
      return -1;
    }
279

Y
yihaoDeng 已提交
280 281 282 283
    if (tsdbInitCompTbArray(pComph) < 0) {
      tsdbDestroyCompactH(pComph);
      return -1;
    }
284

Y
yihaoDeng 已提交
285 286 287 288 289 290
    pComph->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
    if (pComph->aBlkIdx == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      tsdbDestroyCompactH(pComph);
      return -1;
    }
291

Y
yihaoDeng 已提交
292 293 294 295 296 297
    pComph->aSupBlk = taosArrayInit(1024, sizeof(SBlock));
    if (pComph->aSupBlk == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      tsdbDestroyCompactH(pComph);
      return -1;
    }
298

L
Liu Jicong 已提交
299
    pComph->pDataCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
Y
yihaoDeng 已提交
300 301 302 303 304
    if (pComph->pDataCols == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      tsdbDestroyCompactH(pComph);
      return -1;
    }
305

Y
yihaoDeng 已提交
306
    return 0;
307 308
  }

Y
yihaoDeng 已提交
309 310 311 312 313 314 315
  static void tsdbDestroyCompactH(SCompactH *pComph) {
    pComph->pDataCols = tdFreeDataCols(pComph->pDataCols);
    pComph->aSupBlk = taosArrayDestroy(pComph->aSupBlk);
    pComph->aBlkIdx = taosArrayDestroy(pComph->aBlkIdx);
    tsdbDestroyCompTbArray(pComph);
    tsdbDestroyReadH(&(pComph->readh));
    tsdbCloseDFileSet(TSDB_COMPACT_WSET(pComph));
316 317
  }

Y
yihaoDeng 已提交
318 319 320 321 322
  static int tsdbInitCompTbArray(SCompactH *pComph) {  // Init pComp->tbArray
    STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
    STsdbMeta *pMeta = pRepo->tsdbMeta;

    if (tsdbRLockRepoMeta(pRepo) < 0) return -1;
323

Y
yihaoDeng 已提交
324 325
    pComph->tbArray = taosArrayInit(pMeta->maxTables, sizeof(STableCompactH));
    if (pComph->tbArray == NULL) {
326 327 328 329 330
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      tsdbUnlockRepoMeta(pRepo);
      return -1;
    }

Y
yihaoDeng 已提交
331 332 333 334 335 336 337
    // Note here must start from 0
    for (int i = 0; i < pMeta->maxTables; i++) {
      STableCompactH ch = {0};
      if (pMeta->tables[i] != NULL) {
        tsdbRefTable(pMeta->tables[i]);
        ch.pTable = pMeta->tables[i];
      }
338

Y
yihaoDeng 已提交
339 340 341 342 343
      if (taosArrayPush(pComph->tbArray, &ch) == NULL) {
        terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
        tsdbUnlockRepoMeta(pRepo);
        return -1;
      }
344 345
    }

Y
yihaoDeng 已提交
346 347
    if (tsdbUnlockRepoMeta(pRepo) < 0) return -1;
    return 0;
348 349
  }

Y
yihaoDeng 已提交
350 351 352 353
  static void tsdbDestroyCompTbArray(SCompactH *pComph) {
    STableCompactH *pTh;

    if (pComph->tbArray == NULL) return;
354

Y
yihaoDeng 已提交
355 356 357 358 359 360 361 362
    for (size_t i = 0; i < taosArrayGetSize(pComph->tbArray); i++) {
      pTh = (STableCompactH *)taosArrayGet(pComph->tbArray, i);
      if (pTh->pTable) {
        tsdbUnRefTable(pTh->pTable);
      }

      pTh->pInfo = taosTZfree(pTh->pInfo);
    }
363

Y
yihaoDeng 已提交
364
    pComph->tbArray = taosArrayDestroy(pComph->tbArray);
365 366
  }

Y
yihaoDeng 已提交
367 368
  static int tsdbCacheFSetIndex(SCompactH *pComph) {
    SReadH *pReadH = &(pComph->readh);
369

Y
yihaoDeng 已提交
370
    if (tsdbLoadBlockIdx(pReadH) < 0) {
371 372 373
      return -1;
    }

Y
yihaoDeng 已提交
374 375 376
    for (int tid = 1; tid < taosArrayGetSize(pComph->tbArray); tid++) {
      STableCompactH *pTh = (STableCompactH *)taosArrayGet(pComph->tbArray, tid);
      pTh->pBlkIdx = NULL;
377

Y
yihaoDeng 已提交
378 379 380 381
      if (pTh->pTable == NULL) continue;
      if (tsdbSetReadTable(pReadH, pTh->pTable) < 0) {
        return -1;
      }
382

Y
yihaoDeng 已提交
383 384 385
      if (pReadH->pBlkIdx == NULL) continue;
      pTh->bindex = *(pReadH->pBlkIdx);
      pTh->pBlkIdx = &(pTh->bindex);
386

Y
yihaoDeng 已提交
387 388 389
      if (tsdbMakeRoom((void **)(&(pTh->pInfo)), pTh->pBlkIdx->len) < 0) {
        return -1;
      }
390

Y
yihaoDeng 已提交
391 392 393 394
      if (tsdbLoadBlockInfo(pReadH, (void *)(pTh->pInfo)) < 0) {
        return -1;
      }
    }
395

Y
yihaoDeng 已提交
396
    return 0;
397 398
  }

Y
yihaoDeng 已提交
399 400 401
  static int tsdbCompactFSetInit(SCompactH *pComph, SDFileSet *pSet) {
    taosArrayClear(pComph->aBlkIdx);
    taosArrayClear(pComph->aSupBlk);
402

Y
yihaoDeng 已提交
403 404 405
    if (tsdbSetAndOpenReadFSet(&(pComph->readh), pSet) < 0) {
      return -1;
    }
406

Y
yihaoDeng 已提交
407 408 409 410
    if (tsdbCacheFSetIndex(pComph) < 0) {
      tsdbCloseAndUnsetFSet(&(pComph->readh));
      return -1;
    }
411

Y
yihaoDeng 已提交
412 413
    return 0;
  }
414

Y
yihaoDeng 已提交
415
  static void tsdbCompactFSetEnd(SCompactH *pComph) { tsdbCloseAndUnsetFSet(&(pComph->readh)); }
416

Y
yihaoDeng 已提交
417 418 419 420 421 422 423 424
  static int tsdbCompactFSetImpl(SCompactH *pComph) {
    STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
    STsdbCfg * pCfg = REPO_CFG(pRepo);
    SReadH *   pReadh = &(pComph->readh);
    SBlockIdx  blkIdx;
    void **    ppBuf = &(TSDB_COMPACT_BUF(pComph));
    void **    ppCBuf = &(TSDB_COMPACT_COMP_BUF(pComph));
    int        defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
425

Y
yihaoDeng 已提交
426
    taosArrayClear(pComph->aBlkIdx);
427

Y
yihaoDeng 已提交
428 429 430
    for (int tid = 1; tid < taosArrayGetSize(pComph->tbArray); tid++) {
      STableCompactH *pTh = (STableCompactH *)taosArrayGet(pComph->tbArray, tid);
      STSchema *      pSchema;
431

Y
yihaoDeng 已提交
432
      if (pTh->pTable == NULL || pTh->pBlkIdx == NULL) continue;
433

Y
yihaoDeng 已提交
434 435 436 437 438
      pSchema = tsdbGetTableSchemaImpl(pTh->pTable, true, true, -1);
      taosArrayClear(pComph->aSupBlk);
      if ((tdInitDataCols(pComph->pDataCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) ||
          (tdInitDataCols(pReadh->pDCols[1], pSchema) < 0)) {
        terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
439 440
        return -1;
      }
Y
yihaoDeng 已提交
441 442 443 444 445
      tdFreeSchema(pSchema);

      // Loop to compact each block data
      for (int i = 0; i < pTh->pBlkIdx->numOfBlocks; i++) {
        SBlock *pBlock = pTh->pInfo->blocks + i;
446

Y
yihaoDeng 已提交
447 448
        // Load the block data
        if (tsdbLoadBlockData(pReadh, pBlock, pTh->pInfo) < 0) {
449 450 451
          return -1;
        }

Y
yihaoDeng 已提交
452 453 454 455 456 457 458
        // Merge pComph->pDataCols and pReadh->pDCols[0] and write data to file
        if (pComph->pDataCols->numOfRows == 0 && pBlock->numOfRows >= defaultRows) {
          if (tsdbWriteBlockToRightFile(pComph, pTh->pTable, pReadh->pDCols[0], ppBuf, ppCBuf) < 0) {
            return -1;
          }
        } else {
          int ridx = 0;
459

Y
yihaoDeng 已提交
460 461 462
          while (true) {
            if (pReadh->pDCols[0]->numOfRows - ridx == 0) break;
            int rowsToMerge = MIN(pReadh->pDCols[0]->numOfRows - ridx, defaultRows - pComph->pDataCols->numOfRows);
463

464
            tdMergeDataCols(pComph->pDataCols, pReadh->pDCols[0], rowsToMerge, &ridx, pCfg->update != TD_ROW_PARTIAL_UPDATE);
465

Y
yihaoDeng 已提交
466 467 468 469 470 471 472 473
            if (pComph->pDataCols->numOfRows < defaultRows) {
              break;
            }

            if (tsdbWriteBlockToRightFile(pComph, pTh->pTable, pComph->pDataCols, ppBuf, ppCBuf) < 0) {
              return -1;
            }
            tdResetDataCols(pComph->pDataCols);
474 475 476 477
          }
        }
      }

Y
yihaoDeng 已提交
478 479 480 481
      if (pComph->pDataCols->numOfRows > 0 &&
          tsdbWriteBlockToRightFile(pComph, pTh->pTable, pComph->pDataCols, ppBuf, ppCBuf) < 0) {
        return -1;
      }
482

Y
yihaoDeng 已提交
483 484 485 486 487 488 489 490 491
      if (tsdbWriteBlockInfoImpl(TSDB_COMPACT_HEAD_FILE(pComph), pTh->pTable, pComph->aSupBlk, NULL, ppBuf, &blkIdx) <
          0) {
        return -1;
      }

      if ((blkIdx.numOfBlocks > 0) && (taosArrayPush(pComph->aBlkIdx, (void *)(&blkIdx)) == NULL)) {
        terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
        return -1;
      }
492 493
    }

Y
yihaoDeng 已提交
494
    if (tsdbWriteBlockIdx(TSDB_COMPACT_HEAD_FILE(pComph), pComph->aBlkIdx, ppBuf) < 0) {
495 496 497
      return -1;
    }

Y
yihaoDeng 已提交
498
    return 0;
499 500
  }

Y
yihaoDeng 已提交
501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517
  static int tsdbWriteBlockToRightFile(SCompactH *pComph, STable *pTable, SDataCols *pDataCols, void **ppBuf,
                                      void **ppCBuf) {
    STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
    STsdbCfg * pCfg = REPO_CFG(pRepo);
    SDFile *   pDFile;
    bool       isLast;
    SBlock     block;

    ASSERT(pDataCols->numOfRows > 0);

    if (pDataCols->numOfRows < pCfg->minRowsPerFileBlock) {
      pDFile = TSDB_COMPACT_LAST_FILE(pComph);
      isLast = true;
    } else {
      pDFile = TSDB_COMPACT_DATA_FILE(pComph);
      isLast = false;
    }
518

Y
yihaoDeng 已提交
519
    if (tsdbWriteBlockImpl(pRepo, pTable, pDFile, pDataCols, &block, isLast, true, ppBuf, ppCBuf) < 0) {
520 521 522
      return -1;
    }

Y
yihaoDeng 已提交
523
    if (taosArrayPush(pComph->aSupBlk, (void *)(&block)) == NULL) {
524 525 526 527
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return -1;
    }

Y
yihaoDeng 已提交
528
    return 0;
529
}
530

H
Hongze Cheng 已提交
531 532
#endif