tsdbCompact.c 21.2 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18 19 20
// Helpers used by the compactor that are declared here directly rather than
// taken from a header (presumably they are defined in other tsdb source files
// -- confirm before moving or renaming them).
extern int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo);
extern int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg);
extern int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg);
H
Hongze Cheng 已提交
21 22

typedef struct {
H
Hongze Cheng 已提交
23 24 25 26 27
  STsdb  *pTsdb;
  int64_t commitID;
  int8_t  cmprAlg;
  int32_t maxRows;
  int32_t minRows;
H
Hongze Cheng 已提交
28

H
Hongze Cheng 已提交
29
  STsdbFS fs;
H
Hongze Cheng 已提交
30

H
Hongze Cheng 已提交
31 32 33
  int32_t  fid;
  TABLEID  tbid;
  SSkmInfo tbSkm;
H
Hongze Cheng 已提交
34

H
Hongze Cheng 已提交
35 36 37 38 39 40 41 42 43
  // Tombstone
  SDelFReader *pDelFReader;
  SArray      *aDelIdx;   // SArray<SDelIdx>
  SArray      *aDelData;  // SArray<SDelData>
  SArray      *aSkyLine;  // SArray<TSDBKEY>
  int32_t      iDelIdx;
  int32_t      iSkyLine;
  TSDBKEY     *pDKey;
  TSDBKEY      dKey;
H
Hongze Cheng 已提交
44

H
Hongze Cheng 已提交
45 46 47 48 49
  // Reader
  SDataFReader   *pReader;
  STsdbDataIter2 *iterList;  // list of iterators
  STsdbDataIter2 *pIter;
  SRBTree         rbt;
H
Hongze Cheng 已提交
50

H
Hongze Cheng 已提交
51 52 53 54 55 56 57 58
  // Writer
  SDataFWriter *pWriter;
  SArray       *aBlockIdx;  // SArray<SBlockIdx>
  SMapData      mDataBlk;   // SMapData<SDataBlk>
  SArray       *aSttBlk;    // SArray<SSttBlk>
  SBlockData    bData;
  SBlockData    sData;
} STsdbCompactor;
H
Hongze Cheng 已提交
59

H
Hongze Cheng 已提交
60
/*
 * Abort a compaction by rolling the TSDB file system back.
 *
 * Returns 0 on success, otherwise the error code reported by tsdbFSRollback().
 */
static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) {
  int32_t lino = 0;
  STsdb  *pTsdb = pCompactor->pTsdb;

  int32_t code = tsdbFSRollback(pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
77
/*
 * Get the compactor ready to receive rows of table *pId.
 *
 * Three things happen, in this order:
 *   1. the delete-index cursor is advanced to this table and, when the table
 *      has delete records, its skyline is built and pDKey is armed;
 *   2. the table schema is refreshed and the data-block buffer re-initialised;
 *   3. when the stt buffer cannot be shared with the previous table
 *      (TABLE_SAME_SCHEMA is false), pending stt rows are flushed and the stt
 *      buffer is re-initialised for the new table.
 *
 * Returns 0 on success or the first error code reported by a callee.
 */
static int32_t tsdbCompactWriteTableDataStart(STsdbCompactor *pCompactor, TABLEID *pId) {
  int32_t code = 0;
  int32_t lino = 0;
  bool    hasTombstone = false;

  pCompactor->tbid = *pId;

  // tombstone
  while (pCompactor->iDelIdx < taosArrayGetSize(pCompactor->aDelIdx)) {
    SDelIdx *pDelIdx = (SDelIdx *)taosArrayGet(pCompactor->aDelIdx, pCompactor->iDelIdx);
    int32_t  cmp = tTABLEIDCmprFn(pDelIdx, &pCompactor->tbid);

    // Entry compares greater than this table: keep the cursor on it.
    if (cmp > 0) break;

    pCompactor->iDelIdx++;

    // Entry compares lower than this table: skip it and look at the next one.
    if (cmp < 0) continue;

    // Entry matches this table: load its delete records and build the skyline.
    code = tsdbReadDelData(pCompactor->pDelFReader, pDelIdx, pCompactor->aDelData);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbBuildDeleteSkyline(pCompactor->aDelData, 0, taosArrayGetSize(pCompactor->aDelData) - 1,
                                  pCompactor->aSkyLine);
    TSDB_CHECK_CODE(code, lino, _exit);

    pCompactor->iSkyLine = 0;
    if (taosArrayGetSize(pCompactor->aSkyLine) > 0) {
      TSDBKEY *pFirst = (TSDBKEY *)taosArrayGet(pCompactor->aSkyLine, pCompactor->iSkyLine);

      pCompactor->dKey.version = 0;
      pCompactor->dKey.ts = pFirst->ts;
      pCompactor->pDKey = &pCompactor->dKey;
      hasTombstone = true;
    }
    break;
  }

  // No usable delete records for this table: switch delete filtering off.
  if (!hasTombstone) {
    pCompactor->pDKey = NULL;
  }

  // writer
  code = tsdbUpdateTableSchema(pCompactor->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pCompactor->tbSkm);
  TSDB_CHECK_CODE(code, lino, _exit);

  tMapDataReset(&pCompactor->mDataBlk);

  code = tBlockDataInit(&pCompactor->bData, pId, pCompactor->tbSkm.pTSchema, NULL, 0);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (!TABLE_SAME_SCHEMA(pCompactor->sData.suid, pCompactor->sData.uid, pId->suid, pId->uid)) {
    // Flush rows staged under the previous schema before re-initialising.
    if (pCompactor->sData.nRow > 0) {
      code = tsdbWriteSttBlock(pCompactor->pWriter, &pCompactor->sData, pCompactor->aSttBlk, pCompactor->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // The stt buffer is keyed by suid alone when the table has one, otherwise by uid.
    TABLEID sttId = {.suid = pId->suid, .uid = pId->suid ? 0 : pId->uid};
    code = tBlockDataInit(&pCompactor->sData, &sttId, pCompactor->tbSkm.pTSchema, NULL, 0);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code == 0) {
    tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__, pId->suid,
              pId->uid);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
151

H
Hongze Cheng 已提交
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
/*
 * Finish writing the current table (pCompactor->tbid).
 *
 * Rows still staged in bData are either moved row by row into the stt buffer
 * (when there are fewer than minRows of them) or written out as one data
 * block. If the table produced any data blocks, a block-index entry is
 * appended to aBlockIdx and the block map is written.
 *
 * Returns 0 on success or the first error code reported by a callee.
 */
static int32_t tsdbCompactWriteTableDataEnd(STsdbCompactor *pCompactor) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SBlockData *pBData = &pCompactor->bData;
  SBlockData *pSData = &pCompactor->sData;

  if (pBData->nRow > 0 && pBData->nRow < pCompactor->minRows) {
    // Too few rows for a data block: hand them over to the stt buffer.
    for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) {
      code = tBlockDataAppendRow(pSData, &tsdbRowFromBlockData(pBData, iRow), NULL, pCompactor->tbid.uid);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (pSData->nRow >= pCompactor->maxRows) {
        code = tsdbWriteSttBlock(pCompactor->pWriter, pSData, pCompactor->aSttBlk, pCompactor->cmprAlg);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
    tBlockDataClear(pBData);
  } else if (pBData->nRow > 0) {
    code = tsdbWriteDataBlock(pCompactor->pWriter, pBData, &pCompactor->mDataBlk, pCompactor->cmprAlg);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (pCompactor->mDataBlk.nItem > 0) {
    SBlockIdx *pBlockIdx = (SBlockIdx *)taosArrayReserve(pCompactor->aBlockIdx, 1);
    if (pBlockIdx == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    pBlockIdx->suid = pCompactor->tbid.suid;
    pBlockIdx->uid = pCompactor->tbid.uid;

    code = tsdbWriteDataBlk(pCompactor->pWriter, &pCompactor->mDataBlk, pBlockIdx);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code == 0) {
    tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__,
              pCompactor->tbid.suid, pCompactor->tbid.uid);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
199

H
Hongze Cheng 已提交
200
/*
 * Decide whether *pRow is covered by the delete skyline of the current table.
 *
 * Precondition: pCompactor->pDKey is non-NULL (the caller checks this).
 *
 * The skyline cursor is first advanced until the tracked key's timestamp is
 * no longer below the row's timestamp. While advancing, dKey.version takes
 * the version of the skyline point just passed and dKey.ts the timestamp of
 * the next point (INT64_MAX once the skyline is exhausted). If the skyline
 * runs out and the last version passed is 0, delete filtering is switched off
 * for the rest of the table (pDKey = NULL) and the row is kept.
 *
 * Returns true when the row must be dropped, false when it must be kept.
 */
static bool tsdbCompactRowIsDeleted(STsdbCompactor *pCompactor, TSDBROW *pRow) {
  TSDBKEY  rowKey = TSDBROW_KEY(pRow);
  TSDBKEY *aSky = (TSDBKEY *)TARRAY_DATA(pCompactor->aSkyLine);
  int32_t  nSky = TARRAY_SIZE(pCompactor->aSkyLine);

  while (rowKey.ts > pCompactor->pDKey->ts) {
    pCompactor->pDKey->version = aSky[pCompactor->iSkyLine].version;
    pCompactor->iSkyLine++;

    if (pCompactor->iSkyLine < nSky) {
      pCompactor->dKey.ts = aSky[pCompactor->iSkyLine].ts;
      continue;
    }

    // Skyline exhausted.
    if (pCompactor->pDKey->version == 0) {
      pCompactor->pDKey = NULL;
      return false;
    }
    pCompactor->pDKey->ts = INT64_MAX;
  }

  if (rowKey.ts < pCompactor->pDKey->ts) {
    // Inside the current segment: deleted unless the row is newer than the segment's version.
    return !(rowKey.version > pCompactor->pDKey->version);
  }

  if (rowKey.ts == pCompactor->pDKey->ts) {
    // Exactly on a skyline point: the row must be newer than both adjoining versions to survive.
    ASSERT(pCompactor->iSkyLine < nSky);
    return !(rowKey.version > TMAX(pCompactor->pDKey->version, aSky[pCompactor->iSkyLine].version));
  }

  return false;
}

H
Hongze Cheng 已提交
240 241 242
/*
 * Feed one row into the compaction output.
 *
 * pRowInfo == NULL marks the end of the input: the current table (if any) is
 * finished and rows still staged in the stt buffer are written out.
 * Otherwise, when the row belongs to a table other than the current one, the
 * current table is finished and the new one started. The row is then dropped
 * if the delete skyline covers it, or upserted into the data-block buffer,
 * which is written out first when the upsert would exceed maxRows.
 *
 * Returns 0 on success or the first error code reported by a callee.
 */
static int32_t tsdbCompactWriteTableData(STsdbCompactor *pCompactor, SRowInfo *pRowInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  // start a new table data write if need
  bool switchTable = (pRowInfo == NULL) || (pRowInfo->uid != pCompactor->tbid.uid);
  if (switchTable) {
    if (pCompactor->tbid.uid != 0) {
      code = tsdbCompactWriteTableDataEnd(pCompactor);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    if (pRowInfo == NULL) {
      // End of input: flush what is left in the stt buffer and stop.
      if (pCompactor->sData.nRow > 0) {
        code = tsdbWriteSttBlock(pCompactor->pWriter, &pCompactor->sData, pCompactor->aSttBlk, pCompactor->cmprAlg);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
      return code;
    }

    // The cast treats the start of SRowInfo as a TABLEID (suid, uid).
    code = tsdbCompactWriteTableDataStart(pCompactor, (TABLEID *)pRowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // check if row is deleted
  if (pCompactor->pDKey != NULL && tsdbCompactRowIsDeleted(pCompactor, &pRowInfo->row)) {
    goto _exit;
  }

  // Write the staged block out first if adding this row would push it past maxRows.
  if (tBlockDataTryUpsertRow(&pCompactor->bData, &pRowInfo->row, pRowInfo->uid) > pCompactor->maxRows) {
    code = tsdbWriteDataBlock(pCompactor->pWriter, &pCompactor->bData, &pCompactor->mDataBlk, pCompactor->cmprAlg);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tBlockDataUpsertRow(&pCompactor->bData, &pRowInfo->row, NULL, pRowInfo->uid);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  } else if (pRowInfo != NULL) {
    tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64,
              TD_VID(pCompactor->pTsdb->pVnode), __func__, pRowInfo->suid, pRowInfo->uid, TSDBROW_TS(&pRowInfo->row),
              TSDBROW_VERSION(&pRowInfo->row));
  }
  return code;
}
H
Hongze Cheng 已提交
285

H
Hongze Cheng 已提交
286 287 288 289
/*
 * Report whether the table of the current iterator's row is unknown to the
 * meta store, i.e. metaGetInfo() returns non-zero for its uid.
 *
 * The table currently being written (pCompactor->tbid) is never reported as
 * dropped and is not looked up again.
 */
static bool tsdbCompactTableIsDropped(STsdbCompactor *pCompactor) {
  SRowInfo *pRowInfo = &pCompactor->pIter->rowInfo;

  if (pRowInfo->uid == pCompactor->tbid.uid) {
    return false;
  }

  SMetaInfo info;
  return metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->uid, &info, NULL) != 0;
}
H
Hongze Cheng 已提交
295 296 297 298
/*
 * Produce the next row of the merged input.
 *
 * On return *ppRowInfo points at the row info of the selected iterator, or is
 * NULL when every iterator is exhausted. Rows of tables for which
 * tsdbCompactTableIsDropped() is true are skipped: all iterators are advanced
 * with a by-table-id filter and the tree is rebuilt from those that still
 * have rows.
 *
 * Returns 0 on success or the error code of a failed iterator step; on error
 * *ppRowInfo is not assigned.
 */
static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor, SRowInfo **ppRowInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  for (;;) {
    // Step the iterator that supplied the previous row.
    if (pCompactor->pIter != NULL) {
      code = tsdbDataIterNext2(pCompactor->pIter, NULL);
      TSDB_CHECK_CODE(code, lino, _exit);

      SRowInfo *pCur = &pCompactor->pIter->rowInfo;
      if (pCur->suid == 0 && pCur->uid == 0) {
        // An all-zero table id marks an exhausted iterator.
        pCompactor->pIter = NULL;
      } else {
        SRBTreeNode *pMin = tRBTreeMin(&pCompactor->rbt);
        if (pMin != NULL) {
          int32_t c = tsdbDataIterCmprFn(&pCompactor->pIter->rbtn, pMin);
          if (c > 0) {
            // Another iterator now holds the smaller row: put this one back in the tree.
            tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn);
            pCompactor->pIter = NULL;
          } else if (c == 0) {
            ASSERT(0);
          }
        }
      }
    }

    // No current iterator: take the smallest one out of the tree.
    if (pCompactor->pIter == NULL) {
      SRBTreeNode *pNode = tRBTreeDropMin(&pCompactor->rbt);
      if (pNode != NULL) {
        pCompactor->pIter = TSDB_RBTN_TO_DATA_ITER(pNode);
      }
    }

    if (pCompactor->pIter == NULL) {
      *ppRowInfo = NULL;
      break;
    }

    if (!tsdbCompactTableIsDropped(pCompactor)) {
      *ppRowInfo = &pCompactor->pIter->rowInfo;
      break;
    }

    // Dropped table: advance every iterator with the table-id filter and rebuild the tree.
    TABLEID tbid = {.suid = pCompactor->pIter->rowInfo.suid, .uid = pCompactor->pIter->rowInfo.uid};
    tRBTreeClear(&pCompactor->rbt);
    for (pCompactor->pIter = pCompactor->iterList; pCompactor->pIter != NULL;
         pCompactor->pIter = pCompactor->pIter->next) {
      STsdbFilterInfo filter = {.flag = TSDB_FILTER_FLAG_BY_TABLEID, .tbid = tbid};

      code = tsdbDataIterNext2(pCompactor->pIter, &filter);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid) {
        tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn);
      }
    }
    // pIter is NULL here, so the next pass starts by taking the tree minimum.
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

/*
 * Set the compactor up for file set *pSet.
 *
 * Opens a reader on the set, creates one iterator for the data file and one
 * per stt file (each positioned on its first row and inserted into the merge
 * tree), opens a writer for a new file set carrying the compactor's commit ID,
 * and resets the per-file-set output containers.
 *
 * Nothing is released here on failure; tsdbCompactFileSet() closes the
 * writer, the iterators and the reader when this function returns an error.
 *
 * Returns 0 on success or the first error code reported by a callee.
 */
static int32_t tsdbCompactFileSetStart(STsdbCompactor *pCompactor, SDFileSet *pSet) {
  int32_t code = 0;
  int32_t lino = 0;

  pCompactor->fid = pSet->fid;
  pCompactor->tbid = (TABLEID){0};

  /* tombstone */
  pCompactor->iDelIdx = 0;

  /* reader */
  code = tsdbDataFReaderOpen(&pCompactor->pReader, pCompactor->pTsdb, pSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Iterator over the data file; pIter stays NULL when there is nothing to iterate.
  code = tsdbOpenDataFileDataIter(pCompactor->pReader, &pCompactor->pIter);
  TSDB_CHECK_CODE(code, lino, _exit);

  tRBTreeCreate(&pCompactor->rbt, tsdbDataIterCmprFn);
  if (pCompactor->pIter != NULL) {
    // Chain the iterator first so that it gets closed even if positioning it fails.
    pCompactor->pIter->next = pCompactor->iterList;
    pCompactor->iterList = pCompactor->pIter;

    code = tsdbDataIterNext2(pCompactor->pIter, NULL);
    TSDB_CHECK_CODE(code, lino, _exit);

    ASSERT(pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid);
    tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn);
  }

  // One iterator per stt file, handled the same way.
  for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
    code = tsdbOpenSttFileDataIter(pCompactor->pReader, iStt, &pCompactor->pIter);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pCompactor->pIter != NULL) {
      pCompactor->pIter->next = pCompactor->iterList;
      pCompactor->iterList = pCompactor->pIter;

      code = tsdbDataIterNext2(pCompactor->pIter, NULL);
      TSDB_CHECK_CODE(code, lino, _exit);

      ASSERT(pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid);
      tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn);
    }
  }

  // No current iterator yet: tsdbCompactNextRow() starts from the tree minimum.
  pCompactor->pIter = NULL;

  /* writer */
  code = tsdbDataFWriterOpen(
      &pCompactor->pWriter, pCompactor->pTsdb,
      &(SDFileSet){.fid = pCompactor->fid,
                   .diskId = pSet->diskId,
                   .pHeadF = &(SHeadFile){.commitID = pCompactor->commitID},
                   .pDataF = &(SDataFile){.commitID = pCompactor->commitID},
                   .pSmaF = &(SSmaFile){.commitID = pCompactor->commitID},
                   .nSttF = 1,
                   .aSttF = {&(SSttFile){.commitID = pCompactor->commitID}}});
  TSDB_CHECK_CODE(code, lino, _exit);

  // Output containers are allocated on first use and only cleared afterwards.
  if (pCompactor->aBlockIdx != NULL) {
    taosArrayClear(pCompactor->aBlockIdx);
  } else {
    pCompactor->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
    if (pCompactor->aBlockIdx == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  tMapDataReset(&pCompactor->mDataBlk);

  if (pCompactor->aSttBlk != NULL) {
    taosArrayClear(pCompactor->aSttBlk);
  } else {
    pCompactor->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
    if (pCompactor->aSttBlk == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  tBlockDataReset(&pCompactor->bData);
  tBlockDataReset(&pCompactor->sData);

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s done, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->fid);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
              tstrerror(code), pCompactor->fid);
  }
  return code;
}

/*
 * Finish the file set currently being compacted.
 *
 * Both row buffers must already be empty. The stt-block list, the block index
 * and the file-set header are written, the new file set is recorded in the
 * compactor's file-system state, and the writer, the reader and all iterators
 * are closed.
 *
 * Returns 0 on success or the first error code reported by a callee; on error
 * the remaining cleanup is left to tsdbCompactFileSet().
 */
static int32_t tsdbCompactFileSetEnd(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;

  ASSERT(pCompactor->bData.nRow == 0);
  ASSERT(pCompactor->sData.nRow == 0);

  /* update files */
  code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbUpdateDFileSetHeader(pCompactor->pWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Record the freshly written set in the working file-system state.
  code = tsdbFSUpsertFSet(&pCompactor->fs, &pCompactor->pWriter->wSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbDataFWriterClose(&pCompactor->pWriter, 1);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbDataFReaderClose(&pCompactor->pReader);
  TSDB_CHECK_CODE(code, lino, _exit);

  /* do clear */
  for (;;) {
    pCompactor->pIter = pCompactor->iterList;
    if (pCompactor->pIter == NULL) break;

    pCompactor->iterList = pCompactor->pIter->next;
    tsdbCloseDataIter2(pCompactor->pIter);
  }

  tBlockDataReset(&pCompactor->bData);
  tBlockDataReset(&pCompactor->sData);

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s done, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->fid);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
              tstrerror(code), pCompactor->fid);
  }
  return code;
}
H
Hongze Cheng 已提交
488 489 490 491 492 493 494 495 496

/*
 * Compact one file set: start it, stream every merged row into the writer
 * (the terminating NULL row is passed on as well, so the writer flushes what
 * it has staged), then finish it.
 *
 * On any failure the writer is closed with a 0 flag (the success path passes
 * 1), all iterators are closed and the reader is closed before returning.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor, SDFileSet *pSet) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SRowInfo *pRowInfo;

  // start compact
  code = tsdbCompactFileSetStart(pCompactor, pSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  // do compact, end with a NULL row
  for (;;) {
    code = tsdbCompactNextRow(pCompactor, &pRowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbCompactWriteTableData(pCompactor, pRowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pRowInfo == NULL) break;
  }

  // end compact
  code = tsdbCompactFileSetEnd(pCompactor);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    return code;
  }

  tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
            tstrerror(code), pCompactor->fid);

  // Release whatever the failed step left open.
  if (pCompactor->pWriter) tsdbDataFWriterClose(&pCompactor->pWriter, 0);

  for (;;) {
    pCompactor->pIter = pCompactor->iterList;
    if (pCompactor->pIter == NULL) break;

    pCompactor->iterList = pCompactor->pIter->next;
    tsdbCloseDataIter2(pCompactor->pIter);
  }

  if (pCompactor->pReader) tsdbDataFReaderClose(&pCompactor->pReader);

  return code;
}

H
Hongze Cheng 已提交
525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546
/*
 * Release everything a successfully begun compaction still owns.
 *
 * Must only be called after tsdbBeginCompact() returned 0; a failed begin
 * releases its own resources.
 */
static void tsdbEndCompact(STsdbCompactor *pCompactor) {
  // writer
  tBlockDataDestroy(&pCompactor->sData);
  tBlockDataDestroy(&pCompactor->bData);
  taosArrayDestroy(pCompactor->aSttBlk);
  tMapDataClear(&pCompactor->mDataBlk);
  taosArrayDestroy(pCompactor->aBlockIdx);

  // reader

  // tombstone
  taosArrayDestroy(pCompactor->aSkyLine);
  taosArrayDestroy(pCompactor->aDelData);
  taosArrayDestroy(pCompactor->aDelIdx);
  // The del-file reader is opened in tsdbBeginCompact() whenever a del file
  // exists. Close it here, before the file-system copy it was opened from is
  // destroyed; otherwise every successful compaction leaks the reader.
  if (pCompactor->pDelFReader) tsdbDelFReaderClose(&pCompactor->pDelFReader);

  // others
  tDestroyTSchema(pCompactor->tbSkm.pTSchema);
  tsdbFSDestroy(&pCompactor->fs);

  tsdbInfo("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->commitID);
}

H
Hongze Cheng 已提交
547
/*
 * Initialise *pCompactor (expected to be zero-filled) for a compaction of
 * pTsdb.
 *
 * Copies the run settings from pInfo and the vnode config, takes a copy of
 * the current file-system state, opens the del file and loads its index when
 * one exists, and creates the two row buffers.
 *
 * Returns 0 on success. On failure everything acquired here is released
 * again and the error code is returned; tsdbEndCompact() must then not be
 * called.
 */
static int32_t tsdbBeginCompact(STsdb *pTsdb, SCompactInfo *pInfo, STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;

  pCompactor->pTsdb = pTsdb;
  pCompactor->commitID = pInfo->commitID;
  pCompactor->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
  pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows;
  pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows;
  // Lower than any real fid, so the first "greater than" search finds the first file set.
  pCompactor->fid = INT32_MIN;

  code = tsdbFSCopy(pTsdb, &pCompactor->fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  /* tombstone */
  if (pCompactor->fs.pDelFile != NULL) {
    code = tsdbDelFReaderOpen(&pCompactor->pDelFReader, pCompactor->fs.pDelFile, pTsdb);
    TSDB_CHECK_CODE(code, lino, _exit);

    pCompactor->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
    if (pCompactor->aDelIdx == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    pCompactor->aDelData = taosArrayInit(0, sizeof(SDelData));
    if (pCompactor->aDelData == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    pCompactor->aSkyLine = taosArrayInit(0, sizeof(TSDBKEY));
    if (pCompactor->aSkyLine == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbReadDelIdx(pCompactor->pDelFReader, pCompactor->aDelIdx);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  /* reader */

  /* writer */
  code = tBlockDataCreate(&pCompactor->bData);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tBlockDataCreate(&pCompactor->sData);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pTsdb->pVnode), __func__, pCompactor->commitID);
    return code;
  }

  tsdbError("vgId:%d %s failed at line %d since %s, commit ID:%" PRId64, TD_VID(pTsdb->pVnode), __func__, lino,
            tstrerror(code), pCompactor->commitID);

  // Undo, in reverse order, whatever was set up before the failure.
  tBlockDataDestroy(&pCompactor->sData);
  tBlockDataDestroy(&pCompactor->bData);
  if (pCompactor->fs.pDelFile != NULL) {
    taosArrayDestroy(pCompactor->aSkyLine);
    taosArrayDestroy(pCompactor->aDelData);
    taosArrayDestroy(pCompactor->aDelIdx);
    if (pCompactor->pDelFReader) tsdbDelFReaderClose(&pCompactor->pDelFReader);
  }
  tsdbFSDestroy(&pCompactor->fs);

  return code;
}

H
Hongze Cheng 已提交
613
/*
 * Compact every file set of pTsdb.
 *
 * File sets are visited in ascending fid order by repeatedly searching the
 * working file-system copy for the first set whose fid is greater than the
 * one just processed. Once all sets are rewritten the del file entry is
 * upserted to NULL and the new file-system state is prepared for commit.
 *
 * Any failure -- including a failed prepare-commit step, which used to be
 * ignored and reported as success -- aborts the compaction through
 * tsdbAbortCompact() and is returned to the caller.
 *
 * Returns 0 on success or the first error code encountered.
 */
int32_t tsdbCompact(STsdb *pTsdb, SCompactInfo *pInfo) {
  int32_t code = 0;

  STsdbCompactor *pCompactor = &(STsdbCompactor){0};

  // A failed begin has already released its resources; nothing to end or abort.
  if ((code = tsdbBeginCompact(pTsdb, pInfo, pCompactor))) return code;

  for (;;) {
    SDFileSet *pSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid},
                                                   tDFileSetCmprFn, TD_GT);
    if (pSet == NULL) {
      pCompactor->fid = INT32_MAX;
      break;
    }

    if ((code = tsdbCompactFileSet(pCompactor, pSet))) goto _exit;
  }

  if ((code = tsdbFSUpsertDelFile(&pCompactor->fs, NULL))) goto _exit;

  // Keep the result: the compaction has not succeeded unless the new state
  // could be prepared for commit.
  code = tsdbFSPrepareCommit(pTsdb, &pCompactor->fs);

_exit:
  if (code) {
    // Best effort: the original failure is what gets reported, and the abort logs its own errors.
    (void)tsdbAbortCompact(pCompactor);
  }
  tsdbEndCompact(pCompactor);
  return code;
}
H
Hongze Cheng 已提交
642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664

/*
 * Commit the file-system state prepared by a compaction.
 *
 * tsdbFSCommit() runs while pTsdb->rwLock is held for writing; the lock is
 * released before the result is examined, on success and on failure alike.
 *
 * Returns 0 on success, otherwise the error code from tsdbFSCommit().
 */
int32_t tsdbCommitCompact(STsdb *pTsdb) {
  int32_t lino = 0;

  taosThreadRwlockWrlock(&pTsdb->rwLock);
  int32_t code = tsdbFSCommit(pTsdb);
  taosThreadRwlockUnlock(&pTsdb->rwLock);

  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}