/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

// Iterator kinds stored in STsdbDataIter::flag. MEM is 0, so the kind has to be
// compared by value; it cannot be bit-tested.
enum {
  TSDB_ITER_TYPE_MEM = 0x0,
  TSDB_ITER_TYPE_DAT = 0x1,
  TSDB_ITER_TYPE_STT = 0x2,
};

// Handle layout for the iterator allocated by tsdbMemDIterOpen. It has no members yet.
// NOTE(review): a struct without members is a compiler extension rather than ISO C —
// confirm the toolchain accepts it.
typedef struct {
} SMemDIter;

typedef struct {
H
Hongze Cheng 已提交
26 27 28 29 30 31 32
  SDataFReader *pReader;
  SArray       *aBlockIdx;  // SArray<SBlockIdx>
  SMapData      mDataBlk;   // SMapData<SDataBlk>
  SBlockData    bData;
  int32_t       iBlockIdx;
  int32_t       iDataBlk;
  int32_t       iRow;
H
Hongze Cheng 已提交
33 34 35
} SDataDIter;

typedef struct {
H
Hongze Cheng 已提交
36 37 38 39 40 41
  SDataFReader *pReader;
  int32_t       iStt;
  SArray       *aSttBlk;  // SArray<SSttBlk>
  SBlockData    bData;
  int32_t       iSttBlk;
  int32_t       iRow;
H
Hongze Cheng 已提交
42 43
} SSttDIter;

typedef struct STsdbDataIter {
  struct STsdbDataIter *next;

H
Hongze Cheng 已提交
47 48 49 50
  int32_t     flag;
  SRowInfo    rowInfo;
  SRBTreeNode n;
  char        handle[];
H
Hongze Cheng 已提交
51 52
} STsdbDataIter;

// Recover the STsdbDataIter that embeds the tree node N as its member `n`.
// N is parenthesised so that an expression argument is cast as a whole.
#define TSDB_DATA_ITER_FROM_RBTN(N) ((STsdbDataIter *)((char *)(N) - offsetof(STsdbDataIter, n)))

typedef struct {
H
Hongze Cheng 已提交
56 57
  STsdb         *pTsdb;
  int64_t        cid;
H
Hongze Cheng 已提交
58 59 60
  int32_t        maxRows;
  int32_t        minRows;
  STsdbFS        fs;
H
Hongze Cheng 已提交
61 62 63 64 65
  int32_t        fid;
  SDFileSet     *pDFileSet;
  SDataFReader  *pReader;
  STsdbDataIter *iterList;  // list of iterators
  SRBTree        rtree;
H
Hongze Cheng 已提交
66
  STsdbDataIter *pIter;
H
Hongze Cheng 已提交
67
  SBlockData     bData;
H
Hongze Cheng 已提交
68 69
} STsdbCompactor;

// Flag value for deep compaction.
// NOTE(review): not referenced in the visible code; presumably meant to be tested
// against the `flag` argument of tsdbCompact — confirm.
#define TSDB_FLG_DEEP_COMPACT 0x1

// ITER =========================
// Forward declaration: tsdbDataDIterOpen and tsdbSttDIterOpen call this before its definition.
static int32_t tsdbDataIterNext(STsdbDataIter *pIter);

// Tree ordering callback: maps each node back to the iterator that embeds it and
// compares the iterators' current rows with tRowInfoCmprFn.
static int32_t tsdbDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
  const STsdbDataIter *lhs = (const STsdbDataIter *)((const char *)n1 - offsetof(STsdbDataIter, n));
  const STsdbDataIter *rhs = (const STsdbDataIter *)((const char *)n2 - offsetof(STsdbDataIter, n));

  return tRowInfoCmprFn(&lhs->rowInfo, &rhs->rowInfo);
}

// Allocate a zero-initialised iterator whose handle is sized for SMemDIter.
// The allocation is zeroed, so `flag` is 0 (TSDB_ITER_TYPE_MEM's value).
//
// ppIter: receives the new iterator, or NULL on failure.
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the allocation fails.
static int32_t tsdbMemDIterOpen(STsdbDataIter **ppIter) {
  STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SMemDIter));
  if (pIter == NULL) {
    *ppIter = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // TODO

  *ppIter = pIter;
  return 0;
}

// Open an iterator of kind TSDB_ITER_TYPE_DAT on pReader and position it on its first row.
//
// pReader: reader to load from; it is stored in the iterator, not released here.
// ppIter:  receives the iterator. It is set to NULL on failure and also when the
//          block index read from pReader is empty, in which case 0 is returned.
// Returns 0 on success, otherwise the error code of the failing step.
static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SDataDIter *pDataDIter = NULL;

  STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SDataDIter));
  if (NULL == pIter) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
  pIter->flag = TSDB_ITER_TYPE_DAT;

  pDataDIter = (SDataDIter *)pIter->handle;
  pDataDIter->pReader = pReader;
  pDataDIter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
  if (pDataDIter->aBlockIdx == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  code = tsdbReadBlockIdx(pReader, pDataDIter->aBlockIdx);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Nothing to iterate: leave with code == 0 through the cleanup tail.
  if (taosArrayGetSize(pDataDIter->aBlockIdx) == 0) goto _exit;

  // TODO
  code = tBlockDataCreate(&pDataDIter->bData);
  TSDB_CHECK_CODE(code, lino, _exit);

  pDataDIter->iBlockIdx = -1;
  pDataDIter->iDataBlk = 0;
  pDataDIter->iRow = 0;

  code = tsdbDataIterNext(pIter);
  TSDB_CHECK_CODE(code, lino, _exit);

  *ppIter = pIter;
  return code;

_exit:
  // Reached on any failure, and with code == 0 when the block index is empty.
  // The handle was zero-allocated, so members that were never set up are still zero here.
  *ppIter = NULL;
  if (pIter) {
    tBlockDataDestroy(&pDataDIter->bData, 1);
    tMapDataClear(&pDataDIter->mDataBlk);
    taosArrayDestroy(pDataDIter->aBlockIdx);
    taosMemoryFree(pIter);
  }
  return code;
}

// Open an iterator of kind TSDB_ITER_TYPE_STT on stt file iStt of pReader and
// position it on its first row.
//
// pReader: reader to load from; it is stored in the iterator, not released here.
// iStt:    stt file index passed on to tsdbReadSttBlk / tsdbReadSttBlockEx.
// ppIter:  receives the iterator. It is set to NULL on failure and also when the
//          stt block list is empty, in which case 0 is returned.
// Returns 0 on success, otherwise the error code of the failing step.
static int32_t tsdbSttDIterOpen(SDataFReader *pReader, int32_t iStt, STsdbDataIter **ppIter) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SSttDIter *pSttDIter = NULL;

  STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SSttDIter));
  if (pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
  pIter->flag = TSDB_ITER_TYPE_STT;

  pSttDIter = (SSttDIter *)pIter->handle;
  pSttDIter->pReader = pReader;
  pSttDIter->iStt = iStt;
  pSttDIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
  if (pSttDIter->aSttBlk == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  code = tsdbReadSttBlk(pReader, pSttDIter->iStt, pSttDIter->aSttBlk);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Nothing to iterate: leave with code == 0 through the cleanup tail.
  if (taosArrayGetSize(pSttDIter->aSttBlk) == 0) goto _exit;

  code = tBlockDataCreate(&pSttDIter->bData);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Both cursors start one step before the first element; tsdbDataIterNext advances them.
  pSttDIter->iSttBlk = -1;
  pSttDIter->iRow = -1;

  code = tsdbDataIterNext(pIter);
  TSDB_CHECK_CODE(code, lino, _exit);

  *ppIter = pIter;
  return code;

_exit:
  // Reached on any failure, and with code == 0 when the stt block list is empty.
  // The handle was zero-allocated, so members that were never set up are still zero here.
  *ppIter = NULL;
  if (pIter) {
    tBlockDataDestroy(&pSttDIter->bData, 1);
    taosArrayDestroy(pSttDIter->aSttBlk);
    taosMemoryFree(pIter);
  }
  return code;
}

// Release an iterator. Not implemented yet: the body asserts unconditionally.
// NOTE(review): tsdbCloseCompactor calls this for every iterator in its list, so that
// path cannot complete until this is implemented.
static void tsdbDataIterClose(STsdbDataIter *pIter) {
  // TODO
  ASSERT(0);
}

// Advance pIter to its next row and publish it in pIter->rowInfo.
//
// Only TSDB_ITER_TYPE_STT is implemented; the MEM and DAT kinds assert. When an STT
// iterator runs out of blocks, rowInfo.suid and rowInfo.uid are both set to 0.
//
// The kind is selected by value: TSDB_ITER_TYPE_MEM is 0, so a bit test such as
// `flag & TSDB_ITER_TYPE_MEM` can never be true.
//
// Returns 0 on success, otherwise the error code from tsdbReadSttBlockEx.
static int32_t tsdbDataIterNext(STsdbDataIter *pIter) {
  int32_t code = 0;
  int32_t lino = 0;

  switch (pIter->flag) {
    case TSDB_ITER_TYPE_MEM:
      // TODO
      ASSERT(0);
      break;

    case TSDB_ITER_TYPE_DAT:
      // TODO
      ASSERT(0);
      break;

    case TSDB_ITER_TYPE_STT: {
      SSttDIter *pSttDIter = (SSttDIter *)pIter->handle;

      pSttDIter->iRow++;
      if (pSttDIter->iRow < pSttDIter->bData.nRow) {
        // Next row of the block that is already loaded; suid stays as set at load time.
        pIter->rowInfo.uid = pSttDIter->bData.uid ? pSttDIter->bData.uid : pSttDIter->bData.aUid[pSttDIter->iRow];
        pIter->rowInfo.row = tsdbRowFromBlockData(&pSttDIter->bData, pSttDIter->iRow);
      } else {
        pSttDIter->iSttBlk++;
        if (pSttDIter->iSttBlk < taosArrayGetSize(pSttDIter->aSttBlk)) {
          // Load the next block and start at its first row.
          code = tsdbReadSttBlockEx(pSttDIter->pReader, pSttDIter->iStt,
                                    taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk), &pSttDIter->bData);
          TSDB_CHECK_CODE(code, lino, _exit);

          pSttDIter->iRow = 0;
          pIter->rowInfo.suid = pSttDIter->bData.suid;
          pIter->rowInfo.uid = pSttDIter->bData.uid ? pSttDIter->bData.uid : pSttDIter->bData.aUid[pSttDIter->iRow];
          pIter->rowInfo.row = tsdbRowFromBlockData(&pSttDIter->bData, pSttDIter->iRow);
        } else {
          // No block left: mark the iterator as exhausted.
          pIter->rowInfo.suid = 0;
          pIter->rowInfo.uid = 0;
        }
      }
      break;
    }

    default:
      ASSERT(0);
      break;
  }

_exit:
  return code;
}

// COMPACT =========================
// Initialise pCompactor for a compaction run on pTsdb: record the tsdb, copy the row
// limits from the vnode config, copy the file system state and create the output
// block data.
//
// Returns 0 on success; on failure the error is logged and its code returned.
// Nothing acquired before the failure is released here.
static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;

  pCompactor->pTsdb = pTsdb;
  // pCompactor->cid = 0; (TODO)
  pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows;
  pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows;

  code = tsdbFSCopy(pTsdb, &pCompactor->fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  // Sentinel used as the search key by the first tsdbOpenCompactor call.
  pCompactor->fid = INT32_MIN;

  code = tBlockDataCreate(&pCompactor->bData);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

// Commit step of a compaction run. Not implemented yet: it currently does nothing
// and returns 0. The error scaffolding is kept for the pending implementation.
static int32_t tsdbCommitCompact(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdb *pTsdb = pCompactor->pTsdb;

  // TODO

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

// Abort step of a compaction run. Not implemented yet: it currently does nothing
// and returns 0. The error scaffolding is kept for the pending implementation.
static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdb *pTsdb = pCompactor->pTsdb;

  // TODO

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

// Deep compaction of the current file set. So far it only opens a reader on
// pCompactor->pDFileSet and stores it in pCompactor->pReader.
// NOTE(review): no caller is visible in this file, and the reader is not closed here.
//
// Returns 0 on success; on failure the error is logged and its code returned.
static int32_t tsdbDeepCompact(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdb *pTsdb = pCompactor->pTsdb;

  code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

// Shallow compaction of the current file set. Not implemented yet: it currently does
// nothing and returns 0. The error scaffolding is kept for the pending implementation.
static int32_t tsdbShallowCompact(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdb *pTsdb = pCompactor->pTsdb;

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

// Move the compactor to the next row in merge order.
//
// The current iterator (pCompactor->pIter) is advanced first. If it is exhausted it is
// dropped; if its new row sorts after the smallest row in the tree it is put back into
// the tree. When no current iterator remains, the tree minimum is taken out and becomes
// the current iterator. pCompactor->pIter ends up NULL when nothing is left.
//
// Returns 0 on success, otherwise the error code from tsdbDataIterNext.
static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pCompactor->pIter) {
    code = tsdbDataIterNext(pCompactor->pIter);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pCompactor->pIter->rowInfo.suid == 0 && pCompactor->pIter->rowInfo.uid == 0) {
      // Exhausted (suid == 0 && uid == 0): stop using this iterator.
      pCompactor->pIter = NULL;
    } else {
      SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree);
      if (pNode) {
        STsdbDataIter *pIter = TSDB_DATA_ITER_FROM_RBTN(pNode);

        // Two iterators are not expected to compare equal.
        int32_t c = tRowInfoCmprFn(&pCompactor->pIter->rowInfo, &pIter->rowInfo);
        ASSERT(c);

        if (c > 0) {
          // Another iterator now holds the smaller row: hand the current one back.
          tRBTreePut(&pCompactor->rtree, &pCompactor->pIter->n);
          pCompactor->pIter = NULL;
        }
      }
    }
  }

  if (pCompactor->pIter == NULL) {
    // Take the iterator with the smallest row out of the tree, if there is one.
    SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree);
    if (pNode) {
      pCompactor->pIter = TSDB_DATA_ITER_FROM_RBTN(pNode);
      tRBTreeDrop(&pCompactor->rtree, pNode);
    }
  }

_exit:
  return code;
}

// Return the row the compactor is currently positioned on, selecting one first if
// there is no current iterator.
//
// ppRowInfo: receives a pointer to the current iterator's rowInfo, or NULL when no
//            row is left or an error occurred.
// ppTSchema: always receives NULL for now (schema lookup is still TODO).
// Returns 0 on success, otherwise the error code from tsdbCompactNextRow.
static int32_t tsdbCompactGetRow(STsdbCompactor *pCompactor, SRowInfo **ppRowInfo, STSchema **ppTSchema) {
  int32_t code = 0;
  int32_t lino = 0;

  // Give the outputs a defined value on every path, including the error path.
  *ppRowInfo = NULL;
  *ppTSchema = NULL;

  if (pCompactor->pIter == NULL) {
    code = tsdbCompactNextRow(pCompactor);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (pCompactor->pIter) {
    *ppRowInfo = &pCompactor->pIter->rowInfo;
    // TODO: *ppTSchema
  }

_exit:
  return code;
}

// Prepare the compactor for the next file set.
//
// Searches pCompactor->fs.aDFileSet with key pCompactor->fid and TD_GT. If nothing is
// found, pCompactor->pDFileSet is NULL and the function returns 0 without opening
// anything. Otherwise it opens a reader on the set, opens one data-file iterator plus
// one iterator per stt file, links every non-NULL iterator into iterList and rtree,
// and resets pIter and bData.
//
// Returns 0 on success; on failure the error is logged and its code returned.
// NOTE(review): a reader and iterators opened before a failure are not released here.
static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdb         *pTsdb = pCompactor->pTsdb;
  STsdbDataIter *pIter = NULL;

  // next compact file
  pCompactor->pDFileSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid},
                                                       tDFileSetCmprFn, TD_GT);
  if (pCompactor->pDFileSet == NULL) goto _exit;

  pCompactor->fid = pCompactor->pDFileSet->fid;

  code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet);
  TSDB_CHECK_CODE(code, lino, _exit);

  // open iters
  pCompactor->iterList = NULL;
  tRBTreeCreate(&pCompactor->rtree, tsdbDataIterCmprFn);

  code = tsdbDataDIterOpen(pCompactor->pReader, &pIter);
  TSDB_CHECK_CODE(code, lino, _exit);

  // A NULL iterator with code == 0 means that source has no data.
  if (pIter) {
    pIter->next = pCompactor->iterList;
    pCompactor->iterList = pIter;
    tRBTreePut(&pCompactor->rtree, &pIter->n);
  }

  for (int32_t iStt = 0; iStt < pCompactor->pReader->pSet->nSttF; iStt++) {
    code = tsdbSttDIterOpen(pCompactor->pReader, iStt, &pIter);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (pIter) {
      pIter->next = pCompactor->iterList;
      pCompactor->iterList = pIter;
      tRBTreePut(&pCompactor->rtree, &pIter->n);
    }
  }
  pCompactor->pIter = NULL;
  tBlockDataReset(&pCompactor->bData);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}

// Close every iterator opened for the current file set. The rest of the teardown is
// not implemented yet, so the function still asserts after the loop.
static void tsdbCloseCompactor(STsdbCompactor *pCompactor) {
  STsdb *pTsdb = pCompactor->pTsdb;

  for (STsdbDataIter *pIter = pCompactor->iterList; pIter;) {
    // Read the link first: pIter must not be touched after it has been closed.
    STsdbDataIter *pIterNext = pIter->next;
    tsdbDataIterClose(pIter);
    pIter = pIterNext;
  }
  // Do not leave the list head pointing at closed iterators.
  pCompactor->iterList = NULL;

  // TODO
  ASSERT(0);

_exit:
  tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}

// Compact pTsdb file set by file set: for each set, merge the rows of all its
// iterators in order into the compactor's block data.
//
// pTsdb: tsdb to compact.
// flag:  not used by the visible code yet.
// Returns 0 on success. On failure the error is logged, tsdbAbortCompact is called and
// the error code is returned. On success the result of tsdbCommitCompact is returned,
// so a failed commit is reported to the caller.
// NOTE(review): writing the block data out is still TODO (ASSERT(0) placeholders), and
// a failure inside the loop leaves the current file set's compactor state open.
int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
  int32_t code = 0;
  int32_t lino = 0;

  // Check if can do compact (TODO)

  // Do compact
  STsdbCompactor *pCompactor = &(STsdbCompactor){0};

  code = tsdbBeginCompact(pTsdb, pCompactor);
  TSDB_CHECK_CODE(code, lino, _exit);

  while (true) {
    code = tsdbOpenCompactor(pCompactor);
    TSDB_CHECK_CODE(code, lino, _exit);

    // No file set left to compact.
    if (pCompactor->pDFileSet == NULL) break;

    // loop to merge row by row
    SRowInfo *pRowInfo = NULL;
    STSchema *pTSchema = NULL;
    for (;;) {
      code = tsdbCompactGetRow(pCompactor, &pRowInfo, &pTSchema);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (pRowInfo == NULL) break;

      // write block data if schema changed
      if ((pCompactor->bData.suid || pCompactor->bData.uid) &&
          !TABLE_SAME_SCHEMA(pCompactor->bData.suid, pCompactor->bData.uid, pRowInfo->suid, pRowInfo->uid)) {
        // TODO: write block data
        ASSERT(0);

        // set block data not initialized
        tBlockDataReset(&pCompactor->bData);
      }

      // init the block data if not initialized yet
      if (pCompactor->bData.suid == 0 && pCompactor->bData.uid == 0) {
        code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema,
                              NULL, 0);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      // append row to block data
      code = tBlockDataAppendRow(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid);
      TSDB_CHECK_CODE(code, lino, _exit);

      // check if block data is full
      if (pCompactor->bData.nRow >= pCompactor->maxRows) {
        // TODO: write block data
        ASSERT(0);
      }

      // iterate to next row
      code = tsdbCompactNextRow(pCompactor);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    tsdbCloseCompactor(pCompactor);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
    // The original failure is what gets reported; the abort result is deliberately dropped.
    (void)tsdbAbortCompact(pCompactor);
  } else {
    code = tsdbCommitCompact(pCompactor);
  }
  return code;
}