smaEnv.c 11.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "sma.h"

typedef struct SSmaStat SSmaStat;

C
Cary Xu 已提交
20
#define SMA_MGMT_REF_NUM 10240
21 22

extern SSmaMgmt smaMgmt;
23 24 25

// declaration of static functions

C
Cary Xu 已提交
26
static int32_t  tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma);
C
Cary Xu 已提交
27 28
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path);
static int32_t  tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv);
C
Cary Xu 已提交
29
static void    *tdFreeTSmaStat(STSmaStat *pStat);
30
static void     tdDestroyRSmaStat(void *pRSmaStat);
31

C
Cary Xu 已提交
32 33 34 35 36
/**
 * @brief rsma init
 *
 * @return int32_t
 */
37
// implementation
C
Cary Xu 已提交
38 39 40 41 42 43 44 45 46 47 48 49 50
int32_t smaInit() {
  int8_t  old;
  int32_t nLoops = 0;
  while (1) {
    old = atomic_val_compare_exchange_8(&smaMgmt.inited, 0, 2);
    if (old != 2) break;
    if (++nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }

  if (old == 0) {
51
    // init tref rset
C
Cary Xu 已提交
52 53 54
    smaMgmt.rsetId = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat);

    if (smaMgmt.rsetId < 0) {
55
      atomic_store_8(&smaMgmt.inited, 0);
C
Cary Xu 已提交
56
      smaError("failed to init sma rset since %s", terrstr());
57 58 59 60 61 62 63
      return TSDB_CODE_FAILED;
    }

    // init fetch timer handle
    smaMgmt.tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA");
    if (!smaMgmt.tmrHandle) {
      taosCloseRef(smaMgmt.rsetId);
C
Cary Xu 已提交
64
      atomic_store_8(&smaMgmt.inited, 0);
65
      smaError("failed to init sma tmr hanle since %s", terrstr());
C
Cary Xu 已提交
66 67 68 69
      return TSDB_CODE_FAILED;
    }

    atomic_store_8(&smaMgmt.inited, 1);
70
    smaInfo("sma mgmt env is initialized, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
C
Cary Xu 已提交
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief rsma cleanup
 *
 */
void smaCleanUp() {
  int8_t  old;
  int32_t nLoops = 0;
  while (1) {
    old = atomic_val_compare_exchange_8(&smaMgmt.inited, 1, 2);
    if (old != 2) break;
    if (++nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }

  if (old == 1) {
    taosCloseRef(smaMgmt.rsetId);
94 95
    taosTmrCleanUp(smaMgmt.tmrHandle);
    smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
C
Cary Xu 已提交
96 97 98
    atomic_store_8(&smaMgmt.inited, 0);
  }
}
99

C
Cary Xu 已提交
100
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) {
101 102 103 104 105 106 107 108 109 110
  SSmaEnv *pEnv = NULL;

  pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv));
  if (!pEnv) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMA_ENV_TYPE(pEnv) = smaType;

C
Cary Xu 已提交
111
  taosInitRWLatch(&(pEnv->lock));
112

C
Cary Xu 已提交
113
  if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma) != TSDB_CODE_SUCCESS) {
114 115 116 117 118 119 120
    tdFreeSmaEnv(pEnv);
    return NULL;
  }

  return pEnv;
}

C
Cary Xu 已提交
121
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv) {
122 123 124 125 126 127
  if (!pEnv) {
    terrno = TSDB_CODE_INVALID_PTR;
    return TSDB_CODE_FAILED;
  }

  if (!(*pEnv)) {
C
Cary Xu 已提交
128
    if (!(*pEnv = tdNewSmaEnv(pSma, smaType, path))) {
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
      return TSDB_CODE_FAILED;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Release resources allocated for its member fields, not including itself.
 *
 * @param pSmaEnv
 * @return int32_t
 */
void tdDestroySmaEnv(SSmaEnv *pSmaEnv) {
  if (pSmaEnv) {
C
Cary Xu 已提交
144
    pSmaEnv->pStat = tdFreeSmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv));
145 146 147 148
  }
}

void *tdFreeSmaEnv(SSmaEnv *pSmaEnv) {
C
Cary Xu 已提交
149 150 151 152
  if (pSmaEnv) {
    tdDestroySmaEnv(pSmaEnv);
    taosMemoryFreeClear(pSmaEnv);
  }
153 154 155 156 157 158 159
  return NULL;
}

int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat) {
  if (!pStat) return 0;

  int ref = T_REF_INC(pStat);
S
Shengliang Guan 已提交
160
  smaDebug("vgId:%d, ref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref);
161 162 163 164 165 166 167
  return 0;
}

int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
  if (!pStat) return 0;

  int ref = T_REF_DEC(pStat);
S
Shengliang Guan 已提交
168
  smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref);
169 170 171
  return 0;
}

C
Cary Xu 已提交
172 173
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
  if (!pRSmaInfo) return 0;
C
Cary Xu 已提交
174

C
Cary Xu 已提交
175 176 177 178 179 180 181 182 183 184 185 186 187 188
  int ref = T_REF_INC(pRSmaInfo);
  smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
  return 0;
}

int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
  if (!pRSmaInfo) return 0;

  int ref = T_REF_DEC(pRSmaInfo);
  smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);

  return 0;
}

C
Cary Xu 已提交
189
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) {
190 191 192 193 194 195 196
  ASSERT(pSmaStat != NULL);

  if (*pSmaStat) {  // no lock
    return TSDB_CODE_SUCCESS;
  }

  /**
C
Cary Xu 已提交
197
   *  1. Lazy mode utilized when init SSmaStat to update expire window(or hungry mode when tdNew).
198 199 200 201 202 203 204 205 206 207 208
   *  2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
   * tdInitSmaStat invoked in other multithread environment later.
   */
  if (!(*pSmaStat)) {
    *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat));
    if (!(*pSmaStat)) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_FAILED;
    }

    if (smaType == TSDB_SMA_TYPE_ROLLUP) {
C
Cary Xu 已提交
209 210
      SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat);
      pRSmaStat->pSma = (SSma *)pSma;
C
Cary Xu 已提交
211
      atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
212 213

      // init smaMgmt
C
Cary Xu 已提交
214
      smaInit();
215

C
Cary Xu 已提交
216
      int64_t refId = taosAddRef(smaMgmt.rsetId, pRSmaStat);
217
      if (refId < 0) {
C
Cary Xu 已提交
218 219
        smaError("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d failed since:%s", SMA_VID(pSma),
                 refId, smaMgmt.rsetId, SMA_MGMT_REF_NUM, tstrerror(terrno));
220
        return TSDB_CODE_FAILED;
C
Cary Xu 已提交
221 222 223
      } else {
        smaDebug("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d succeed", SMA_VID(pSma), refId,
                 smaMgmt.rsetId, SMA_MGMT_REF_NUM);
224 225 226
      }
      pRSmaStat->refId = refId;

C
Cary Xu 已提交
227
      // init hash
C
Cary Xu 已提交
228
      RSMA_INFO_HASH(pRSmaStat) = taosHashInit(
C
Cary Xu 已提交
229
          RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
C
Cary Xu 已提交
230
      if (!RSMA_INFO_HASH(pRSmaStat)) {
C
Cary Xu 已提交
231 232 233 234 235 236
        return TSDB_CODE_FAILED;
      }

      RSMA_FETCH_HASH(pRSmaStat) = taosHashInit(
          RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
      if (!RSMA_FETCH_HASH(pRSmaStat)) {
237 238 239
        return TSDB_CODE_FAILED;
      }
    } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
C
Cary Xu 已提交
240
      // TODO
241 242 243 244 245 246 247
    } else {
      ASSERT(0);
    }
  }
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
248
static void tdDestroyTSmaStat(STSmaStat *pStat) {
C
Cary Xu 已提交
249
  if (pStat) {
C
Cary Xu 已提交
250
    smaDebug("destroy tsma stat");
C
Cary Xu 已提交
251 252
    tDestroyTSma(pStat->pTSma);
    taosMemoryFreeClear(pStat->pTSma);
C
Cary Xu 已提交
253 254 255 256 257 258 259 260 261 262
    taosMemoryFreeClear(pStat->pTSchema);
  }
}

static void *tdFreeTSmaStat(STSmaStat *pStat) {
  tdDestroyTSmaStat(pStat);
  taosMemoryFreeClear(pStat);
  return NULL;
}

263 264 265
static void tdDestroyRSmaStat(void *pRSmaStat) {
  if (pRSmaStat) {
    SRSmaStat *pStat = (SRSmaStat *)pRSmaStat;
C
Cary Xu 已提交
266 267 268
    SSma      *pSma = pStat->pSma;
    smaDebug("vgId:%d, destroy rsma stat %p", SMA_VID(pSma), pRSmaStat);
    // step 1: set rsma trigger stat cancelled
C
Cary Xu 已提交
269 270
    atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);

271
    // step 2: destroy the rsma info and associated fetch tasks
272 273 274 275
    if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) {
      void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL);
      while (infoHash) {
        SRSmaInfo *pSmaInfo = *(SRSmaInfo **)infoHash;
C
Cary Xu 已提交
276
        tdFreeRSmaInfo(pSma, pSmaInfo, true);
277 278
        infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash);
      }
C
Cary Xu 已提交
279 280 281
    }
    taosHashCleanup(RSMA_INFO_HASH(pStat));

C
Cary Xu 已提交
282 283 284 285
    // step 3: destroy the rsma fetch hash
    taosHashCleanup(RSMA_FETCH_HASH(pStat));

    // step 4: wait all triggered fetch tasks finished
286
    int32_t nLoops = 0;
C
Cary Xu 已提交
287 288
    while (1) {
      if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
C
Cary Xu 已提交
289
        smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma));
C
Cary Xu 已提交
290
        break;
291
      } else {
C
Cary Xu 已提交
292
        smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma));
C
Cary Xu 已提交
293 294 295 296 297 298
      }
      ++nLoops;
      if (nLoops > 1000) {
        sched_yield();
        nLoops = 0;
      }
C
Cary Xu 已提交
299
    }
C
Cary Xu 已提交
300 301

    // step 5: free pStat
C
Cary Xu 已提交
302
    taosMemoryFreeClear(pStat);
303
  }
304 305
}

C
Cary Xu 已提交
306
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
C
Cary Xu 已提交
307
  tdDestroySmaState(pSmaStat, smaType);
308 309 310 311 312
  if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
    taosMemoryFreeClear(pSmaStat);
  }
  // tref used to free rsma stat

C
Cary Xu 已提交
313 314 315
  return NULL;
}

316 317 318 319 320 321
/**
 * @brief Release resources allocated for its member fields, not including itself.
 *
 * @param pSmaStat
 * @return int32_t
 */
322

323 324 325
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
  if (pSmaStat) {
    if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
C
Cary Xu 已提交
326
      tdDestroyTSmaStat(SMA_STAT_TSMA(pSmaStat));
327
    } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
C
Cary Xu 已提交
328
      SRSmaStat *pRSmaStat = &pSmaStat->rsmaStat;
C
Cary Xu 已提交
329 330
      int32_t    vid = SMA_VID(pRSmaStat->pSma);
      int64_t    refId = RSMA_REF_ID(pRSmaStat);
C
Cary Xu 已提交
331
      if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) {
C
Cary Xu 已提交
332 333
        smaError("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " failed since %s", vid, refId,
                 smaMgmt.rsetId, terrstr());
C
Cary Xu 已提交
334
      } else {
C
Cary Xu 已提交
335
        smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", vid, refId, smaMgmt.rsetId);
336
      }
337 338 339 340
    } else {
      ASSERT(0);
    }
  }
341
  return 0;
342 343 344 345 346
}

int32_t tdLockSma(SSma *pSma) {
  int code = taosThreadMutexLock(&pSma->mutex);
  if (code != 0) {
S
Shengliang Guan 已提交
347
    smaError("vgId:%d, failed to lock td since %s", SMA_VID(pSma), strerror(errno));
348 349 350 351 352 353 354 355 356 357 358 359
    terrno = TAOS_SYSTEM_ERROR(code);
    return -1;
  }
  pSma->locked = true;
  return 0;
}

int32_t tdUnLockSma(SSma *pSma) {
  ASSERT(SMA_LOCKED(pSma));
  pSma->locked = false;
  int code = taosThreadMutexUnlock(&pSma->mutex);
  if (code != 0) {
S
Shengliang Guan 已提交
360
    smaError("vgId:%d, failed to unlock td since %s", SMA_VID(pSma), strerror(errno));
361 362 363 364 365 366
    terrno = TAOS_SYSTEM_ERROR(code);
    return -1;
  }
  return 0;
}

C
Cary Xu 已提交
367
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
368 369 370 371 372 373 374 375 376 377 378 379 380 381
  SSmaEnv *pEnv = NULL;

  switch (smaType) {
    case TSDB_SMA_TYPE_TIME_RANGE:
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_TSMA_ENV(pSma)))) {
        return TSDB_CODE_SUCCESS;
      }
      break;
    case TSDB_SMA_TYPE_ROLLUP:
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_RSMA_ENV(pSma)))) {
        return TSDB_CODE_SUCCESS;
      }
      break;
    default:
S
Shengliang Guan 已提交
382
      smaError("vgId:%d, undefined smaType:%", SMA_VID(pSma), smaType);
383 384 385 386 387 388 389 390 391 392
      return TSDB_CODE_FAILED;
  }

  // init sma env
  tdLockSma(pSma);
  pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&SMA_TSMA_ENV(pSma))
                                               : atomic_load_ptr(&SMA_RSMA_ENV(pSma));
  if (!pEnv) {
    char rname[TSDB_FILENAME_LEN] = {0};

C
Cary Xu 已提交
393
    if (tdInitSmaEnv(pSma, smaType, rname, &pEnv) < 0) {
394 395 396 397 398 399 400 401 402 403
      tdUnLockSma(pSma);
      return TSDB_CODE_FAILED;
    }

    (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), pEnv)
                                          : atomic_store_ptr(&SMA_RSMA_ENV(pSma), pEnv);
  }
  tdUnLockSma(pSma);

  return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
404
};