smaOpen.c 7.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "sma.h"
#include "tsdb.h"

19 20
static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration);
static int32_t smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type);
C
Cary Xu 已提交
21
static int32_t rsmaRestore(SSma *pSma);
22

23
#define SMA_SET_KEEP_CFG(v, l)                                                                    \
24 25 26 27 28
  do {                                                                                            \
    SRetention *r = &pCfg->retentions[l];                                                         \
    pKeepCfg->keep2 = convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE); \
    pKeepCfg->keep0 = pKeepCfg->keep2;                                                            \
    pKeepCfg->keep1 = pKeepCfg->keep2;                                                            \
29
    pKeepCfg->days = smaEvalDays(v, pCfg->retentions, l, pCfg->precision, pCfg->days);            \
30 31
  } while (0)

H
Hongze Cheng 已提交
32 33 34 35 36
#define SMA_OPEN_RSMA_IMPL(v, l)                                                             \
  do {                                                                                       \
    SRetention *r = (SRetention *)VND_RETENTIONS(v) + l;                                     \
    if (!RETENTION_VALID(r)) {                                                               \
      if (l == 0) {                                                                          \
K
kailixu 已提交
37 38
        code = TSDB_CODE_INVALID_PARA;                                                       \
        TSDB_CHECK_CODE(code, lino, _exit);                                                  \
H
Hongze Cheng 已提交
39 40 41
      }                                                                                      \
      break;                                                                                 \
    }                                                                                        \
K
kailixu 已提交
42 43
    code = smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l);                            \
    TSDB_CHECK_CODE(code, lino, _exit);                                                      \
H
Hongze Cheng 已提交
44
    if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg, rollback) < 0) { \
K
kailixu 已提交
45 46
      code = terrno;                                                                         \
      TSDB_CHECK_CODE(code, lino, _exit);                                                    \
H
Hongze Cheng 已提交
47
    }                                                                                        \
48 49
  } while (0)

50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
/**
 * @brief Evaluate days(duration) for rsma level 1/2/3.
 *  1) level 1: duration from "create database"
 *  2) level 2/3: duration * (freq/freqL1)
 * @param pVnode
 * @param r
 * @param level
 * @param precision
 * @param duration
 * @return int32_t
 */
static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration) {
  int32_t freqDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->freq, precision, TIME_UNIT_MINUTE);
  int32_t keepDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->keep, precision, TIME_UNIT_MINUTE);
  int32_t days = duration;  // min
65

66 67 68
  if (days < freqDuration) {
    days = freqDuration;
  }
69

70 71 72 73
  if (days > keepDuration) {
    days = keepDuration;
  }

K
kailixu 已提交
74 75
  if (level < TSDB_RETENTION_L1 || level > TSDB_RETENTION_L2) {
    goto _exit;
76
  }
77 78 79 80 81 82 83 84 85 86 87

  freqDuration = convertTimeFromPrecisionToUnit((r + level)->freq, precision, TIME_UNIT_MINUTE);
  keepDuration = convertTimeFromPrecisionToUnit((r + level)->keep, precision, TIME_UNIT_MINUTE);

  int32_t nFreqTimes = (r + level)->freq / (r + TSDB_RETENTION_L0)->freq;
  days *= (nFreqTimes > 1 ? nFreqTimes : 1);

  if (days > keepDuration) {
    days = keepDuration;
  }

C
Cary Xu 已提交
88 89
  if (days > TSDB_MAX_DURATION_PER_FILE) {
    days = TSDB_MAX_DURATION_PER_FILE;
90 91 92 93 94
  }

  if (days < freqDuration) {
    days = freqDuration;
  }
K
kailixu 已提交
95
_exit:
96
  smaInfo("vgId:%d, evaluated duration for level %d is %d, raw val:%d", TD_VID(pVnode), level + 1, days, duration);
97
  return days;
98 99
}

100
int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type) {
K
kailixu 已提交
101
  terrno = 0;
102 103 104
  pKeepCfg->precision = pCfg->precision;
  switch (type) {
    case TSDB_TYPE_TSMA:
K
kailixu 已提交
105
      ASSERTS(0, "undefined smaType:%d", (int32_t)type);
K
kailixu 已提交
106
      terrno = TSDB_CODE_APP_ERROR;
107 108
      break;
    case TSDB_TYPE_RSMA_L0:
109
      SMA_SET_KEEP_CFG(pVnode, 0);
110 111
      break;
    case TSDB_TYPE_RSMA_L1:
112
      SMA_SET_KEEP_CFG(pVnode, 1);
113 114
      break;
    case TSDB_TYPE_RSMA_L2:
115
      SMA_SET_KEEP_CFG(pVnode, 2);
116 117
      break;
    default:
K
kailixu 已提交
118
      ASSERTS(0, "unknown smaType:%d", (int32_t)type);
K
kailixu 已提交
119
      terrno = TSDB_CODE_APP_ERROR;
120 121
      break;
  }
K
kailixu 已提交
122
  return terrno;
123 124
}

H
Hongze Cheng 已提交
125
int32_t smaOpen(SVnode *pVnode, int8_t rollback) {
K
kailixu 已提交
126 127
  int32_t   code = 0;
  int32_t   lino = 0;
128 129 130 131
  STsdbCfg *pCfg = &pVnode->config.tsdbCfg;

  SSma *pSma = taosMemoryCalloc(1, sizeof(SSma));
  if (!pSma) {
K
kailixu 已提交
132 133
    code = TSDB_CODE_OUT_OF_MEMORY;
    TSDB_CHECK_CODE(code, lino, _exit);
134
  }
C
Cary Xu 已提交
135 136 137

  pVnode->pSma = pSma;

138 139 140 141
  pSma->pVnode = pVnode;
  taosThreadMutexInit(&pSma->mutex, NULL);
  pSma->locked = false;

C
Cary Xu 已提交
142
  if (VND_IS_RSMA(pVnode)) {
143
    STsdbKeepCfg keepCfg = {0};
K
kailixu 已提交
144
    for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
145 146 147 148 149 150 151 152 153
      if (i == TSDB_RETENTION_L0) {
        SMA_OPEN_RSMA_IMPL(pVnode, 0);
      } else if (i == TSDB_RETENTION_L1) {
        SMA_OPEN_RSMA_IMPL(pVnode, 1);
      } else if (i == TSDB_RETENTION_L2) {
        SMA_OPEN_RSMA_IMPL(pVnode, 2);
      }
    }

C
Cary Xu 已提交
154
    // restore the rsma
K
kailixu 已提交
155
    if (tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed, rollback) < 0) {
K
kailixu 已提交
156 157
      code = terrno;
      TSDB_CHECK_CODE(code, lino, _exit);
C
Cary Xu 已提交
158
    }
C
Cary Xu 已提交
159 160
  }

K
kailixu 已提交
161 162 163 164 165
_exit:
  if (code) {
    smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }
  return code;
166 167
}

C
Cary Xu 已提交
168
int32_t smaClose(SSma *pSma) {
C
Cary Xu 已提交
169
  if (pSma) {
K
kailixu 已提交
170
    smaPreClose(pSma);
C
Cary Xu 已提交
171
    taosThreadMutexDestroy(&pSma->mutex);
C
Cary Xu 已提交
172 173
    SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma));
    SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma));
174 175 176
    if SMA_RSMA_TSDB0 (pSma) tsdbClose(&SMA_RSMA_TSDB0(pSma));
    if SMA_RSMA_TSDB1 (pSma) tsdbClose(&SMA_RSMA_TSDB1(pSma));
    if SMA_RSMA_TSDB2 (pSma) tsdbClose(&SMA_RSMA_TSDB2(pSma));
C
Cary Xu 已提交
177
    taosMemoryFreeClear(pSma);
178 179
  }
  return 0;
C
Cary Xu 已提交
180 181 182 183
}

/**
 * @brief rsma env restore
184 185 186 187 188
 *
 * @param pSma
 * @param type
 * @param committedVer
 * @return int32_t
C
Cary Xu 已提交
189
 */
K
kailixu 已提交
190
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback) {
K
kailixu 已提交
191 192 193 194
  if (!VND_IS_RSMA(pSma->pVnode)) {
    terrno = TSDB_CODE_RSMA_INVALID_ENV;
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
195

K
kailixu 已提交
196
  return tdRSmaProcessRestoreImpl(pSma, type, committedVer, rollback);
197
}