/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "sma.h" typedef struct SSmaStat SSmaStat; #define RSMA_TASK_INFO_HASH_SLOT 8 // declaration of static functions static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma); static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path); static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv); static void *tdFreeTSmaStat(STSmaStat *pStat); // implementation static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) { SSmaEnv *pEnv = NULL; pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv)); if (!pEnv) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } SMA_ENV_TYPE(pEnv) = smaType; int code = taosThreadRwlockInit(&(pEnv->lock), NULL); if (code) { terrno = TAOS_SYSTEM_ERROR(code); taosMemoryFree(pEnv); return NULL; } if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma) != TSDB_CODE_SUCCESS) { tdFreeSmaEnv(pEnv); return NULL; } return pEnv; } static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv) { if (!pEnv) { terrno = TSDB_CODE_INVALID_PTR; return TSDB_CODE_FAILED; } if (!(*pEnv)) { if (!(*pEnv = tdNewSmaEnv(pSma, smaType, path))) { return TSDB_CODE_FAILED; } } return TSDB_CODE_SUCCESS; } /** * @brief Release resources allocated for its member fields, not including itself. * * @param pSmaEnv * @return int32_t */ void tdDestroySmaEnv(SSmaEnv *pSmaEnv) { if (pSmaEnv) { pSmaEnv->pStat = tdFreeSmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv)); taosThreadRwlockDestroy(&(pSmaEnv->lock)); } } void *tdFreeSmaEnv(SSmaEnv *pSmaEnv) { if (pSmaEnv) { tdDestroySmaEnv(pSmaEnv); taosMemoryFreeClear(pSmaEnv); } return NULL; } int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat) { if (!pStat) return 0; int ref = T_REF_INC(pStat); smaDebug("vgId:%d, ref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref); return 0; } int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { if (!pStat) return 0; int ref = T_REF_DEC(pStat); smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref); return 0; } static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) { ASSERT(pSmaStat != NULL); if (*pSmaStat) { // no lock return TSDB_CODE_SUCCESS; } /** * 1. Lazy mode utilized when init SSmaStat to update expire window(or hungry mode when tdNew). * 2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if * tdInitSmaStat invoked in other multithread environment later. */ if (!(*pSmaStat)) { *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat)); if (!(*pSmaStat)) { terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; } if (smaType == TSDB_SMA_TYPE_ROLLUP) { SMA_RSMA_STAT(*pSmaStat)->pSma = (SSma*)pSma; // init timer SMA_RSMA_TMR_HANDLE(*pSmaStat) = taosTmrInit(10000, 100, 10000, "RSMA_G"); if (!SMA_RSMA_TMR_HANDLE(*pSmaStat)) { taosMemoryFreeClear(*pSmaStat); return TSDB_CODE_FAILED; } atomic_store_8(&SMA_RSMA_TMR_STAT(*pSmaStat), TASK_TRIGGER_STATUS__ACTIVE); // init hash SMA_RSMA_INFO_HASH(*pSmaStat) = taosHashInit( RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); if (!SMA_RSMA_INFO_HASH(*pSmaStat)) { if (SMA_RSMA_TMR_HANDLE(*pSmaStat)) { taosTmrCleanUp(SMA_RSMA_TMR_HANDLE(*pSmaStat)); } taosMemoryFreeClear(*pSmaStat); return TSDB_CODE_FAILED; } } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { // TODO } else { ASSERT(0); } } return TSDB_CODE_SUCCESS; } static void *tdFreeTSmaStat(STSmaStat *pStat) { if (pStat) { tDestroyTSma(pStat->pTSma); taosMemoryFreeClear(pStat->pTSma); taosMemoryFreeClear(pStat); } return NULL; } void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { tdDestroySmaState(pSmaStat, smaType); taosMemoryFreeClear(pSmaStat); return NULL; } /** * @brief Release resources allocated for its member fields, not including itself. * * @param pSmaStat * @return int32_t */ int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { if (pSmaStat) { if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { tdFreeTSmaStat(&pSmaStat->tsmaStat); } else if (smaType == TSDB_SMA_TYPE_ROLLUP) { if (SMA_RSMA_TMR_HANDLE(pSmaStat)) { taosTmrCleanUp(SMA_RSMA_TMR_HANDLE(pSmaStat)); } // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. void *infoHash = taosHashIterate(SMA_RSMA_INFO_HASH(pSmaStat), NULL); while (infoHash) { SRSmaInfo *pInfoHash = *(SRSmaInfo **)infoHash; tdFreeRSmaInfo(pInfoHash); infoHash = taosHashIterate(SMA_RSMA_INFO_HASH(pSmaStat), infoHash); } taosHashCleanup(SMA_RSMA_INFO_HASH(pSmaStat)); } else { ASSERT(0); } } return TSDB_CODE_SUCCESS; } int32_t tdLockSma(SSma *pSma) { int code = taosThreadMutexLock(&pSma->mutex); if (code != 0) { smaError("vgId:%d, failed to lock td since %s", SMA_VID(pSma), strerror(errno)); terrno = TAOS_SYSTEM_ERROR(code); return -1; } pSma->locked = true; return 0; } int32_t tdUnLockSma(SSma *pSma) { ASSERT(SMA_LOCKED(pSma)); pSma->locked = false; int code = taosThreadMutexUnlock(&pSma->mutex); if (code != 0) { smaError("vgId:%d, failed to unlock td since %s", SMA_VID(pSma), strerror(errno)); terrno = TAOS_SYSTEM_ERROR(code); return -1; } return 0; } int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) { SSmaEnv *pEnv = NULL; switch (smaType) { case TSDB_SMA_TYPE_TIME_RANGE: if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_TSMA_ENV(pSma)))) { return TSDB_CODE_SUCCESS; } break; case TSDB_SMA_TYPE_ROLLUP: if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_RSMA_ENV(pSma)))) { return TSDB_CODE_SUCCESS; } break; default: TASSERT(0); return TSDB_CODE_FAILED; } // init sma env tdLockSma(pSma); pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&SMA_TSMA_ENV(pSma)) : atomic_load_ptr(&SMA_RSMA_ENV(pSma)); if (!pEnv) { char rname[TSDB_FILENAME_LEN] = {0}; if (tdInitSmaEnv(pSma, smaType, rname, &pEnv) < 0) { tdUnLockSma(pSma); return TSDB_CODE_FAILED; } (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), pEnv) : atomic_store_ptr(&SMA_RSMA_ENV(pSma), pEnv); } tdUnLockSma(pSma); return TSDB_CODE_SUCCESS; }; int32_t smaTimerInit(void **timer, int8_t *initFlag, const char *label) { int8_t old; while (1) { old = atomic_val_compare_exchange_8(initFlag, 0, 2); if (old != 2) break; } if (old == 0) { *timer = taosTmrInit(10000, 100, 10000, label); if (!(*timer)) { atomic_store_8(initFlag, 0); return -1; } atomic_store_8(initFlag, 1); } return 0; } void smaTimerCleanUp(void *timer, int8_t *initFlag) { int8_t old; while (1) { old = atomic_val_compare_exchange_8(initFlag, 1, 2); if (old != 2) break; } if (old == 1) { taosTmrCleanUp(timer); atomic_store_8(initFlag, 0); } }