smaOpen.c 5.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "sma.h"
#include "tsdb.h"

// Compute the `days` value for one retention entry (a day count multiplied by 1440).
static int32_t smaEvalDays(SRetention *r, int8_t precision);
// Fill *pKeepCfg from pCfg for the tsdb kind named by `type` (TSDB_TYPE_RSMA_L0/L1/L2).
static int32_t smaSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type);

/*
 * Fill the keep configuration from retention level `l`.
 *
 * The expansion site must have `pCfg` (STsdbCfg *) and `pKeepCfg`
 * (STsdbKeepCfg *) in scope. keep0, keep1 and keep2 all receive the level's
 * `keep` converted to TIME_UNIT_MINUTE; `days` is taken from smaEvalDays().
 */
#define SMA_SET_KEEP_CFG(l) \
  do { \
    SRetention *pRetention_ = &pCfg->retentions[(l)]; \
    pKeepCfg->keep2 = \
        convertTimeFromPrecisionToUnit(pRetention_->keep, pCfg->precision, TIME_UNIT_MINUTE); \
    pKeepCfg->keep0 = pKeepCfg->keep2; \
    pKeepCfg->keep1 = pKeepCfg->keep2; \
    pKeepCfg->days = smaEvalDays(pRetention_, pCfg->precision); \
  } while (0)

/*
 * Open the rsma tsdb of retention level `l` for vnode `v`.
 *
 * `l` must be a literal 0, 1 or 2 because it is token-pasted into
 * TSDB_TYPE_RSMA_L<l>, SMA_RSMA_TSDB<l> and VNODE_RSMA<l>_DIR.
 * The expansion site must provide `pCfg`, `pSma`, `keepCfg` and an `_err` label.
 *
 * - valid retention entry : build the keep cfg and open the tsdb; jump to
 *                           `_err` when tsdbOpen() reports a negative result
 * - invalid entry, l == 0 : jump to `_err`
 * - invalid entry, l != 0 : do nothing
 */
#define SMA_OPEN_RSMA_IMPL(v, l) \
  do { \
    SRetention *pRetention_ = (SRetention *)VND_RETENTIONS(v) + (l); \
    if (RETENTION_VALID(pRetention_)) { \
      smaSetKeepCfg(&keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \
      if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg) < 0) { \
        goto _err; \
      } \
    } else if ((l) == 0) { \
      goto _err; \
    } \
  } while (0)

// Tuning constants used by smaEvalDays(): the keep duration (in days) is divided
// by the ratio, and the quotient is bounded to [MIN, MAX] days.
#define RETENTION_DAYS_SPLIT_RATIO 10
#define RETENTION_DAYS_SPLIT_MIN   1
#define RETENTION_DAYS_SPLIT_MAX   30

/**
 * @brief Compute the `days` value for one retention entry.
 *
 * keep and freq are converted to whole days. keep divided by
 * RETENTION_DAYS_SPLIT_RATIO is bounded to
 * [RETENTION_DAYS_SPLIT_MIN, RETENTION_DAYS_SPLIT_MAX]; if that is still
 * smaller than the freq in days, freq days + 1 is used instead.
 *
 * @param r         retention entry supplying `keep` and `freq`
 * @param precision precision passed through to convertTimeFromPrecisionToUnit()
 * @return int32_t  the resulting day count multiplied by 1440
 */
static int32_t smaEvalDays(SRetention *r, int8_t precision) {
  const int32_t nKeepDays = convertTimeFromPrecisionToUnit(r->keep, precision, TIME_UNIT_DAY);
  const int32_t nFreqDays = convertTimeFromPrecisionToUnit(r->freq, precision, TIME_UNIT_DAY);

  // Clamp keep/ratio into [MIN, MAX].
  int32_t span = nKeepDays / RETENTION_DAYS_SPLIT_RATIO;
  if (span < RETENTION_DAYS_SPLIT_MIN) {
    span = RETENTION_DAYS_SPLIT_MIN;
  } else if (span > RETENTION_DAYS_SPLIT_MAX) {
    span = RETENTION_DAYS_SPLIT_MAX;
  }

  // The result must not be smaller than the freq; this may exceed MAX.
  if (span < nFreqDays) {
    span = nFreqDays + 1;
  }

  return span * 1440;
}

/**
 * @brief Fill a tsdb keep configuration for one rsma level.
 *
 * The signature matches the file-local forward declaration
 * (`static int32_t`). Only the three rsma levels are supported; any other
 * `type`, including TSDB_TYPE_TSMA, trips ASSERT(0).
 *
 * @param pKeepCfg output; precision is always copied from pCfg, the keep/days
 *                 fields are filled by SMA_SET_KEEP_CFG for a supported type
 * @param pCfg     tsdb configuration holding `precision` and `retentions`
 * @param type     TSDB_TYPE_RSMA_L0, TSDB_TYPE_RSMA_L1 or TSDB_TYPE_RSMA_L2
 * @return int32_t always 0
 */
static int32_t smaSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type) {
  pKeepCfg->precision = pCfg->precision;
  switch (type) {
    case TSDB_TYPE_RSMA_L0:
      SMA_SET_KEEP_CFG(0);
      break;
    case TSDB_TYPE_RSMA_L1:
      SMA_SET_KEEP_CFG(1);
      break;
    case TSDB_TYPE_RSMA_L2:
      SMA_SET_KEEP_CFG(2);
      break;
    case TSDB_TYPE_TSMA:  // not a valid input for this helper
    default:
      ASSERT(0);
      break;
  }
  return 0;
}

/**
 * @brief Create the SMA handle of a vnode and attach it to pVnode->pSma.
 *
 * For an rsma vnode (VND_IS_RSMA) one tsdb is opened per valid retention
 * level through SMA_OPEN_RSMA_IMPL. On failure everything acquired so far
 * is released again (opened tsdb handles, the mutex, the handle itself) and
 * pVnode->pSma is left untouched.
 *
 * @param pVnode vnode to open the SMA for; pVnode->pSma must be NULL on entry
 * @return int32_t 0 on success, -1 on failure (terrno is set to
 *         TSDB_CODE_OUT_OF_MEMORY when the handle cannot be allocated)
 */
int32_t smaOpen(SVnode *pVnode) {
  // pCfg, pSma, keepCfg and the _err label are referenced by SMA_OPEN_RSMA_IMPL.
  STsdbCfg *pCfg = &pVnode->config.tsdbCfg;

  ASSERT(!pVnode->pSma);

  SSma *pSma = taosMemoryCalloc(1, sizeof(SSma));
  if (!pSma) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pSma->pVnode = pVnode;
  taosThreadMutexInit(&pSma->mutex, NULL);
  pSma->locked = false;

  if (VND_IS_RSMA(pVnode)) {
    STsdbKeepCfg keepCfg = {0};
    for (int i = 0; i < TSDB_RETENTION_MAX; ++i) {
      if (i == TSDB_RETENTION_L0) {
        SMA_OPEN_RSMA_IMPL(pVnode, 0);
      } else if (i == TSDB_RETENTION_L1) {
        SMA_OPEN_RSMA_IMPL(pVnode, 1);
      } else if (i == TSDB_RETENTION_L2) {
        SMA_OPEN_RSMA_IMPL(pVnode, 2);
      } else {
        ASSERT(0);
      }
    }
  }

  pVnode->pSma = pSma;
  return 0;

_err:
  // Undo partial initialisation: a later level may fail after earlier levels
  // were opened. The handles start out NULL (calloc).
  // NOTE(review): assumes tsdbOpen() leaves the handle NULL when it fails - confirm.
  if (SMA_RSMA_TSDB0(pSma)) {
    tsdbClose(&SMA_RSMA_TSDB0(pSma));
  }
  if (SMA_RSMA_TSDB1(pSma)) {
    tsdbClose(&SMA_RSMA_TSDB1(pSma));
  }
  if (SMA_RSMA_TSDB2(pSma)) {
    tsdbClose(&SMA_RSMA_TSDB2(pSma));
  }
  taosThreadMutexDestroy(&pSma->mutex);
  taosMemoryFreeClear(pSma);
  return -1;
}

/**
 * @brief Release an SMA handle.
 *
 * Destroys the handle's mutex, closes each rsma tsdb that is present, and
 * frees the handle. A NULL handle is accepted and ignored.
 *
 * @param pSma handle to release; must not be used by the caller afterwards
 * @return int32_t always 0
 */
int32_t smaClose(SSma *pSma) {
  if (!pSma) {
    return 0;
  }

  taosThreadMutexDestroy(&pSma->mutex);

  if (SMA_RSMA_TSDB0(pSma)) {
    tsdbClose(&SMA_RSMA_TSDB0(pSma));
  }
  if (SMA_RSMA_TSDB1(pSma)) {
    tsdbClose(&SMA_RSMA_TSDB1(pSma));
  }
  if (SMA_RSMA_TSDB2(pSma)) {
    tsdbClose(&SMA_RSMA_TSDB2(pSma));
  }

  // NOTE: releasing SMA_TSMA_ENV / SMA_RSMA_ENV through tdFreeSmaEnv() is
  // currently disabled here.

  // Only the local copy of the pointer is cleared.
  taosMemoryFreeClear(pSma);
  return 0;
}

/**
 * @brief Restore the rsma env. Currently a stub: no restore work is performed.
 *
 * @param pSma SMA handle; NULL is accepted
 * @return int32_t 0 when pSma is NULL, TSDB_CODE_SUCCESS otherwise
 */
int32_t smaRestore(SSma *pSma) {
  if (!pSma) {
    return 0;
  }

  // TODO: iterate all stables to restore the rsma env

  return TSDB_CODE_SUCCESS;
}