tsdbCompact.c 23.5 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

H
Hongze Cheng 已提交
18 19 20 21
// Kind of data source behind a STsdbDataIter, stored in STsdbDataIter.flag.
// NOTE(review): TSDB_ITER_TYPE_MEM is 0, so it can never be detected with a
// bitwise AND (`flag & TSDB_ITER_TYPE_MEM` is always 0); compare for equality.
#define TSDB_ITER_TYPE_MEM 0x0
#define TSDB_ITER_TYPE_DAT 0x1
#define TSDB_ITER_TYPE_STT 0x2

H
Hongze Cheng 已提交
22 23
// Type-specific state of a TSDB_ITER_TYPE_MEM iterator. Placeholder: no fields yet.
typedef struct {
} SMemDIter;
H
Hongze Cheng 已提交
24 25

typedef struct {
H
Hongze Cheng 已提交
26 27 28 29 30 31 32
  SDataFReader *pReader;
  SArray       *aBlockIdx;  // SArray<SBlockIdx>
  SMapData      mDataBlk;   // SMapData<SDataBlk>
  SBlockData    bData;
  int32_t       iBlockIdx;
  int32_t       iDataBlk;
  int32_t       iRow;
H
Hongze Cheng 已提交
33 34 35
} SDataDIter;

typedef struct {
H
Hongze Cheng 已提交
36 37 38 39 40 41
  SDataFReader *pReader;
  int32_t       iStt;
  SArray       *aSttBlk;  // SArray<SSttBlk>
  SBlockData    bData;
  int32_t       iSttBlk;
  int32_t       iRow;
H
Hongze Cheng 已提交
42 43
} SSttDIter;

H
Hongze Cheng 已提交
44 45 46
typedef struct STsdbDataIter {
  struct STsdbDataIter *next;

H
Hongze Cheng 已提交
47 48 49 50
  int32_t     flag;
  SRowInfo    rowInfo;
  SRBTreeNode n;
  char        handle[];
H
Hongze Cheng 已提交
51 52
} STsdbDataIter;

H
Hongze Cheng 已提交
53 54
// Recover the owning STsdbDataIter from a pointer to its embedded rbtree node `n`.
// The argument is parenthesized so that expression arguments (e.g. `c ? a : b`)
// are cast as a whole instead of binding only their first operand to the cast.
#define TSDB_DATA_ITER_FROM_RBTN(N) ((STsdbDataIter *)((char *)(N) - offsetof(STsdbDataIter, n)))

H
Hongze Cheng 已提交
55
typedef struct {
H
Hongze Cheng 已提交
56 57
  STsdb     *pTsdb;
  int64_t    cid;
H
Hongze Cheng 已提交
58
  int8_t     cmprAlg;
H
Hongze Cheng 已提交
59 60 61 62 63 64 65
  int32_t    maxRows;
  int32_t    minRows;
  STsdbFS    fs;
  int32_t    fid;
  SDFileSet *pDFileSet;

  // Reader
H
Hongze Cheng 已提交
66 67 68
  SDataFReader  *pReader;
  STsdbDataIter *iterList;  // list of iterators
  SRBTree        rtree;
H
Hongze Cheng 已提交
69
  STsdbDataIter *pIter;
H
Hongze Cheng 已提交
70
  SBlockData     bData;
H
Hongze Cheng 已提交
71
  SSkmInfo       tbSkm;
H
Hongze Cheng 已提交
72 73 74 75

  // Writer
  SDataFWriter *pWriter;
  SArray       *aBlockIdx;  // SArray<SBlockIdx>
H
Hongze Cheng 已提交
76 77 78
  TABLEID       tableId;
  SMapData      mDataBlk;  // SMapData<SDataBlk>
  SArray       *aSttBlk;   // SArray<SSttBlk>
H
Hongze Cheng 已提交
79 80
} STsdbCompactor;

H
Hongze Cheng 已提交
81 82
// Compaction option bit. NOTE(review): presumably meant for the `flag` argument
// of tsdbCompact(); it is not referenced by the code visible here — confirm.
#define TSDB_FLG_DEEP_COMPACT 0x1

H
Hongze Cheng 已提交
83
// ITER =========================
H
Hongze Cheng 已提交
84
// Forward declaration: the iterator open functions call this to position a new
// iterator on its first row before the definition appears.
static int32_t tsdbDataIterNext(STsdbDataIter *pIter, TABLEID *pExcludeTableId);
H
Hongze Cheng 已提交
85

H
Hongze Cheng 已提交
86 87 88 89 90 91 92
static int32_t tsdbDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
  const STsdbDataIter *pIter1 = (STsdbDataIter *)((char *)n1 - offsetof(STsdbDataIter, n));
  const STsdbDataIter *pIter2 = (STsdbDataIter *)((char *)n2 - offsetof(STsdbDataIter, n));

  return tRowInfoCmprFn(&pIter1->rowInfo, &pIter2->rowInfo);
}

H
Hongze Cheng 已提交
93
// Allocate a zero-initialized iterator with room for a SMemDIter handle.
// On success *ppIter receives the iterator and 0 is returned; on allocation
// failure *ppIter is NULL and TSDB_CODE_OUT_OF_MEMORY is returned.
static int32_t tsdbMemDIterOpen(STsdbDataIter **ppIter) {
  STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SMemDIter));

  // TODO

  *ppIter = pIter;  // NULL when the allocation failed
  return (pIter == NULL) ? TSDB_CODE_OUT_OF_MEMORY : 0;
}

// Create a TSDB_ITER_TYPE_DAT iterator over the block index read from pReader.
//
// Returns 0 with *ppIter set to the new iterator, or 0 with *ppIter == NULL when
// the block index is empty (nothing to iterate). On failure returns the error
// code with *ppIter == NULL; everything allocated here is released.
static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SDataDIter *pDataDIter = NULL;

  STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SDataDIter));
  if (pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
  pIter->flag = TSDB_ITER_TYPE_DAT;

  pDataDIter = (SDataDIter *)pIter->handle;
  pDataDIter->pReader = pReader;

  pDataDIter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pDataDIter->aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  code = tsdbReadBlockIdx(pReader, pDataDIter->aBlockIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  // an empty index is not an error: discard the iterator and report "no iterator"
  if (taosArrayGetSize(pDataDIter->aBlockIdx) == 0) goto _clear_exit;

  // TODO
  code = tBlockDataCreate(&pDataDIter->bData);
  TSDB_CHECK_CODE(code, lino, _exit);

  pDataDIter->iBlockIdx = -1;
  pDataDIter->iDataBlk = 0;
  pDataDIter->iRow = 0;

  // position on the first row
  code = tsdbDataIterNext(pIter, NULL);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    *ppIter = pIter;
    return code;
  }

_clear_exit:
  *ppIter = NULL;
  if (pIter) {
    tBlockDataDestroy(&pDataDIter->bData);
    tMapDataClear(&pDataDIter->mDataBlk);
    taosArrayDestroy(pDataDIter->aBlockIdx);
    taosMemoryFree(pIter);
  }
  return code;
}

H
Hongze Cheng 已提交
165
// Create a TSDB_ITER_TYPE_STT iterator over stt file number iStt of pReader.
//
// Returns 0 with *ppIter set to the new iterator, already positioned by a first
// tsdbDataIterNext() call, or 0 with *ppIter == NULL when the stt file has no
// blocks. On failure returns the error code with *ppIter == NULL; everything
// allocated here is released.
static int32_t tsdbSttDIterOpen(SDataFReader *pReader, int32_t iStt, STsdbDataIter **ppIter) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SSttDIter *pSttDIter = NULL;

  STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SSttDIter));
  if (NULL == pIter) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
  pIter->flag = TSDB_ITER_TYPE_STT;

  pSttDIter = (SSttDIter *)pIter->handle;
  pSttDIter->pReader = pReader;
  pSttDIter->iStt = iStt;

  pSttDIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
  if (NULL == pSttDIter->aSttBlk) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  code = tsdbReadSttBlk(pReader, pSttDIter->iStt, pSttDIter->aSttBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  // no blocks in this stt file: discard the iterator and report "no iterator"
  if (taosArrayGetSize(pSttDIter->aSttBlk) == 0) goto _clear_exit;

  code = tBlockDataCreate(&pSttDIter->bData);
  TSDB_CHECK_CODE(code, lino, _exit);

  // both cursors start one step before the first element; the next call advances them
  pSttDIter->iSttBlk = -1;
  pSttDIter->iRow = -1;

  code = tsdbDataIterNext(pIter, NULL);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    *ppIter = pIter;
    return code;
  }

_clear_exit:
  *ppIter = NULL;
  if (pIter) {
    tBlockDataDestroy(&pSttDIter->bData);
    taosArrayDestroy(pSttDIter->aSttBlk);
    taosMemoryFree(pIter);
  }
  return code;
}

// Release an iterator created by one of the tsdb*DIterOpen() functions.
// NULL is accepted and ignored.
//
// The iterator does NOT own its SDataFReader: the open functions only store the
// pointer handed in by the caller, and tsdbOpenCompactor() hands the same
// reader to every iterator. Closing it here would close it once per iterator
// and leave the other holders with a dangling pointer, so it is left to the
// owner (tsdbCloseCompactor).
static void tsdbDataIterClose(STsdbDataIter *pIter) {
  if (pIter == NULL) return;

  // dispatch by equality: TSDB_ITER_TYPE_MEM is 0 and cannot be tested with '&'
  switch (pIter->flag) {
    case TSDB_ITER_TYPE_DAT: {
      // mirrors the cleanup done on the error path of tsdbDataDIterOpen()
      SDataDIter *pDataDIter = (SDataDIter *)pIter->handle;

      tBlockDataDestroy(&pDataDIter->bData);
      tMapDataClear(&pDataDIter->mDataBlk);
      taosArrayDestroy(pDataDIter->aBlockIdx);
    } break;

    case TSDB_ITER_TYPE_STT: {
      SSttDIter *pSttDIter = (SSttDIter *)pIter->handle;

      tBlockDataDestroy(&pSttDIter->bData);
      taosArrayDestroy(pSttDIter->aSttBlk);
    } break;

    case TSDB_ITER_TYPE_MEM:  // not implemented yet
    default:
      ASSERT(0);
      break;
  }

  taosMemoryFree(pIter);
}

H
Hongze Cheng 已提交
234
// Advance an stt-file iterator to its next row that is not excluded.
//
// When the current block is exhausted the next block is loaded with
// tsdbReadSttBlockEx(); blocks that can be recognized as fully excluded from
// their SSttBlk header alone are skipped without being read. When no block is
// left, rowInfo.suid and rowInfo.uid are both set to 0 to mark the end.
static int32_t tsdbSttDIterNext(STsdbDataIter *pIter, TABLEID *pExcludeTableId) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SSttDIter *pSttDIter = (SSttDIter *)pIter->handle;

  for (;;) {
    if (++pSttDIter->iRow >= pSttDIter->bData.nRow) {
      // current block exhausted: look for the next block worth loading
      for (;;) {
        if (++pSttDIter->iSttBlk >= taosArrayGetSize(pSttDIter->aSttBlk)) {
          // iter end, all set 0 and exit
          pIter->rowInfo.suid = 0;
          pIter->rowInfo.uid = 0;
          goto _exit;
        }

        SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk);

        // check exclusion at block level
        if (pExcludeTableId) {
          if (pExcludeTableId->uid) {  // exclude (suid, uid)
            if (pSttBlk->minUid == pExcludeTableId->uid && pSttBlk->maxUid == pExcludeTableId->uid) continue;
          } else {  // exclude (suid, *)
            if (pSttBlk->suid == pExcludeTableId->suid) continue;
          }
        }

        code = tsdbReadSttBlockEx(pSttDIter->pReader, pSttDIter->iStt, pSttBlk, &pSttDIter->bData);
        TSDB_CHECK_CODE(code, lino, _exit);

        pIter->rowInfo.suid = pSttBlk->suid;
        pSttDIter->iRow = 0;
        break;
      }
    }

    // a block-wide uid of 0 means the uid is stored per row in aUid
    pIter->rowInfo.uid = pSttDIter->bData.uid ? pSttDIter->bData.uid : pSttDIter->bData.aUid[pSttDIter->iRow];
    pIter->rowInfo.row = tsdbRowFromBlockData(&pSttDIter->bData, pSttDIter->iRow);

    // check exclusion at row level
    if (pExcludeTableId) {
      if (pExcludeTableId->uid) {  // exclude (suid, uid)
        if (pIter->rowInfo.uid == pExcludeTableId->uid) continue;
      } else {  // exclude (suid, *)
        if (pIter->rowInfo.suid == pExcludeTableId->suid) continue;
      }
    }

    break;
  }

_exit:
  return code;
}

// Advance pIter to its next row, skipping rows that match pExcludeTableId.
//
// pExcludeTableId may be NULL (nothing excluded). A non-zero uid excludes that
// single table; a zero uid excludes every table of pExcludeTableId->suid.
// Returns 0 on success or the error code of the failing read.
//
// The iterator type is compared for equality: TSDB_ITER_TYPE_MEM is 0, so a
// bit test against it can never match. Only the STT type is implemented.
static int32_t tsdbDataIterNext(STsdbDataIter *pIter, TABLEID *pExcludeTableId) {
  int32_t code = 0;

  switch (pIter->flag) {
    case TSDB_ITER_TYPE_STT:
      code = tsdbSttDIterNext(pIter, pExcludeTableId);
      break;

    case TSDB_ITER_TYPE_MEM:  // TODO
    case TSDB_ITER_TYPE_DAT:  // TODO
    default:
      ASSERT(0);
      break;
  }

  return code;
}
H
Hongze Cheng 已提交
298 299

// COMPACT =========================
H
Hongze Cheng 已提交
300
// Prepare pCompactor for a compaction run over pTsdb: copy the block-size and
// compression settings from the vnode config, take a copy of the file system
// state, and create the output block buffer. fid starts at INT32_MIN so that
// the first "greater than fid" search in tsdbOpenCompactor() finds the first
// file set. Returns 0 or the error code of the failing step (also logged).
static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;

  pCompactor->pTsdb = pTsdb;
  pCompactor->cid = 0;  // TODO
  pCompactor->fid = INT32_MIN;

  pCompactor->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
  pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows;
  pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows;

  code = tsdbFSCopy(pTsdb, &pCompactor->fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tBlockDataCreate(&pCompactor->bData);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
325
// Commit step of a compaction run. Not implemented yet: currently always
// returns 0 without doing anything.
static int32_t tsdbCommitCompact(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = pCompactor->pTsdb;

  // TODO

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
340
// Abort step of a compaction run. Not implemented yet: currently always
// returns 0 without doing anything.
static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = pCompactor->pTsdb;

  // TODO

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
355
// Deep compaction of the current file set. Work in progress: it only opens a
// reader on pCompactor->pDFileSet. Returns 0 or the reader-open error (logged).
static int32_t tsdbDeepCompact(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = pCompactor->pTsdb;

  code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
371
// Shallow compaction of the current file set. Not implemented yet: currently
// always returns 0 without doing anything.
static int32_t tsdbShallowCompact(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = pCompactor->pTsdb;

_exit:
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
384
// Move the compactor to the next row in merge order, skipping rows that match
// pExcludeTableId (NULL: nothing excluded; uid != 0: that table; uid == 0:
// every table of the given suid).
//
// pCompactor->pIter is the iterator that provides the current row and is kept
// OUT of rtree; all other live iterators sit in rtree ordered by their current
// row. On return pCompactor->pIter is the iterator holding the new current row,
// or NULL when every iterator is exhausted.
static int32_t tsdbCompactNextRowImpl(STsdbCompactor *pCompactor, TABLEID *pExcludeTableId) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SRBTree *pTree = &pCompactor->rtree;

  for (;;) {
    STsdbDataIter *pCur = pCompactor->pIter;

    if (pCur != NULL) {
      code = tsdbDataIterNext(pCur, pExcludeTableId);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (pCur->rowInfo.suid == 0 && pCur->rowInfo.uid == 0) {
        // iterator exhausted: drop it, it is not put back into the tree
        pCompactor->pIter = NULL;
      } else {
        SRBTreeNode *pMinNode = tRBTreeMin(pTree);
        if (pMinNode != NULL) {
          STsdbDataIter *pMinIter = TSDB_DATA_ITER_FROM_RBTN(pMinNode);

          int32_t c = tRowInfoCmprFn(&pCur->rowInfo, &pMinIter->rowInfo);
          ASSERT(c);

          // another iterator now holds a smaller row: park the current one in the tree
          if (c > 0) {
            tRBTreePut(pTree, &pCur->n);
            pCompactor->pIter = NULL;
          }
        }
      }
    }

    // current iterator still holds the smallest row
    if (pCompactor->pIter != NULL) break;

    // otherwise take the iterator with the smallest row out of the tree
    SRBTreeNode *pMinNode = tRBTreeMin(pTree);
    if (pMinNode == NULL) break;

    pCompactor->pIter = TSDB_DATA_ITER_FROM_RBTN(pMinNode);
    tRBTreeDrop(pTree, pMinNode);

    if (pExcludeTableId == NULL) break;

    // the row this iterator is parked on may itself be excluded; if so, loop to advance it
    if (pExcludeTableId->uid) {
      if (pCompactor->pIter->rowInfo.uid != pExcludeTableId->uid) break;
    } else {
      if (pCompactor->pIter->rowInfo.suid != pExcludeTableId->suid) break;
    }
  }

_exit:
  return code;
}

// Advance the compactor to the next row that belongs to a table still known to
// the meta store, and keep pCompactor->tbSkm (suid, uid, schema) in sync with
// the table of that row.
//
// Rows of tables (or whole super tables) that metaGetInfo() no longer finds are
// skipped by passing an exclusion id to tsdbCompactNextRowImpl(). On return
// pCompactor->pIter is NULL when all rows are consumed.
//
// Returns 0 on success; TSDB_CODE_OUT_OF_MEMORY when the schema cannot be
// loaded; or the error of tsdbCompactNextRowImpl(). Errors are logged.
static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;

  TABLEID  excludeTableId;
  TABLEID *pExcludeTableId = NULL;

  for (;;) {
    code = tsdbCompactNextRowImpl(pCompactor, pExcludeTableId);
    TSDB_CHECK_CODE(code, lino, _exit);

    // iter end, just break out
    if (pCompactor->pIter == NULL) break;

    SRowInfo *pRowInfo = &pCompactor->pIter->rowInfo;

    // same table as the previous row: already validated, schema already loaded
    if (pRowInfo->uid == pCompactor->tbSkm.uid) break;

    SMetaInfo info;
    if (pRowInfo->suid) {  // child table
      if (pRowInfo->suid != pCompactor->tbSkm.suid) {
        // check if the SUPER table exists: look up suid, not the child uid,
        // otherwise one dropped child table would exclude the whole super table
        if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->suid, &info, NULL) != TSDB_CODE_SUCCESS) {
          excludeTableId.suid = pRowInfo->suid;
          excludeTableId.uid = 0;
          pExcludeTableId = &excludeTableId;
          continue;
        }

        // super table exists: switch to its schema
        pCompactor->tbSkm.suid = pRowInfo->suid;
        pCompactor->tbSkm.uid = 0;
        tDestroyTSchema(pCompactor->tbSkm.pTSchema);
        pCompactor->tbSkm.pTSchema = metaGetTbTSchema(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->suid, -1, 1);
        if (pCompactor->tbSkm.pTSchema == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      }

      // check if the child table exists
      if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->uid, &info, NULL) != TSDB_CODE_SUCCESS) {
        excludeTableId.suid = pRowInfo->suid;
        excludeTableId.uid = pRowInfo->uid;
        pExcludeTableId = &excludeTableId;
        continue;
      }

      // table exists
      pCompactor->tbSkm.uid = pRowInfo->uid;
    } else {  // normal table
      // check if table exists
      if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->uid, &info, NULL) != TSDB_CODE_SUCCESS) {
        excludeTableId.suid = pRowInfo->suid;
        excludeTableId.uid = pRowInfo->uid;
        pExcludeTableId = &excludeTableId;
        continue;
      }

      // table exists
      pCompactor->tbSkm.suid = pRowInfo->suid;
      pCompactor->tbSkm.uid = pRowInfo->uid;
      tDestroyTSchema(pCompactor->tbSkm.pTSchema);

      // a normal table has suid == 0, so its schema must be looked up by uid
      pCompactor->tbSkm.pTSchema = metaGetTbTSchema(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->uid, -1, 1);
      if (pCompactor->tbSkm.pTSchema == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }

    break;
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
521
// Return the compactor's current row and the schema of its table.
//
// If no iterator is current yet, tsdbCompactNextRow() is called first to select
// one. *ppRowInfo / *ppTSchema are set to the row and pCompactor->tbSkm.pTSchema,
// or both to NULL when there are no more rows. On error the outputs are left
// untouched and the error code is returned.
static int32_t tsdbCompactGetRow(STsdbCompactor *pCompactor, SRowInfo **ppRowInfo, STSchema **ppTSchema) {
  int32_t        code = 0;
  int32_t        lino = 0;
  STsdbDataIter *pCur = NULL;

  if (pCompactor->pIter == NULL) {
    code = tsdbCompactNextRow(pCompactor);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pCur = pCompactor->pIter;
  if (pCur == NULL) {
    *ppRowInfo = NULL;
    *ppTSchema = NULL;
    goto _exit;
  }

  // the cached schema info must describe the table of the current row
  ASSERT(pCur->rowInfo.suid == pCompactor->tbSkm.suid);
  ASSERT(pCur->rowInfo.uid == pCompactor->tbSkm.uid);

  *ppRowInfo = &pCur->rowInfo;
  *ppTSchema = pCompactor->tbSkm.pTSchema;

_exit:
  return code;
}

H
Hongze Cheng 已提交
544
// Set the compactor up for the next file set after pCompactor->fid.
//
// Selects the first file set with fid greater than the current one; when there
// is none, pCompactor->pDFileSet is left NULL and 0 is returned. Otherwise it
// opens the reader, opens one iterator for the data file and one per stt file
// (openers may yield no iterator), registers them in iterList and rtree, opens
// the writer, and resets the output buffers.
// Returns 0 or the error code of the failing step; the outcome is logged.
static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = pCompactor->pTsdb;

  // next compact file
  pCompactor->pDFileSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid},
                                                       tDFileSetCmprFn, TD_GT);
  if (pCompactor->pDFileSet == NULL) goto _exit;

  pCompactor->fid = pCompactor->pDFileSet->fid;

  // reader
  code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  // iterators: start from an empty list and an empty tree
  STsdbDataIter *pNewIter;

  pCompactor->iterList = NULL;
  tRBTreeCreate(&pCompactor->rtree, tsdbDataIterCmprFn);

  // data file iterator
  code = tsdbDataDIterOpen(pCompactor->pReader, &pNewIter);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (pNewIter != NULL) {
    pNewIter->next = pCompactor->iterList;
    pCompactor->iterList = pNewIter;
    tRBTreePut(&pCompactor->rtree, &pNewIter->n);
  }

  // one iterator per stt file
  for (int32_t iStt = 0; iStt < pCompactor->pReader->pSet->nSttF; iStt++) {
    code = tsdbSttDIterOpen(pCompactor->pReader, iStt, &pNewIter);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pNewIter == NULL) continue;

    pNewIter->next = pCompactor->iterList;
    pCompactor->iterList = pNewIter;
    tRBTreePut(&pCompactor->rtree, &pNewIter->n);
  }

  pCompactor->pIter = NULL;
  tBlockDataReset(&pCompactor->bData);

  // writer
  SDFileSet wSet = {.diskId = (SDiskID){0},  // TODO
                    .fid = pCompactor->pDFileSet->fid,
                    .pHeadF = &(SHeadFile){.commitID = pCompactor->cid},
                    .pDataF = &(SDataFile){.commitID = pCompactor->cid},
                    .pSmaF = &(SSmaFile){.commitID = pCompactor->cid},
                    .nSttF = 1,
                    .aSttF = {&(SSttFile){.commitID = pCompactor->cid}}};
  code = tsdbDataFWriterOpen(&pCompactor->pWriter, pTsdb, &wSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  // output block index: allocate on first use, otherwise just empty it
  if (pCompactor->aBlockIdx != NULL) {
    taosArrayClear(pCompactor->aBlockIdx);
  } else {
    pCompactor->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
    if (pCompactor->aBlockIdx == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  tMapDataReset(&pCompactor->mDataBlk);

  // output stt block list: same allocate-or-clear handling
  if (pCompactor->aSttBlk != NULL) {
    taosArrayClear(pCompactor->aSttBlk);
  } else {
    pCompactor->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
    if (pCompactor->aSttBlk == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}
H
Hongze Cheng 已提交
630

H
Hongze Cheng 已提交
631 632 633 634 635 636
// Release the per-file-set reader state: every iterator in iterList, the cached
// table schema, and the file reader.
//
// All references to the released objects are cleared as well. In particular
// tbSkm.suid/uid are reset together with the schema: tsdbCompactNextRow()
// treats a row whose uid (or suid) equals the cached one as "schema already
// loaded", so leaving the ids behind would hand a NULL schema to the first
// matching row of the next file set.
static void tsdbCloseCompactor(STsdbCompactor *pCompactor) {
  for (STsdbDataIter *pIter = pCompactor->iterList; pIter;) {
    STsdbDataIter *pIterNext = pIter->next;
    tsdbDataIterClose(pIter);
    pIter = pIterNext;
  }
  // the iterators are freed: drop the pointers that referred to them
  pCompactor->iterList = NULL;
  pCompactor->pIter = NULL;

  tDestroyTSchema(pCompactor->tbSkm.pTSchema);
  pCompactor->tbSkm.pTSchema = NULL;
  pCompactor->tbSkm.suid = 0;
  pCompactor->tbSkm.uid = 0;

  tsdbDataFReaderClose(&pCompactor->pReader);
}

// Block writers defined in another translation unit; used by
// tsdbCompactWriteBlockData() to emit a block to the data file or the stt file.
extern int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg);
extern int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg);
// Flush the rows buffered in pCompactor->bData, if any.
//
// A block that belongs to a single table (uid != 0) and holds at least minRows
// rows goes to the data file, and tableId remembers which table the entries
// accumulated in mDataBlk belong to. Any other block goes to the stt file.
// The buffer is cleared after a successful write.
// Returns 0 or the error code of the failing write (also logged).
static int32_t tsdbCompactWriteBlockData(STsdbCompactor *pCompactor) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SBlockData *pBlock = &pCompactor->bData;

  if (pBlock->nRow != 0) {
    if (pBlock->uid == 0 || pBlock->nRow < pCompactor->minRows) {
      // write to .stt file
      code = tsdbWriteSttBlock(pCompactor->pWriter, pBlock, pCompactor->aSttBlk, pCompactor->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      // write to .data file
      code = tsdbWriteDataBlock(pCompactor->pWriter, pBlock, &pCompactor->mDataBlk, pCompactor->cmprAlg);
      TSDB_CHECK_CODE(code, lino, _exit);

      pCompactor->tableId.suid = pBlock->suid;
      pCompactor->tableId.uid = pBlock->uid;
    }

    tBlockDataClear(&pCompactor->bData);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}

// Flush the data-block entries accumulated in mDataBlk for the table recorded
// in pCompactor->tableId: append a SBlockIdx for that table to aBlockIdx and
// write the entries through tsdbWriteDataBlk(). Does nothing when mDataBlk is
// empty. On success mDataBlk and tableId are reset.
// Returns 0, TSDB_CODE_OUT_OF_MEMORY, or the write error (also logged).
static int32_t tsdbCompactWriteDataBlk(STsdbCompactor *pCompactor) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SBlockIdx *pBlockIdx = NULL;

  if (pCompactor->mDataBlk.nItem != 0) {
    pBlockIdx = (SBlockIdx *)taosArrayReserve(pCompactor->aBlockIdx, 1);
    if (NULL == pBlockIdx) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    pBlockIdx->suid = pCompactor->tableId.suid;
    pBlockIdx->uid = pCompactor->tableId.uid;

    code = tsdbWriteDataBlk(pCompactor->pWriter, &pCompactor->mDataBlk, pBlockIdx);
    TSDB_CHECK_CODE(code, lino, _exit);

    // start collecting for the next table
    tMapDataReset(&pCompactor->mDataBlk);
    pCompactor->tableId.suid = 0;
    pCompactor->tableId.uid = 0;
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
              tstrerror(code));
  }
  return code;
}
H
Hongze Cheng 已提交
704

H
Hongze Cheng 已提交
705 706 707 708 709 710 711
// Compact every file set of pTsdb.
//
// For each file set, rows are pulled in merge order from all of its iterators,
// regrouped into blocks per table, and written to a new file set, which then
// replaces the old one in the compactor's private copy of the file system
// state. The run ends with tsdbCommitCompact() on success or
// tsdbAbortCompact() on failure.
//
// `flag` is currently unused.
// Returns 0 on success, otherwise the error code of the first failing step
// (including a failing commit).
//
// NOTE(review): on an error inside the loop the reader, iterators and writer
// of the current file set are not released here, and the `fs` copy taken by
// tsdbBeginCompact() is never released; presumably that belongs to the
// (still empty) commit/abort steps — confirm.
int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
  int32_t code = 0;
  int32_t lino = 0;

  // Check if can do compact (TODO)

  // Do compact
  STsdbCompactor *pCompactor = &(STsdbCompactor){0};

  code = tsdbBeginCompact(pTsdb, pCompactor);
  TSDB_CHECK_CODE(code, lino, _exit);

  while (true) {
    code = tsdbOpenCompactor(pCompactor);
    TSDB_CHECK_CODE(code, lino, _exit);

    // no file set left
    if (pCompactor->pDFileSet == NULL) break;

    // loop to merge row by row
    SRowInfo *pRowInfo = NULL;
    STSchema *pTSchema = NULL;
    for (;;) {
      code = tsdbCompactGetRow(pCompactor, &pRowInfo, &pTSchema);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (pRowInfo == NULL) break;

      if (pCompactor->bData.suid == 0 && pCompactor->bData.uid == 0) {  // init the block data if not initialized yet
        code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema,
                              NULL, 0);
        TSDB_CHECK_CODE(code, lino, _exit);
      } else if (pCompactor->bData.suid != pRowInfo->suid) {  // different super table
        code = tsdbCompactWriteBlockData(pCompactor);
        TSDB_CHECK_CODE(code, lino, _exit);

        code = tsdbCompactWriteDataBlk(pCompactor);
        TSDB_CHECK_CODE(code, lino, _exit);

        code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema,
                              NULL, 0);
        TSDB_CHECK_CODE(code, lino, _exit);
      } else if (pCompactor->bData.uid != pRowInfo->uid) {  // different table
        if (pRowInfo->suid) {
          // another child table of the same super table: only a block that is
          // large enough for the data file is flushed at this point
          if (pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows) {
            code = tsdbCompactWriteBlockData(pCompactor);
            TSDB_CHECK_CODE(code, lino, _exit);
          }
        } else {
          // different normal table
          code = tsdbCompactWriteBlockData(pCompactor);
          TSDB_CHECK_CODE(code, lino, _exit);

          code = tsdbCompactWriteDataBlk(pCompactor);
          TSDB_CHECK_CODE(code, lino, _exit);

          code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid},
                                pTSchema, NULL, 0);
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      }

      // append row to block data
      code = tBlockDataAppendRowEx(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid);
      TSDB_CHECK_CODE(code, lino, _exit);

      // check if block data is full
      if (pCompactor->bData.nRow >= pCompactor->maxRows) {
        code = tsdbCompactWriteBlockData(pCompactor);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      // iterate to next row
      code = tsdbCompactNextRow(pCompactor);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // flush whatever is still buffered, then the indexes and the header
    code = tsdbCompactWriteBlockData(pCompactor);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbCompactWriteDataBlk(pCompactor);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbUpdateDFileSetHeader(pCompactor->pWriter);
    TSDB_CHECK_CODE(code, lino, _exit);

    // record the new file set in the private fs copy before the writer goes away
    code = tsdbFSUpsertFSet(&pCompactor->fs, &pCompactor->pWriter->wSet);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbDataFWriterClose(&pCompactor->pWriter, 1);
    TSDB_CHECK_CODE(code, lino, _exit);

    tsdbCloseCompactor(pCompactor);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
    (void)tsdbAbortCompact(pCompactor);  // keep reporting the original error
  } else {
    // a failed commit is a failed compaction: hand its error to the caller
    code = tsdbCommitCompact(pCompactor);
  }

  // release the working buffers owned by the compactor; the same calls are
  // already applied to zero-initialized / NULL members by the iterator openers
  tBlockDataDestroy(&pCompactor->bData);
  tMapDataClear(&pCompactor->mDataBlk);
  taosArrayDestroy(pCompactor->aBlockIdx);
  taosArrayDestroy(pCompactor->aSttBlk);

  return code;
}