tsdbCompact.c 20.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

H
Hongze Cheng 已提交
18 19 20 21
// Iterator type tags stored in STsdbDataIter::flag (exactly one tag per iterator).
// NOTE: TSDB_ITER_TYPE_MEM is 0, so it can never be detected with a bitwise test
// such as (flag & TSDB_ITER_TYPE_MEM); compare the flag by equality instead.
// tsdbMemDIterOpen does not assign the flag and relies on its zero-filled
// allocation to leave it at TSDB_ITER_TYPE_MEM.
//   MEM: iterator created by tsdbMemDIterOpen (still a TODO stub)
//   DAT: iterator created by tsdbDataDIterOpen (block index / data blocks)
//   STT: iterator created by tsdbSttDIterOpen (stt blocks of one stt file)
#define TSDB_ITER_TYPE_MEM 0x0
#define TSDB_ITER_TYPE_DAT 0x1
#define TSDB_ITER_TYPE_STT 0x2

H
Hongze Cheng 已提交
22 23
// Handle payload for TSDB_ITER_TYPE_MEM iterators (see tsdbMemDIterOpen).
// The memory iterator is still a TODO. The placeholder member keeps this a valid
// ISO C struct: an empty member list is only accepted as a GNU extension.
typedef struct {
  int32_t reserved;  // placeholder until the memory iterator gets real state
} SMemDIter;
H
Hongze Cheng 已提交
24 25

typedef struct {
H
Hongze Cheng 已提交
26 27 28 29 30 31 32
  SDataFReader *pReader;
  SArray       *aBlockIdx;  // SArray<SBlockIdx>
  SMapData      mDataBlk;   // SMapData<SDataBlk>
  SBlockData    bData;
  int32_t       iBlockIdx;
  int32_t       iDataBlk;
  int32_t       iRow;
H
Hongze Cheng 已提交
33 34 35
} SDataDIter;

typedef struct {
H
Hongze Cheng 已提交
36 37 38 39 40 41
  SDataFReader *pReader;
  int32_t       iStt;
  SArray       *aSttBlk;  // SArray<SSttBlk>
  SBlockData    bData;
  int32_t       iSttBlk;
  int32_t       iRow;
H
Hongze Cheng 已提交
42 43
} SSttDIter;

H
Hongze Cheng 已提交
44 45 46
typedef struct STsdbDataIter {
  struct STsdbDataIter *next;

H
Hongze Cheng 已提交
47 48 49 50
  int32_t     flag;
  SRowInfo    rowInfo;
  SRBTreeNode n;
  char        handle[];
H
Hongze Cheng 已提交
51 52
} STsdbDataIter;

H
Hongze Cheng 已提交
53 54
// Recover the owning STsdbDataIter from a pointer to its embedded red-black tree
// node `n`. The argument is parenthesized so that an expression argument such as
// `p + 1` is fully evaluated before the byte-offset subtraction.
#define TSDB_DATA_ITER_FROM_RBTN(N) ((STsdbDataIter *)((char *)(N) - offsetof(STsdbDataIter, n)))

H
Hongze Cheng 已提交
55
typedef struct {
H
Hongze Cheng 已提交
56 57 58 59 60 61 62 63 64
  STsdb     *pTsdb;
  int64_t    cid;
  int32_t    maxRows;
  int32_t    minRows;
  STsdbFS    fs;
  int32_t    fid;
  SDFileSet *pDFileSet;

  // Reader
H
Hongze Cheng 已提交
65 66 67
  SDataFReader  *pReader;
  STsdbDataIter *iterList;  // list of iterators
  SRBTree        rtree;
H
Hongze Cheng 已提交
68
  STsdbDataIter *pIter;
H
Hongze Cheng 已提交
69
  SBlockData     bData;
H
Hongze Cheng 已提交
70
  SSkmInfo       tbSkm;
H
Hongze Cheng 已提交
71 72 73 74 75 76

  // Writer
  SDataFWriter *pWriter;
  SArray       *aBlockIdx;  // SArray<SBlockIdx>
  SMapData      mDataBlk;   // SMapData<SDataBlk>
  SArray       *aSttBlk;    // SArray<SSttBlk>
H
Hongze Cheng 已提交
77 78
} STsdbCompactor;

H
Hongze Cheng 已提交
79 80
// Compaction option bit, presumably for tsdbCompact()'s `flag` argument to request a
// deep compaction (tsdbDeepCompact). NOTE(review): not referenced by the code shown yet.
#define TSDB_FLG_DEEP_COMPACT 0x1

H
Hongze Cheng 已提交
81
// ITER =========================
H
Hongze Cheng 已提交
82
// Forward declaration: tsdbDataDIterOpen and tsdbSttDIterOpen prime each new iterator
// by calling tsdbDataIterNext() before its definition below.
static int32_t tsdbDataIterNext(STsdbDataIter *pIter, TABLEID *pExcludeTableId);
H
Hongze Cheng 已提交
83

H
Hongze Cheng 已提交
84 85 86 87 88 89 90
// Red-black tree ordering for STsdbDataIter nodes: maps each node back to its owning
// iterator and compares the iterators' current rows with tRowInfoCmprFn.
static int32_t tsdbDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
  const size_t         nodeOffset = offsetof(STsdbDataIter, n);
  const STsdbDataIter *pLeft = (const STsdbDataIter *)((const char *)n1 - nodeOffset);
  const STsdbDataIter *pRight = (const STsdbDataIter *)((const char *)n2 - nodeOffset);

  return tRowInfoCmprFn(&pLeft->rowInfo, &pRight->rowInfo);
}

H
Hongze Cheng 已提交
91
// Open an in-memory data iterator (TSDB_ITER_TYPE_MEM).
// TODO: the memory iterator is not implemented; this only allocates the iterator
// header plus an SMemDIter handle. flag stays 0 (TSDB_ITER_TYPE_MEM) because the
// allocation is zero-filled.
// On success *ppIter receives the iterator; on failure it is set to NULL.
static int32_t tsdbMemDIterOpen(STsdbDataIter **ppIter) {
  int32_t        code = TSDB_CODE_SUCCESS;
  STsdbDataIter *pNewIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(STsdbDataIter) + sizeof(SMemDIter));

  if (pNewIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

  // TODO: initialize the SMemDIter state in pNewIter->handle

  *ppIter = (code == TSDB_CODE_SUCCESS) ? pNewIter : NULL;
  return code;
}

// Open an iterator over the data blocks of pReader's file set (TSDB_ITER_TYPE_DAT).
// The block index is loaded eagerly. If it is empty, no iterator is created: the
// function succeeds and *ppIter is NULL.
// On error everything allocated here is released and *ppIter is NULL. pReader is
// only referenced, never closed.
// NOTE(review): tsdbDataIterNext() does not implement the DAT branch yet (ASSERT).
static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) {
  int32_t     code = 0;
  int32_t     lino = 0;
  bool        noBlock = false;  // nothing to iterate: succeed without an iterator
  SDataDIter *pDataDIter = NULL;

  STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SDataDIter));
  if (pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
  pIter->flag = TSDB_ITER_TYPE_DAT;

  pDataDIter = (SDataDIter *)pIter->handle;
  pDataDIter->pReader = pReader;

  pDataDIter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pDataDIter->aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  code = tsdbReadBlockIdx(pReader, pDataDIter->aBlockIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  noBlock = (taosArrayGetSize(pDataDIter->aBlockIdx) == 0);
  if (noBlock) goto _exit;

  // TODO
  code = tBlockDataCreate(&pDataDIter->bData);
  TSDB_CHECK_CODE(code, lino, _exit);

  pDataDIter->iBlockIdx = -1;
  pDataDIter->iDataBlk = 0;
  pDataDIter->iRow = 0;

  // position the iterator on its first row
  code = tsdbDataIterNext(pIter, NULL);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0 || noBlock) {
    *ppIter = NULL;
    if (pIter != NULL) {
      tBlockDataDestroy(&pDataDIter->bData);
      tMapDataClear(&pDataDIter->mDataBlk);
      taosArrayDestroy(pDataDIter->aBlockIdx);
      taosMemoryFree(pIter);
    }
  } else {
    *ppIter = pIter;
  }
  return code;
}

H
Hongze Cheng 已提交
163
// Open an iterator over the blocks of stt file number iStt of pReader's file set
// (TSDB_ITER_TYPE_STT) and position it on its first row.
// If that stt file has no blocks, no iterator is created: the function succeeds and
// *ppIter is NULL. On error everything allocated here is released and *ppIter is
// NULL. pReader is only referenced, never closed.
static int32_t tsdbSttDIterOpen(SDataFReader *pReader, int32_t iStt, STsdbDataIter **ppIter) {
  int32_t    code = 0;
  int32_t    lino = 0;
  bool       noBlock = false;  // nothing to iterate: succeed without an iterator
  SSttDIter *pSttDIter = NULL;

  STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SSttDIter));
  if (pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
  pIter->flag = TSDB_ITER_TYPE_STT;

  pSttDIter = (SSttDIter *)pIter->handle;
  pSttDIter->pReader = pReader;
  pSttDIter->iStt = iStt;

  pSttDIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
  if (pSttDIter->aSttBlk == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  code = tsdbReadSttBlk(pReader, pSttDIter->iStt, pSttDIter->aSttBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  noBlock = (taosArrayGetSize(pSttDIter->aSttBlk) == 0);
  if (noBlock) goto _exit;

  code = tBlockDataCreate(&pSttDIter->bData);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Both cursors start one before the first element: tsdbDataIterNext()
  // pre-increments them onto the first stt block and its first row.
  pSttDIter->iSttBlk = -1;
  pSttDIter->iRow = -1;

  code = tsdbDataIterNext(pIter, NULL);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0 || noBlock) {
    *ppIter = NULL;
    if (pIter != NULL) {
      tBlockDataDestroy(&pSttDIter->bData);
      taosArrayDestroy(pSttDIter->aSttBlk);
      taosMemoryFree(pIter);
    }
  } else {
    *ppIter = pIter;
  }
  return code;
}

// Release an iterator created by tsdbMemDIterOpen, tsdbDataDIterOpen or
// tsdbSttDIterOpen, including the type-specific state stored in its handle.
// The resources freed per type mirror the error-cleanup paths of the open functions.
// The SDataFReader referenced by DAT/STT iterators is shared (tsdbOpenCompactor
// passes the same pCompactor->pReader to every iterator), so it is NOT closed here.
// Passing NULL is a no-op.
static void tsdbDataIterClose(STsdbDataIter *pIter) {
  if (pIter == NULL) return;

  switch (pIter->flag) {
    case TSDB_ITER_TYPE_MEM:
      // TODO: the memory iterator does not own any resource yet
      break;

    case TSDB_ITER_TYPE_DAT: {
      SDataDIter *pDataDIter = (SDataDIter *)pIter->handle;
      tBlockDataDestroy(&pDataDIter->bData);
      tMapDataClear(&pDataDIter->mDataBlk);
      taosArrayDestroy(pDataDIter->aBlockIdx);
    } break;

    case TSDB_ITER_TYPE_STT: {
      SSttDIter *pSttDIter = (SSttDIter *)pIter->handle;
      tBlockDataDestroy(&pSttDIter->bData);
      taosArrayDestroy(pSttDIter->aSttBlk);
    } break;

    default:
      ASSERT(0);
      break;
  }

  taosMemoryFree(pIter);
}

H
Hongze Cheng 已提交
217
// Advance pIter to its next row, skipping rows that belong to the table(s) described
// by pExcludeTableId (stt iterators also skip whole stt blocks that contain only
// excluded rows):
//   - pExcludeTableId == NULL: nothing is skipped;
//   - uid != 0:                skip the single table (suid, uid);
//   - uid == 0:                skip every table of super table suid.
// On success pIter->rowInfo holds the new current row. When the iterator is
// exhausted, rowInfo.suid and rowInfo.uid are both set to 0.
// The type tag is matched by equality, not with a bitwise test, because
// TSDB_ITER_TYPE_MEM is 0 and (flag & TSDB_ITER_TYPE_MEM) is never true.
static int32_t tsdbDataIterNext(STsdbDataIter *pIter, TABLEID *pExcludeTableId) {
  int32_t code = 0;
  int32_t lino = 0;

  switch (pIter->flag) {
    case TSDB_ITER_TYPE_MEM:
      // TODO
      ASSERT(0);
      break;

    case TSDB_ITER_TYPE_DAT:
      // TODO
      ASSERT(0);
      break;

    case TSDB_ITER_TYPE_STT: {
      SSttDIter *pSttDIter = (SSttDIter *)pIter->handle;

      for (;;) {
        if (++pSttDIter->iRow >= pSttDIter->bData.nRow) {
          // current block consumed: load the next stt block that is not entirely excluded
          for (;;) {
            if (++pSttDIter->iSttBlk < taosArrayGetSize(pSttDIter->aSttBlk)) {
              SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk);

              // check exclusion at block level
              if (pExcludeTableId) {
                if (pExcludeTableId->uid) {  // exclude (suid, uid)
                  if (pSttBlk->minUid == pExcludeTableId->uid && pSttBlk->maxUid == pExcludeTableId->uid) continue;
                } else {  // exclude (suid, *)
                  if (pSttBlk->suid == pExcludeTableId->suid) continue;
                }
              }

              code = tsdbReadSttBlockEx(pSttDIter->pReader, pSttDIter->iStt, pSttBlk, &pSttDIter->bData);
              TSDB_CHECK_CODE(code, lino, _exit);

              pIter->rowInfo.suid = pSttBlk->suid;
              pSttDIter->iRow = 0;
              break;
            } else {
              // iter end, all set 0 and exit
              pIter->rowInfo.suid = 0;
              pIter->rowInfo.uid = 0;
              goto _exit;
            }
          }
        }

        // when the block has no single uid (bData.uid == 0), the uid is stored per row in aUid
        pIter->rowInfo.uid = pSttDIter->bData.uid ? pSttDIter->bData.uid : pSttDIter->bData.aUid[pSttDIter->iRow];
        pIter->rowInfo.row = tsdbRowFromBlockData(&pSttDIter->bData, pSttDIter->iRow);

        // check exclusion at row level
        if (pExcludeTableId) {
          if (pExcludeTableId->uid) {  // exclude (suid, uid)
            if (pIter->rowInfo.uid == pExcludeTableId->uid) continue;
          } else {  // exclude (suid, *)
            if (pIter->rowInfo.suid == pExcludeTableId->suid) continue;
          }
        }

        break;
      }
    } break;

    default:
      ASSERT(0);
      break;
  }

_exit:
  return code;
}
H
Hongze Cheng 已提交
281 282

// COMPACT =========================
H
Hongze Cheng 已提交
283
// Prepare pCompactor for compacting pTsdb. It records the tsdb handle and the
// vnode's maxRows/minRows, resets the file-set cursor, snapshots the file system
// with tsdbFSCopy() and creates the working block data. Failures are logged and
// returned.
static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;

  pCompactor->pTsdb = pTsdb;
  // pCompactor->cid = 0; (TODO)
  pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows;
  pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows;
  // INT32_MIN sorts before every fid, so the first tsdbOpenCompactor() call
  // (taosArraySearch with TD_GT) selects the first file set
  pCompactor->fid = INT32_MIN;

  code = tsdbFSCopy(pTsdb, &pCompactor->fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tBlockDataCreate(&pCompactor->bData);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
307
// Called by tsdbCompact() when every file set was processed without error,
// presumably to make the compaction result effective.
// TODO: not implemented; currently always returns 0.
static int32_t tsdbCommitCompact(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = pCompactor->pTsdb;

  // TODO

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
322
// Called by tsdbCompact() after a failure, presumably to discard the partial
// compaction result.
// TODO: not implemented; currently always returns 0.
static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = pCompactor->pTsdb;

  // TODO

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
337
// Deep compaction of the current file set (presumably for TSDB_FLG_DEEP_COMPACT).
// NOTE(review): unfinished. It only opens pCompactor->pReader on
// pCompactor->pDFileSet and is not called by the code shown.
static int32_t tsdbDeepCompact(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = pCompactor->pTsdb;

  code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
353
// Shallow compaction of the current file set.
// NOTE(review): empty placeholder that always returns 0; not called by the code shown.
static int32_t tsdbShallowCompact(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = pCompactor->pTsdb;

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
366
// One merge step across all open iterators. Afterwards pCompactor->pIter is the
// iterator holding the smallest pending row, or NULL when every iterator is exhausted.
//  - The current iterator (pCompactor->pIter) is advanced in place. If it is
//    exhausted it is dropped. If its new row sorts after the smallest row waiting
//    in rtree, it is put back into the tree.
//  - Otherwise the iterator with the smallest row is popped from rtree. If that row
//    belongs to pExcludeTableId, the popped iterator is advanced again;
//    tsdbDataIterNext() skips the excluded table's rows.
static int32_t tsdbCompactNextRowImpl(STsdbCompactor *pCompactor, TABLEID *pExcludeTableId) {
  int32_t code = 0;
  int32_t lino = 0;

  for (;;) {
    STsdbDataIter *pCur = pCompactor->pIter;

    if (pCur != NULL) {
      code = tsdbDataIterNext(pCur, pExcludeTableId);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (pCur->rowInfo.suid == 0 && pCur->rowInfo.uid == 0) {
        // exhausted: keep it out of the tree
        pCompactor->pIter = NULL;
      } else {
        SRBTreeNode *pHead = tRBTreeMin(&pCompactor->rtree);
        if (pHead != NULL) {
          STsdbDataIter *pHeadIter = TSDB_DATA_ITER_FROM_RBTN(pHead);

          int32_t c = tRowInfoCmprFn(&pCur->rowInfo, &pHeadIter->rowInfo);
          ASSERT(c);

          if (c > 0) {
            // another iterator now holds a smaller row: park this one in the tree
            tRBTreePut(&pCompactor->rtree, &pCur->n);
            pCompactor->pIter = NULL;
          }
        }
      }
    }

    if (pCompactor->pIter != NULL) break;

    SRBTreeNode *pMinNode = tRBTreeMin(&pCompactor->rtree);
    if (pMinNode == NULL) break;  // nothing left to merge

    pCompactor->pIter = TSDB_DATA_ITER_FROM_RBTN(pMinNode);
    tRBTreeDrop(&pCompactor->rtree, pMinNode);

    if (pExcludeTableId == NULL) break;

    const SRowInfo *pRow = &pCompactor->pIter->rowInfo;
    bool excluded = pExcludeTableId->uid ? (pRow->uid == pExcludeTableId->uid)
                                         : (pRow->suid == pExcludeTableId->suid);
    if (!excluded) break;
    // popped row belongs to the excluded table: loop to advance this iterator past it
  }

_exit:
  return code;
}

// Advance the compactor to the next row whose table still exists in the meta store.
// It also keeps pCompactor->tbSkm (suid, uid, pTSchema) in sync with that row's table.
// Rows of dropped tables are skipped by re-running tsdbCompactNextRowImpl() with an
// exclusion filter:
//   - (suid, 0) when the super table no longer exists;
//   - (suid, uid) when only the table itself no longer exists.
// pCompactor->pIter is NULL once no rows remain.
// Fixes:
//   - The super-table existence check now looks up the super table's uid (suid).
//     Previously it looked up the child uid, so one dropped child table excluded
//     the whole super table.
//   - A normal table's schema is now fetched by its own uid. Previously it used
//     suid, which is always 0 in that branch.
static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;

  TABLEID  excludeTableId;
  TABLEID *pExcludeTableId = NULL;

  for (;;) {
    code = tsdbCompactNextRowImpl(pCompactor, pExcludeTableId);
    TSDB_CHECK_CODE(code, lino, _exit);

    // iter end, just break out
    if (pCompactor->pIter == NULL) break;

    SRowInfo *pRowInfo = &pCompactor->pIter->rowInfo;

    // same table as the previous row: already validated, schema already loaded
    if (pRowInfo->uid == pCompactor->tbSkm.uid) break;

    SMetaInfo info;
    if (pRowInfo->suid) {  // child table
      // check if super table exists
      if (pRowInfo->suid != pCompactor->tbSkm.suid) {
        if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->suid, &info, NULL) != TSDB_CODE_SUCCESS) {
          // super table dropped: skip every table of it
          excludeTableId.suid = pRowInfo->suid;
          excludeTableId.uid = 0;
          pExcludeTableId = &excludeTableId;
          continue;
        }

        // super table exists: its child tables are decoded with the super table schema
        pCompactor->tbSkm.suid = pRowInfo->suid;
        pCompactor->tbSkm.uid = 0;
        tDestroyTSchema(pCompactor->tbSkm.pTSchema);
        pCompactor->tbSkm.pTSchema = metaGetTbTSchema(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->suid, -1, 1);
        if (pCompactor->tbSkm.pTSchema == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      }

      // check if table exists
      if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->uid, &info, NULL) != TSDB_CODE_SUCCESS) {
        excludeTableId.suid = pRowInfo->suid;
        excludeTableId.uid = pRowInfo->uid;
        pExcludeTableId = &excludeTableId;
        continue;
      }

      // table exists
      pCompactor->tbSkm.uid = pRowInfo->uid;
    } else {  // normal table
      // check if table exists
      if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->uid, &info, NULL) != TSDB_CODE_SUCCESS) {
        excludeTableId.suid = pRowInfo->suid;
        excludeTableId.uid = pRowInfo->uid;
        pExcludeTableId = &excludeTableId;
        continue;
      }

      // table exists: a normal table has its own schema, looked up by uid (suid is 0 here)
      pCompactor->tbSkm.suid = pRowInfo->suid;
      pCompactor->tbSkm.uid = pRowInfo->uid;
      tDestroyTSchema(pCompactor->tbSkm.pTSchema);

      pCompactor->tbSkm.pTSchema = metaGetTbTSchema(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->uid, -1, 1);
      if (pCompactor->tbSkm.pTSchema == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }

    break;
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
503
// Return the compactor's current row and the schema to decode it with. If there is
// no current iterator, the next valid row is fetched first. Both outputs are set to
// NULL once every row has been consumed. The returned pointers point into
// compactor-owned state (the iterator's rowInfo and tbSkm).
static int32_t tsdbCompactGetRow(STsdbCompactor *pCompactor, SRowInfo **ppRowInfo, STSchema **ppTSchema) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pCompactor->pIter == NULL) {
    code = tsdbCompactNextRow(pCompactor);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (pCompactor->pIter == NULL) {
    // all rows consumed
    *ppRowInfo = NULL;
    *ppTSchema = NULL;
    goto _exit;
  }

  // tsdbCompactNextRow() sets tbSkm to the current row's table before returning
  ASSERT(pCompactor->pIter->rowInfo.suid == pCompactor->tbSkm.suid);
  ASSERT(pCompactor->pIter->rowInfo.uid == pCompactor->tbSkm.uid);

  *ppRowInfo = &pCompactor->pIter->rowInfo;
  *ppTSchema = pCompactor->tbSkm.pTSchema;

_exit:
  return code;
}

H
Hongze Cheng 已提交
526
static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) {
H
Hongze Cheng 已提交
527 528 529
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
530
  STsdb *pTsdb = pCompactor->pTsdb;
H
Hongze Cheng 已提交
531

H
Hongze Cheng 已提交
532 533 534 535
  // next compact file
  pCompactor->pDFileSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid},
                                                       tDFileSetCmprFn, TD_GT);
  if (pCompactor->pDFileSet == NULL) goto _exit;
H
Hongze Cheng 已提交
536

H
Hongze Cheng 已提交
537 538 539
  pCompactor->fid = pCompactor->pDFileSet->fid;

  code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet);
H
Hongze Cheng 已提交
540 541
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
542 543
  // open iters
  STsdbDataIter *pIter;
H
Hongze Cheng 已提交
544

H
Hongze Cheng 已提交
545 546
  pCompactor->iterList = NULL;
  tRBTreeCreate(&pCompactor->rtree, tsdbDataIterCmprFn);
H
Hongze Cheng 已提交
547

H
Hongze Cheng 已提交
548 549 550 551 552 553 554 555 556 557 558
  code = tsdbDataDIterOpen(pCompactor->pReader, &pIter);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pIter) {
    pIter->next = pCompactor->iterList;
    pCompactor->iterList = pIter;
    tRBTreePut(&pCompactor->rtree, &pIter->n);
  }

  for (int32_t iStt = 0; iStt < pCompactor->pReader->pSet->nSttF; iStt++) {
    code = tsdbSttDIterOpen(pCompactor->pReader, iStt, &pIter);
H
Hongze Cheng 已提交
559 560
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
561 562 563 564 565 566
    if (pIter) {
      pIter->next = pCompactor->iterList;
      pCompactor->iterList = pIter;
      tRBTreePut(&pCompactor->rtree, &pIter->n);
    }
  }
H
Hongze Cheng 已提交
567
  pCompactor->pIter = NULL;
H
Hongze Cheng 已提交
568
  tBlockDataReset(&pCompactor->bData);
H
Hongze Cheng 已提交
569

H
Hongze Cheng 已提交
570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596
  // open writers
  SDFileSet fSet = {0};  // TODO
  code = tsdbDataFWriterOpen(&pCompactor->pWriter, pTsdb, NULL);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pCompactor->aBlockIdx == NULL) {
    pCompactor->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
    if (pCompactor->aBlockIdx == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  } else {
    taosArrayClear(pCompactor->aBlockIdx);
  }

  tMapDataReset(&pCompactor->mDataBlk);

  if (pCompactor->aSttBlk == NULL) {
    pCompactor->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
    if (pCompactor->aSttBlk == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  } else {
    taosArrayClear(pCompactor->aSttBlk);
  }

H
Hongze Cheng 已提交
597 598 599 600 601 602 603 604
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}
H
Hongze Cheng 已提交
605

H
Hongze Cheng 已提交
606 607
// Release the per-file-set compactor state after a file set has been processed.
// Every iterator on iterList is freed with tsdbDataIterClose().
// TODO: the remaining cleanup (presumably reader and writer) is not written yet,
// hence the ASSERT(0).
static void tsdbCloseCompactor(STsdbCompactor *pCompactor) {
  STsdb *pTsdb = pCompactor->pTsdb;

  STsdbDataIter *pIter = pCompactor->iterList;
  while (pIter != NULL) {
    STsdbDataIter *pNext = pIter->next;  // read the link before the iterator is freed
    tsdbDataIterClose(pIter);
    pIter = pNext;
  }

  // TODO
  ASSERT(0);

_exit:
  tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
H
Hongze Cheng 已提交
621

H
Hongze Cheng 已提交
622 623 624 625 626 627 628
int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
  int32_t code = 0;
  int32_t lino = 0;

  // Check if can do compact (TODO)

  // Do compact
H
Hongze Cheng 已提交
629
  STsdbCompactor *pCompactor = &(STsdbCompactor){0};
H
Hongze Cheng 已提交
630

H
Hongze Cheng 已提交
631
  code = tsdbBeginCompact(pTsdb, pCompactor);
H
Hongze Cheng 已提交
632 633 634
  TSDB_CHECK_CODE(code, lino, _exit);

  while (true) {
H
Hongze Cheng 已提交
635
    code = tsdbOpenCompactor(pCompactor);
H
Hongze Cheng 已提交
636 637
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
638
    if (pCompactor->pDFileSet == NULL) break;
H
Hongze Cheng 已提交
639

H
Hongze Cheng 已提交
640
    // loop to merge row by row
H
Hongze Cheng 已提交
641 642 643
    SRowInfo *pRowInfo = NULL;
    STSchema *pTSchema = NULL;
    int64_t   nRow = 0;
H
Hongze Cheng 已提交
644
    for (;;) {
H
Hongze Cheng 已提交
645
      code = tsdbCompactGetRow(pCompactor, &pRowInfo, &pTSchema);
H
Hongze Cheng 已提交
646
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
647

H
Hongze Cheng 已提交
648
      if (pRowInfo == NULL) break;
H
Hongze Cheng 已提交
649

H
Hongze Cheng 已提交
650 651
      nRow++;

H
Hongze Cheng 已提交
652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697
      if (pCompactor->bData.suid == 0 && pCompactor->bData.uid == 0) {  // init the block data if not initialized yet
        code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema,
                              NULL, 0);
        TSDB_CHECK_CODE(code, lino, _exit);
      } else {
        if (pCompactor->bData.suid != pRowInfo->suid) {  // not same super table
          if (pCompactor->bData.nRow < pCompactor->minRows) {
            // TODO: write block data to .stt file, need to check if nRow is 0
            tBlockDataClear(&pCompactor->bData);
          } else {
            // TODO: write block data to .data file, need to check if nRow is 0
            tBlockDataClear(&pCompactor->bData);
          }

          code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema,
                                NULL, 0);
          TSDB_CHECK_CODE(code, lino, _exit);
        } else if (pCompactor->bData.uid != pRowInfo->uid) {
          if (pRowInfo->suid) {  // different child table
            if (pCompactor->bData.nRow > pCompactor->minRows) {
              // TODO
            }
          } else {  // different normal table
            if (pCompactor->bData.nRow < pCompactor->minRows) {
              // TODO: write data to .stt file, need to check if nRow is 0
              tBlockDataClear(&pCompactor->bData);
            } else {
              // TODO: write data to .data file, need to check if nRow is 0
              tBlockDataClear(&pCompactor->bData);
            }

            code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid},
                                  pTSchema, NULL, 0);
            TSDB_CHECK_CODE(code, lino, _exit);
          }
        }
      }

      // append row to block data
      code = tBlockDataAppendRow(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid);
      TSDB_CHECK_CODE(code, lino, _exit);

      // check if block data is full
      if (pCompactor->bData.nRow >= pCompactor->maxRows) {
        tBlockDataClear(&pCompactor->bData);
      }
H
Hongze Cheng 已提交
698

H
Hongze Cheng 已提交
699 700 701
      // iterate to next row
      code = tsdbCompactNextRow(pCompactor);
      TSDB_CHECK_CODE(code, lino, _exit);
H
Hongze Cheng 已提交
702
    }
H
Hongze Cheng 已提交
703

H
Hongze Cheng 已提交
704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724
    if (pCompactor->bData.nRow > 0) {
      // write again
    }

    code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pCompactor->mDataBlk.nItem > 0) {
      SBlockIdx *pBlockIdx = taosArrayReserve(pCompactor->aBlockIdx, 1);
      if (pBlockIdx == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      code = tsdbWriteDataBlk(pCompactor->pWriter, &pCompactor->mDataBlk, pBlockIdx);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx);
    TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
725
    tsdbCloseCompactor(pCompactor);
H
Hongze Cheng 已提交
726 727 728 729 730
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
731
    tsdbAbortCompact(pCompactor);
H
Hongze Cheng 已提交
732
  } else {
H
Hongze Cheng 已提交
733
    tsdbCommitCompact(pCompactor);
H
Hongze Cheng 已提交
734
  }
H
Hongze Cheng 已提交
735 736
  return code;
}