tsdbCompact.c 5.5 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

H
Hongze Cheng 已提交
18 19
typedef struct {
} SMemDIter;
H
Hongze Cheng 已提交
20 21

typedef struct {
H
Hongze Cheng 已提交
22 23 24 25 26 27 28
  SDataFReader *pReader;
  SArray       *aBlockIdx;  // SArray<SBlockIdx>
  SMapData      mDataBlk;   // SMapData<SDataBlk>
  SBlockData    bData;
  int32_t       iBlockIdx;
  int32_t       iDataBlk;
  int32_t       iRow;
H
Hongze Cheng 已提交
29 30 31
} SDataDIter;

typedef struct {
H
Hongze Cheng 已提交
32 33 34 35 36 37
  SDataFReader *pReader;
  int32_t       iStt;
  SArray       *aSttBlk;  // SArray<SSttBlk>
  SBlockData    bData;
  int32_t       iSttBlk;
  int32_t       iRow;
H
Hongze Cheng 已提交
38 39
} SSttDIter;

H
Hongze Cheng 已提交
40 41 42 43 44 45
typedef struct {
  int32_t  flag;
  SRowInfo rowInfo;
  char     handle[];
} STsdbDataIter;

H
Hongze Cheng 已提交
46
typedef struct {
H
Hongze Cheng 已提交
47 48 49 50 51 52 53
  STsdb        *pTsdb;
  STsdbFS       fs;
  int64_t       cid;
  int32_t       fid;
  SDataFReader *pReader;
  SDFileSet    *pDFileSet;
  SRBTree       rtree;
H
Hongze Cheng 已提交
54 55
} STsdbCompactor;

H
Hongze Cheng 已提交
56 57
#define TSDB_FLG_DEEP_COMPACT 0x1

H
Hongze Cheng 已提交
58
// ITER =========================
H
Hongze Cheng 已提交
59
static int32_t tsdbMemDIterOpen(STsdbDataIter **ppIter) {
H
Hongze Cheng 已提交
60 61
  int32_t code = 0;
  int32_t lino = 0;
H
Hongze Cheng 已提交
62 63 64 65 66 67 68

  STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SMemDIter));
  if (pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

H
Hongze Cheng 已提交
69
  // TODO
H
Hongze Cheng 已提交
70

H
Hongze Cheng 已提交
71
_exit:
H
Hongze Cheng 已提交
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
  if (code) {
    *ppIter = NULL;
  } else {
    *ppIter = pIter;
  }
  return code;
}

static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SDataDIter));
  if (NULL == pIter) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // TODO

_exit:
  if (code) {
    *ppIter = NULL;
  } else {
    *ppIter = pIter;
  }
  return code;
}

static int32_t tsdbSttDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SSttDIter));
  if (pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // TODO

_exit:
  if (code) {
    *ppIter = NULL;
  } else {
    *ppIter = pIter;
  }
H
Hongze Cheng 已提交
119 120 121 122 123 124 125 126 127 128 129 130 131 132
  return code;
}

static void tsdbDataIterClose(STsdbDataIter *pIter) {
  // TODO
}

static int32_t tsdbDataIterNext(STsdbDataIter *pIter) {
  int32_t code = 0;
  int32_t lino = 0;
  // TODO
_exit:
  return code;
}
H
Hongze Cheng 已提交
133 134

// COMPACT =========================
H
Hongze Cheng 已提交
135
static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) {
H
Hongze Cheng 已提交
136 137 138
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
139 140 141 142 143 144 145
  pCompactor->pTsdb = pTsdb;

  code = tsdbFSCopy(pTsdb, &pCompactor->fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  pCompactor->fid = INT32_MIN;

H
Hongze Cheng 已提交
146 147 148 149 150 151 152
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
153
static int32_t tsdbCommitCompact(STsdbCompactor *pCompactor) {
H
Hongze Cheng 已提交
154 155 156
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
157 158
  STsdb *pTsdb = pCompactor->pTsdb;

H
Hongze Cheng 已提交
159 160 161 162 163 164 165 166 167
  // TODO

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
168
static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) {
H
Hongze Cheng 已提交
169 170 171
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
172 173
  STsdb *pTsdb = pCompactor->pTsdb;

H
Hongze Cheng 已提交
174 175 176 177 178 179 180 181 182
  // TODO

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
183
static int32_t tsdbDeepCompact(STsdbCompactor *pCompactor) {
H
Hongze Cheng 已提交
184 185 186
  int32_t code = 0;
  int32_t lino = 0;

H
Hongze Cheng 已提交
187
  STsdb *pTsdb = pCompactor->pTsdb;
H
Hongze Cheng 已提交
188

H
Hongze Cheng 已提交
189 190 191
  code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet);
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
192 193 194 195 196 197 198
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

H
Hongze Cheng 已提交
199
static int32_t tsdbShallowCompact(STsdbCompactor *pCompactor) {
H
Hongze Cheng 已提交
200
  int32_t code = 0;
H
Hongze Cheng 已提交
201 202
  int32_t lino = 0;

H
Hongze Cheng 已提交
203 204
  STsdb *pTsdb = pCompactor->pTsdb;

H
Hongze Cheng 已提交
205 206 207 208 209 210 211 212 213 214 215 216 217 218
_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
  int32_t code = 0;
  int32_t lino = 0;

  // Check if can do compact (TODO)

  // Do compact
H
Hongze Cheng 已提交
219 220 221
  STsdbCompactor compactor = {0};

  code = tsdbBeginCompact(pTsdb, &compactor);
H
Hongze Cheng 已提交
222 223
  TSDB_CHECK_CODE(code, lino, _exit);

H
Hongze Cheng 已提交
224 225 226 227
  while (true) {
    compactor.pDFileSet = (SDFileSet *)taosArraySearch(compactor.fs.aDFileSet, &compactor.fid, tDFileSetCmprFn, TD_GT);
    if (compactor.pDFileSet == NULL) break;

H
Hongze Cheng 已提交
228 229
    compactor.fid = compactor.pDFileSet->fid;

H
Hongze Cheng 已提交
230 231 232 233 234 235 236
    if (flag & TSDB_FLG_DEEP_COMPACT) {
      code = tsdbDeepCompact(&compactor);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      code = tsdbShallowCompact(&compactor);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
H
Hongze Cheng 已提交
237 238 239 240 241
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
H
Hongze Cheng 已提交
242
    tsdbAbortCompact(&compactor);
H
Hongze Cheng 已提交
243
  } else {
H
Hongze Cheng 已提交
244
    tsdbCommitCompact(&compactor);
H
Hongze Cheng 已提交
245
  }
H
Hongze Cheng 已提交
246 247
  return code;
}