tsdbCompact.c 10.9 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

// Source type of an STsdbDataIter, stored in its `flag` field. These are distinct tags, not bit
// flags: MEM is 0, so `flag & TSDB_ITER_TYPE_MEM` can never be true. Compare with == or a switch.
#define TSDB_ITER_TYPE_MEM 0
#define TSDB_ITER_TYPE_DAT 1
#define TSDB_ITER_TYPE_STT 2

// Per-iterator state for in-memory sources (allocated by tsdbMemDIterOpen). Not implemented yet.
// ISO C requires a struct to declare at least one named member; an empty member list is a GNU
// extension. A placeholder keeps the type portable until real fields are added.
typedef struct {
  int8_t reserved;
} SMemDIter;
H
Hongze Cheng 已提交
24 25

typedef struct {
H
Hongze Cheng 已提交
26 27 28 29 30 31 32
  SDataFReader *pReader;
  SArray       *aBlockIdx;  // SArray<SBlockIdx>
  SMapData      mDataBlk;   // SMapData<SDataBlk>
  SBlockData    bData;
  int32_t       iBlockIdx;
  int32_t       iDataBlk;
  int32_t       iRow;
H
Hongze Cheng 已提交
33 34 35
} SDataDIter;

typedef struct {
H
Hongze Cheng 已提交
36 37 38 39 40 41
  SDataFReader *pReader;
  int32_t       iStt;
  SArray       *aSttBlk;  // SArray<SSttBlk>
  SBlockData    bData;
  int32_t       iSttBlk;
  int32_t       iRow;
H
Hongze Cheng 已提交
42 43
} SSttDIter;

H
Hongze Cheng 已提交
44 45 46
typedef struct STsdbDataIter {
  struct STsdbDataIter *next;

H
Hongze Cheng 已提交
47 48 49 50
  int32_t     flag;
  SRowInfo    rowInfo;
  SRBTreeNode n;
  char        handle[];
H
Hongze Cheng 已提交
51 52
} STsdbDataIter;

H
Hongze Cheng 已提交
53
typedef struct {
H
Hongze Cheng 已提交
54 55 56 57 58 59 60 61 62
  STsdb         *pTsdb;
  STsdbFS        fs;
  int64_t        cid;
  int32_t        fid;
  SDFileSet     *pDFileSet;
  SDataFReader  *pReader;
  STsdbDataIter *iterList;  // list of iterators
  SRBTree        rtree;
  SBlockData     bData;
H
Hongze Cheng 已提交
63 64
} STsdbCompactor;

// Compaction option bit; presumably meant for tsdbCompact()'s `flag` argument, which does not
// consult it yet.
#define TSDB_FLG_DEEP_COMPACT 1

H
Hongze Cheng 已提交
67
// ITER =========================
H
Hongze Cheng 已提交
68 69
static int32_t tsdbDataIterNext(STsdbDataIter *pIter);

H
Hongze Cheng 已提交
70 71 72 73 74 75 76
static int32_t tsdbDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
  const STsdbDataIter *pIter1 = (STsdbDataIter *)((char *)n1 - offsetof(STsdbDataIter, n));
  const STsdbDataIter *pIter2 = (STsdbDataIter *)((char *)n2 - offsetof(STsdbDataIter, n));

  return tRowInfoCmprFn(&pIter1->rowInfo, &pIter2->rowInfo);
}

H
Hongze Cheng 已提交
77
static int32_t tsdbMemDIterOpen(STsdbDataIter **ppIter) {
H
Hongze Cheng 已提交
78 79
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
80 81 82 83 84 85 86

  STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SMemDIter));
  if (pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
87
  // TODO
H
Hongze Cheng 已提交
88

H
Hongze Cheng 已提交
89
_exit:
H
Hongze Cheng 已提交
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
  if (code) {
    *ppIter = NULL;
  } else {
    *ppIter = pIter;
  }
  return code;
}

/*
 * Create a TSDB_ITER_TYPE_DAT iterator driven by pReader's block index (tsdbReadBlockIdx).
 *
 * On success *ppIter receives the new iterator, already advanced once with tsdbDataIterNext().
 * If the block index is empty, nothing is kept: *ppIter is set to NULL and 0 is returned.
 * On failure *ppIter is set to NULL, everything acquired here is released, and the error code is
 * returned. pReader is borrowed and is not closed on any path.
 */
static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) {
  int32_t        code = 0;
  int32_t        lino = 0;
  SDataDIter    *pState = NULL;
  STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SDataDIter));

  if (NULL == pIter) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
  pIter->flag = TSDB_ITER_TYPE_DAT;

  pState = (SDataDIter *)pIter->handle;
  pState->pReader = pReader;
  pState->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (NULL == pState->aBlockIdx) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  code = tsdbReadBlockIdx(pReader, pState->aBlockIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Empty index: release everything with code still 0 (success, but no iterator).
  if (0 == taosArrayGetSize(pState->aBlockIdx)) goto _exit;

  // TODO
  code = tBlockDataCreate(&pState->bData);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Cursor starts before the first SBlockIdx entry; stepping is left to tsdbDataIterNext().
  pState->iBlockIdx = -1;
  pState->iDataBlk = 0;
  pState->iRow = 0;

  code = tsdbDataIterNext(pIter);
  TSDB_CHECK_CODE(code, lino, _exit);

  *ppIter = pIter;
  return code;

_exit:
  // Reached on error (code != 0) or on an empty block index (code == 0).
  *ppIter = NULL;
  if (pIter) {
    tBlockDataDestroy(&pState->bData, 1);
    tMapDataClear(&pState->mDataBlk);
    taosArrayDestroy(pState->aBlockIdx);
    taosMemoryFree(pIter);
  }
  return code;
}

H
Hongze Cheng 已提交
149
static int32_t tsdbSttDIterOpen(SDataFReader *pReader, int32_t iStt, STsdbDataIter **ppIter) {
H
Hongze Cheng 已提交
150 151 152 153 154 155 156 157
  int32_t code = 0;
  int32_t lino = 0;

  STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SSttDIter));
  if (pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
H
Hongze Cheng 已提交
158
  pIter->flag = TSDB_ITER_TYPE_STT;
H
Hongze Cheng 已提交
159

H
Hongze Cheng 已提交
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
  SSttDIter *pSttDIter = (SSttDIter *)pIter->handle;
  pSttDIter->pReader = pReader;
  pSttDIter->iStt = iStt;
  pSttDIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
  if (pSttDIter->aSttBlk == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  code = tsdbReadSttBlk(pReader, pSttDIter->iStt, pSttDIter->aSttBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (taosArrayGetSize(pSttDIter->aSttBlk) == 0) goto _clear_exit;

  code = tBlockDataCreate(&pSttDIter->bData);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
177 178
  pSttDIter->iSttBlk = -1;
  pSttDIter->iRow = -1;
H
Hongze Cheng 已提交
179

H
Hongze Cheng 已提交
180 181
  code = tsdbDataIterNext(pIter);
  TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
182 183 184

_exit:
  if (code) {
H
Hongze Cheng 已提交
185
  _clear_exit:
H
Hongze Cheng 已提交
186
    *ppIter = NULL;
H
Hongze Cheng 已提交
187 188 189 190 191
    if (pIter) {
      tBlockDataDestroy(&pSttDIter->bData, 1);
      taosArrayDestroy(pSttDIter->aSttBlk);
      taosMemoryFree(pIter);
    }
H
Hongze Cheng 已提交
192 193 194
  } else {
    *ppIter = pIter;
  }
H
Hongze Cheng 已提交
195 196 197 198 199
  return code;
}

/*
 * Release an iterator created by tsdbMemDIterOpen / tsdbDataDIterOpen / tsdbSttDIterOpen,
 * together with the type-specific state stored in its trailing `handle`. NULL is ignored.
 *
 * The resources released per type mirror the error-path cleanup of the corresponding open
 * function. The borrowed SDataFReader is not closed here; it belongs to the caller.
 */
static void tsdbDataIterClose(STsdbDataIter *pIter) {
  if (pIter == NULL) return;

  switch (pIter->flag) {
    case TSDB_ITER_TYPE_MEM:
      // SMemDIter owns no resources yet.
      break;
    case TSDB_ITER_TYPE_DAT: {
      SDataDIter *pDataDIter = (SDataDIter *)pIter->handle;
      tBlockDataDestroy(&pDataDIter->bData, 1);
      tMapDataClear(&pDataDIter->mDataBlk);
      taosArrayDestroy(pDataDIter->aBlockIdx);
      break;
    }
    case TSDB_ITER_TYPE_STT: {
      SSttDIter *pSttDIter = (SSttDIter *)pIter->handle;
      tBlockDataDestroy(&pSttDIter->bData, 1);
      taosArrayDestroy(pSttDIter->aSttBlk);
      break;
    }
    default:
      ASSERT(0);
      break;
  }

  // Header and handle share one allocation, so a single free releases both.
  taosMemoryFree(pIter);
}

/*
 * Advance pIter by one position.
 *
 * Only the stt block-to-block step is implemented. When the current stt block is exhausted, the
 * next SSttBlk is loaded into bData with tsdbReadSttBlock and iRow is reset to 0. Other paths are
 * still TODO and assert.
 *
 * @return 0 on success, otherwise the tsdbReadSttBlock() error code
 */
static int32_t tsdbDataIterNext(STsdbDataIter *pIter) {
  int32_t code = 0;
  int32_t lino = 0;

  // TSDB_ITER_TYPE_* are distinct tags and MEM is 0, so a bitwise `flag & TSDB_ITER_TYPE_MEM` test
  // can never match. Dispatch on equality instead.
  switch (pIter->flag) {
    case TSDB_ITER_TYPE_MEM:
      // TODO
      ASSERT(0);
      break;

    case TSDB_ITER_TYPE_DAT:
      // TODO
      ASSERT(0);
      break;

    case TSDB_ITER_TYPE_STT: {
      SSttDIter *pSttDIter = (SSttDIter *)pIter->handle;

      pSttDIter->iRow++;
      if (pSttDIter->iRow < pSttDIter->bData.nRow) {
        // TODO: step within the currently loaded block
        ASSERT(0);
      } else {
        pSttDIter->iSttBlk++;
        if (pSttDIter->iSttBlk < (int32_t)taosArrayGetSize(pSttDIter->aSttBlk)) {
          code = tsdbReadSttBlock(pSttDIter->pReader, pSttDIter->iStt,
                                  taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk), &pSttDIter->bData);
          TSDB_CHECK_CODE(code, lino, _exit);

          pSttDIter->iRow = 0;
        } else {
          // TODO: report exhaustion to the caller (e.g. TSDB_CODE_TDB_NO_DATA); currently the
          // iterator silently stays past its last block.
        }
      }
      break;
    }

    default:
      ASSERT(0);
      break;
  }

_exit:
  return code;
}
H
Hongze Cheng 已提交
239 240

// COMPACT =========================
H
Hongze Cheng 已提交
241
static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) {
H
Hongze Cheng 已提交
242 243 244
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
245 246 247 248 249 250 251
  pCompactor->pTsdb = pTsdb;

  code = tsdbFSCopy(pTsdb, &pCompactor->fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  pCompactor->fid = INT32_MIN;

H
Hongze Cheng 已提交
252 253 254 255 256 257 258
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
259
static int32_t tsdbCommitCompact(STsdbCompactor *pCompactor) {
H
Hongze Cheng 已提交
260 261 262
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
263 264
  STsdb *pTsdb = pCompactor->pTsdb;

H
Hongze Cheng 已提交
265 266 267 268 269 270 271 272 273
  // TODO

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
274
static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) {
H
Hongze Cheng 已提交
275 276 277
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
278 279
  STsdb *pTsdb = pCompactor->pTsdb;

H
Hongze Cheng 已提交
280 281 282 283 284 285 286 287 288
  // TODO

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
289
static int32_t tsdbDeepCompact(STsdbCompactor *pCompactor) {
H
Hongze Cheng 已提交
290 291 292
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
293
  STsdb *pTsdb = pCompactor->pTsdb;
H
Hongze Cheng 已提交
294

H
Hongze Cheng 已提交
295 296 297
  code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
298 299 300 301 302 303 304
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
305
static int32_t tsdbShallowCompact(STsdbCompactor *pCompactor) {
H
Hongze Cheng 已提交
306
  int32_t code = 0;
H
Hongze Cheng 已提交
307 308
  int32_t lino = 0;

H
Hongze Cheng 已提交
309 310
  STsdb *pTsdb = pCompactor->pTsdb;

H
Hongze Cheng 已提交
311 312 313 314 315 316 317
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
318 319 320 321 322 323 324 325
static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor, TSDBROW **ppRow) {
  int32_t code = 0;
  int32_t lino = 0;
  // TODO
_exit:
  return code;
}

H
Hongze Cheng 已提交
326
static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) {
H
Hongze Cheng 已提交
327 328 329
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
330
  STsdb *pTsdb = pCompactor->pTsdb;
H
Hongze Cheng 已提交
331

H
Hongze Cheng 已提交
332 333 334 335
  // next compact file
  pCompactor->pDFileSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid},
                                                       tDFileSetCmprFn, TD_GT);
  if (pCompactor->pDFileSet == NULL) goto _exit;
H
Hongze Cheng 已提交
336

H
Hongze Cheng 已提交
337 338 339
  pCompactor->fid = pCompactor->pDFileSet->fid;

  code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet);
H
Hongze Cheng 已提交
340 341
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
342 343
  // open iters
  STsdbDataIter *pIter;
H
Hongze Cheng 已提交
344

H
Hongze Cheng 已提交
345 346
  pCompactor->iterList = NULL;
  tRBTreeCreate(&pCompactor->rtree, tsdbDataIterCmprFn);
H
Hongze Cheng 已提交
347

H
Hongze Cheng 已提交
348 349 350 351 352 353 354 355 356 357 358
  code = tsdbDataDIterOpen(pCompactor->pReader, &pIter);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pIter) {
    pIter->next = pCompactor->iterList;
    pCompactor->iterList = pIter;
    tRBTreePut(&pCompactor->rtree, &pIter->n);
  }

  for (int32_t iStt = 0; iStt < pCompactor->pReader->pSet->nSttF; iStt++) {
    code = tsdbSttDIterOpen(pCompactor->pReader, iStt, &pIter);
H
Hongze Cheng 已提交
359 360
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
361 362 363 364 365 366 367 368 369 370 371 372 373 374 375
    if (pIter) {
      pIter->next = pCompactor->iterList;
      pCompactor->iterList = pIter;
      tRBTreePut(&pCompactor->rtree, &pIter->n);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}
H
Hongze Cheng 已提交
376

H
Hongze Cheng 已提交
377 378
static void tsdbCloseCompactor(STsdbCompactor *pCompactor) {
  STsdb *pTsdb = pCompactor->pTsdb;
H
Hongze Cheng 已提交
379

H
Hongze Cheng 已提交
380 381 382 383 384
  for (STsdbDataIter *pIter = pCompactor->iterList; pIter;) {
    STsdbDataIter *pIterNext = pIter->next;
    tsdbDataIterClose(pIter);
    pIter = pIterNext;
  }
H
Hongze Cheng 已提交
385

H
Hongze Cheng 已提交
386 387
  // TODO
  ASSERT(0);
H
Hongze Cheng 已提交
388

H
Hongze Cheng 已提交
389 390 391
_exit:
  tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
H
Hongze Cheng 已提交
392

H
Hongze Cheng 已提交
393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
  int32_t code = 0;
  int32_t lino = 0;

  // Check if can do compact (TODO)

  // Do compact
  STsdbCompactor compactor = {0};

  code = tsdbBeginCompact(pTsdb, &compactor);
  TSDB_CHECK_CODE(code, lino, _exit);

  while (true) {
    code = tsdbOpenCompactor(&compactor);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (compactor.pDFileSet == NULL) break;
H
Hongze Cheng 已提交
410

H
Hongze Cheng 已提交
411 412
    // loop to merge row by row
    TSDBROW *pRow = NULL;
H
Hongze Cheng 已提交
413
    int64_t  nRow = 0;
H
Hongze Cheng 已提交
414 415
    for (;;) {
      code = tsdbCompactNextRow(&compactor, &pRow);
H
Hongze Cheng 已提交
416
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
417 418

      if (pRow == NULL) break;
H
Hongze Cheng 已提交
419

H
Hongze Cheng 已提交
420 421
      nRow++;

H
Hongze Cheng 已提交
422 423 424 425 426 427 428
      // code = tBlockDataAppendRow(&compactor.bData, pRow, pRow, NULL, 0);
      // TSDB_CHECK_CODE(code, lino, _exit);

      // if (compactor.bData.nRows >= TSDB_MAX_ROWS_PER_BLOCK) {
      //   code = tsdbFlushBlock(&compactor);
      //   TSDB_CHECK_CODE(code, lino, _exit);
      // }
H
Hongze Cheng 已提交
429
    }
H
Hongze Cheng 已提交
430 431

    tsdbCloseCompactor(&compactor);
H
Hongze Cheng 已提交
432 433 434 435 436
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
437
    tsdbAbortCompact(&compactor);
H
Hongze Cheng 已提交
438
  } else {
H
Hongze Cheng 已提交
439
    tsdbCommitCompact(&compactor);
H
Hongze Cheng 已提交
440
  }
H
Hongze Cheng 已提交
441 442
  return code;
}