smaEnv.c 10.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "sma.h"

typedef struct SSmaStat SSmaStat;

C
Cary Xu 已提交
20
#define SMA_MGMT_REF_NUM         10240
21 22

extern SSmaMgmt smaMgmt;
23 24 25

// declaration of static functions

C
Cary Xu 已提交
26
static int32_t  tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma);
C
Cary Xu 已提交
27 28
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path);
static int32_t  tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv);
C
Cary Xu 已提交
29
static void    *tdFreeTSmaStat(STSmaStat *pStat);
30
static void     tdDestroyRSmaStat(void *pRSmaStat);
31

C
Cary Xu 已提交
32 33 34 35 36
/**
 * @brief rsma init
 *
 * @return int32_t
 */
37
// implementation
C
Cary Xu 已提交
38 39 40 41 42 43 44 45 46 47 48 49 50
int32_t smaInit() {
  int8_t  old;
  int32_t nLoops = 0;
  while (1) {
    old = atomic_val_compare_exchange_8(&smaMgmt.inited, 0, 2);
    if (old != 2) break;
    if (++nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }

  if (old == 0) {
51
    // init tref rset
C
Cary Xu 已提交
52 53 54
    smaMgmt.rsetId = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat);

    if (smaMgmt.rsetId < 0) {
55
      atomic_store_8(&smaMgmt.inited, 0);
C
Cary Xu 已提交
56
      smaError("failed to init sma rset since %s", terrstr());
57 58 59 60 61 62 63
      return TSDB_CODE_FAILED;
    }

    // init fetch timer handle
    smaMgmt.tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA");
    if (!smaMgmt.tmrHandle) {
      taosCloseRef(smaMgmt.rsetId);
C
Cary Xu 已提交
64
      atomic_store_8(&smaMgmt.inited, 0);
65
      smaError("failed to init sma tmr hanle since %s", terrstr());
C
Cary Xu 已提交
66 67 68 69
      return TSDB_CODE_FAILED;
    }

    atomic_store_8(&smaMgmt.inited, 1);
70
    smaInfo("sma mgmt env is initialized, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
C
Cary Xu 已提交
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief rsma cleanup
 *
 */
void smaCleanUp() {
  int8_t  old;
  int32_t nLoops = 0;
  while (1) {
    old = atomic_val_compare_exchange_8(&smaMgmt.inited, 1, 2);
    if (old != 2) break;
    if (++nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }

  if (old == 1) {
    taosCloseRef(smaMgmt.rsetId);
94 95
    taosTmrCleanUp(smaMgmt.tmrHandle);
    smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
C
Cary Xu 已提交
96 97 98
    atomic_store_8(&smaMgmt.inited, 0);
  }
}
99

C
Cary Xu 已提交
100
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) {
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
  SSmaEnv *pEnv = NULL;

  pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv));
  if (!pEnv) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMA_ENV_TYPE(pEnv) = smaType;

  int code = taosThreadRwlockInit(&(pEnv->lock), NULL);
  if (code) {
    terrno = TAOS_SYSTEM_ERROR(code);
    taosMemoryFree(pEnv);
    return NULL;
  }

C
Cary Xu 已提交
118
  if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma) != TSDB_CODE_SUCCESS) {
119 120 121 122 123 124 125
    tdFreeSmaEnv(pEnv);
    return NULL;
  }

  return pEnv;
}

C
Cary Xu 已提交
126
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv) {
127 128 129 130 131 132
  if (!pEnv) {
    terrno = TSDB_CODE_INVALID_PTR;
    return TSDB_CODE_FAILED;
  }

  if (!(*pEnv)) {
C
Cary Xu 已提交
133
    if (!(*pEnv = tdNewSmaEnv(pSma, smaType, path))) {
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
      return TSDB_CODE_FAILED;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Release resources allocated for its member fields, not including itself.
 *
 * @param pSmaEnv
 * @return int32_t
 */
void tdDestroySmaEnv(SSmaEnv *pSmaEnv) {
  if (pSmaEnv) {
C
Cary Xu 已提交
149
    pSmaEnv->pStat = tdFreeSmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv));
150 151 152 153 154
    taosThreadRwlockDestroy(&(pSmaEnv->lock));
  }
}

void *tdFreeSmaEnv(SSmaEnv *pSmaEnv) {
C
Cary Xu 已提交
155 156 157 158
  if (pSmaEnv) {
    tdDestroySmaEnv(pSmaEnv);
    taosMemoryFreeClear(pSmaEnv);
  }
159 160 161 162 163 164 165
  return NULL;
}

int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat) {
  if (!pStat) return 0;

  int ref = T_REF_INC(pStat);
S
Shengliang Guan 已提交
166
  smaDebug("vgId:%d, ref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref);
167 168 169 170 171 172 173
  return 0;
}

int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
  if (!pStat) return 0;

  int ref = T_REF_DEC(pStat);
S
Shengliang Guan 已提交
174
  smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref);
175 176 177
  return 0;
}

C
Cary Xu 已提交
178
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) {
179 180 181 182 183 184 185
  ASSERT(pSmaStat != NULL);

  if (*pSmaStat) {  // no lock
    return TSDB_CODE_SUCCESS;
  }

  /**
C
Cary Xu 已提交
186
   *  1. Lazy mode utilized when init SSmaStat to update expire window(or hungry mode when tdNew).
187 188 189 190 191 192 193 194 195 196 197
   *  2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
   * tdInitSmaStat invoked in other multithread environment later.
   */
  if (!(*pSmaStat)) {
    *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat));
    if (!(*pSmaStat)) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_FAILED;
    }

    if (smaType == TSDB_SMA_TYPE_ROLLUP) {
C
Cary Xu 已提交
198 199
      SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat);
      pRSmaStat->pSma = (SSma *)pSma;
C
Cary Xu 已提交
200
      atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
201 202

      // init smaMgmt
C
Cary Xu 已提交
203
      smaInit();
204

C
Cary Xu 已提交
205
      int64_t refId = taosAddRef(smaMgmt.rsetId, pRSmaStat);
206
      if (refId < 0) {
C
Cary Xu 已提交
207 208
        smaError("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d failed since:%s", SMA_VID(pSma),
                 refId, smaMgmt.rsetId, SMA_MGMT_REF_NUM, tstrerror(terrno));
209
        return TSDB_CODE_FAILED;
C
Cary Xu 已提交
210 211 212
      } else {
        smaDebug("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d succeed", SMA_VID(pSma), refId,
                 smaMgmt.rsetId, SMA_MGMT_REF_NUM);
213 214 215
      }
      pRSmaStat->refId = refId;

C
Cary Xu 已提交
216
      // init hash
C
Cary Xu 已提交
217
      RSMA_INFO_HASH(pRSmaStat) = taosHashInit(
C
Cary Xu 已提交
218
          RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
C
Cary Xu 已提交
219
      if (!RSMA_INFO_HASH(pRSmaStat)) {
220 221 222 223
        taosMemoryFreeClear(*pSmaStat);
        return TSDB_CODE_FAILED;
      }
    } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
C
Cary Xu 已提交
224
      // TODO
225 226 227 228 229 230 231
    } else {
      ASSERT(0);
    }
  }
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
232
static void tdDestroyTSmaStat(STSmaStat *pStat) {
C
Cary Xu 已提交
233
  if (pStat) {
C
Cary Xu 已提交
234
    smaDebug("destroy tsma stat");
C
Cary Xu 已提交
235 236
    tDestroyTSma(pStat->pTSma);
    taosMemoryFreeClear(pStat->pTSma);
C
Cary Xu 已提交
237 238 239 240 241 242 243 244 245 246
    taosMemoryFreeClear(pStat->pTSchema);
  }
}

static void *tdFreeTSmaStat(STSmaStat *pStat) {
  tdDestroyTSmaStat(pStat);
  taosMemoryFreeClear(pStat);
  return NULL;
}

247 248 249
static void tdDestroyRSmaStat(void *pRSmaStat) {
  if (pRSmaStat) {
    SRSmaStat *pStat = (SRSmaStat *)pRSmaStat;
C
Cary Xu 已提交
250 251 252
    SSma      *pSma = pStat->pSma;
    smaDebug("vgId:%d, destroy rsma stat %p", SMA_VID(pSma), pRSmaStat);
    // step 1: set rsma trigger stat cancelled
C
Cary Xu 已提交
253 254
    atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);

255
    // step 2: destroy the rsma info and associated fetch tasks
C
Cary Xu 已提交
256
    // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
L
Liu Jicong 已提交
257
#if 1
258 259 260 261
    if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) {
      void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL);
      while (infoHash) {
        SRSmaInfo *pSmaInfo = *(SRSmaInfo **)infoHash;
262
        tdFreeRSmaInfo(pSma, pSmaInfo);
263 264
        infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash);
      }
C
Cary Xu 已提交
265
    }
L
Liu Jicong 已提交
266
#endif
C
Cary Xu 已提交
267 268
    taosHashCleanup(RSMA_INFO_HASH(pStat));

269 270
    // step 3: wait all triggered fetch tasks finished
    int32_t nLoops = 0;
C
Cary Xu 已提交
271 272
    while (1) {
      if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
C
Cary Xu 已提交
273
        smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma));
C
Cary Xu 已提交
274
        break;
275
      } else {
C
Cary Xu 已提交
276
        smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma));
C
Cary Xu 已提交
277 278 279 280 281 282
      }
      ++nLoops;
      if (nLoops > 1000) {
        sched_yield();
        nLoops = 0;
      }
C
Cary Xu 已提交
283
    }
284
  }
285 286
}

C
Cary Xu 已提交
287
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
C
Cary Xu 已提交
288
  tdDestroySmaState(pSmaStat, smaType);
289 290 291 292 293
  if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
    taosMemoryFreeClear(pSmaStat);
  }
  // tref used to free rsma stat

C
Cary Xu 已提交
294 295 296
  return NULL;
}

297 298 299 300 301 302
/**
 * @brief Release resources allocated for its member fields, not including itself.
 *
 * @param pSmaStat
 * @return int32_t
 */
303

304 305 306
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
  if (pSmaStat) {
    if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
C
Cary Xu 已提交
307
      tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat));
308
    } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
309
      SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat);
C
Cary Xu 已提交
310 311 312 313 314 315
      if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) {
        smaError("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " failed since %s", SMA_VID(pRSmaStat->pSma),
                 RSMA_REF_ID(pRSmaStat), smaMgmt.rsetId, terrstr());
      } else {
        smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", SMA_VID(pRSmaStat->pSma),
                 RSMA_REF_ID(pRSmaStat), smaMgmt.rsetId);
316
      }
317 318 319 320
    } else {
      ASSERT(0);
    }
  }
321
  return 0;
322 323 324 325 326
}

int32_t tdLockSma(SSma *pSma) {
  int code = taosThreadMutexLock(&pSma->mutex);
  if (code != 0) {
S
Shengliang Guan 已提交
327
    smaError("vgId:%d, failed to lock td since %s", SMA_VID(pSma), strerror(errno));
328 329 330 331 332 333 334 335 336 337 338 339
    terrno = TAOS_SYSTEM_ERROR(code);
    return -1;
  }
  pSma->locked = true;
  return 0;
}

int32_t tdUnLockSma(SSma *pSma) {
  ASSERT(SMA_LOCKED(pSma));
  pSma->locked = false;
  int code = taosThreadMutexUnlock(&pSma->mutex);
  if (code != 0) {
S
Shengliang Guan 已提交
340
    smaError("vgId:%d, failed to unlock td since %s", SMA_VID(pSma), strerror(errno));
341 342 343 344 345 346
    terrno = TAOS_SYSTEM_ERROR(code);
    return -1;
  }
  return 0;
}

C
Cary Xu 已提交
347
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
348 349 350 351 352 353 354 355 356 357 358 359 360 361
  SSmaEnv *pEnv = NULL;

  switch (smaType) {
    case TSDB_SMA_TYPE_TIME_RANGE:
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_TSMA_ENV(pSma)))) {
        return TSDB_CODE_SUCCESS;
      }
      break;
    case TSDB_SMA_TYPE_ROLLUP:
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_RSMA_ENV(pSma)))) {
        return TSDB_CODE_SUCCESS;
      }
      break;
    default:
C
Cary Xu 已提交
362
      smaError("vgId:%d, undefined smaType:%", SMA_VID(pSma), smaType);
363 364 365 366 367 368 369 370 371 372
      return TSDB_CODE_FAILED;
  }

  // init sma env
  tdLockSma(pSma);
  pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&SMA_TSMA_ENV(pSma))
                                               : atomic_load_ptr(&SMA_RSMA_ENV(pSma));
  if (!pEnv) {
    char rname[TSDB_FILENAME_LEN] = {0};

C
Cary Xu 已提交
373
    if (tdInitSmaEnv(pSma, smaType, rname, &pEnv) < 0) {
374 375 376 377 378 379 380 381 382 383
      tdUnLockSma(pSma);
      return TSDB_CODE_FAILED;
    }

    (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), pEnv)
                                          : atomic_store_ptr(&SMA_RSMA_ENV(pSma), pEnv);
  }
  tdUnLockSma(pSma);

  return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
384
};