smaEnv.c 11.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "sma.h"

typedef struct SSmaStat SSmaStat;

C
Cary Xu 已提交
20
#define SMA_MGMT_REF_NUM 10240
21 22

extern SSmaMgmt smaMgmt;
23 24 25

// declaration of static functions

C
Cary Xu 已提交
26
static int32_t  tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma);
C
Cary Xu 已提交
27 28
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path);
static int32_t  tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv);
C
Cary Xu 已提交
29
static void    *tdFreeTSmaStat(STSmaStat *pStat);
30
static void     tdDestroyRSmaStat(void *pRSmaStat);
31

C
Cary Xu 已提交
32 33 34 35 36
/**
 * @brief rsma init
 *
 * @return int32_t
 */
37
// implementation
C
Cary Xu 已提交
38 39 40 41 42 43 44 45 46 47 48 49 50
int32_t smaInit() {
  int8_t  old;
  int32_t nLoops = 0;
  while (1) {
    old = atomic_val_compare_exchange_8(&smaMgmt.inited, 0, 2);
    if (old != 2) break;
    if (++nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }

  if (old == 0) {
51
    // init tref rset
C
Cary Xu 已提交
52 53 54
    smaMgmt.rsetId = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat);

    if (smaMgmt.rsetId < 0) {
55
      atomic_store_8(&smaMgmt.inited, 0);
C
Cary Xu 已提交
56
      smaError("failed to init sma rset since %s", terrstr());
57 58 59 60 61 62 63
      return TSDB_CODE_FAILED;
    }

    // init fetch timer handle
    smaMgmt.tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA");
    if (!smaMgmt.tmrHandle) {
      taosCloseRef(smaMgmt.rsetId);
C
Cary Xu 已提交
64
      atomic_store_8(&smaMgmt.inited, 0);
65
      smaError("failed to init sma tmr hanle since %s", terrstr());
C
Cary Xu 已提交
66 67 68 69
      return TSDB_CODE_FAILED;
    }

    atomic_store_8(&smaMgmt.inited, 1);
70
    smaInfo("sma mgmt env is initialized, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
C
Cary Xu 已提交
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Tear down the global sma management env created by smaInit.
 *
 * Acts only when smaMgmt.inited can be moved from 1 to the transitional value 2. While another
 * thread holds 2 this call spins, yielding every 1000 attempts. For any other observed value
 * the call does nothing.
 */
void smaCleanUp() {
  int8_t  prev;
  int32_t spins = 0;

  while ((prev = atomic_val_compare_exchange_8(&smaMgmt.inited, 1, 2)) == 2) {
    if (++spins > 1000) {
      sched_yield();
      spins = 0;
    }
  }

  if (prev != 1) {
    return;
  }

  // This thread owns the transition: release the ref set and the timer, then publish state 0.
  taosCloseRef(smaMgmt.rsetId);
  taosTmrCleanUp(smaMgmt.tmrHandle);
  smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
  atomic_store_8(&smaMgmt.inited, 0);
}
99

C
Cary Xu 已提交
100
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) {
101 102 103 104 105 106 107 108 109 110
  SSmaEnv *pEnv = NULL;

  pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv));
  if (!pEnv) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMA_ENV_TYPE(pEnv) = smaType;

C
Cary Xu 已提交
111
  taosInitRWLatch(&(pEnv->lock));
112

C
Cary Xu 已提交
113
  if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma) != TSDB_CODE_SUCCESS) {
114 115 116 117 118 119 120
    tdFreeSmaEnv(pEnv);
    return NULL;
  }

  return pEnv;
}

C
Cary Xu 已提交
121
/**
 * @brief Make sure *pEnv points to an sma env, creating one with tdNewSmaEnv if it is NULL.
 *
 * @param pSma    sma object forwarded to tdNewSmaEnv
 * @param smaType sma type forwarded to tdNewSmaEnv
 * @param path    forwarded to tdNewSmaEnv
 * @param pEnv    in/out env pointer; an existing non-NULL env is left untouched
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED when pEnv is NULL (terrno is set to
 *         TSDB_CODE_INVALID_PTR) or the env could not be created
 */
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv) {
  if (pEnv == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
    return TSDB_CODE_FAILED;
  }

  if (*pEnv != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  *pEnv = tdNewSmaEnv(pSma, smaType, path);
  if (*pEnv == NULL) {
    return TSDB_CODE_FAILED;
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Release the resources held by the env's member fields, not the env itself.
 *
 * The env's pStat is handed to tdFreeSmaState and replaced by its return value.
 *
 * @param pSmaEnv env to clean; NULL is ignored
 */
void tdDestroySmaEnv(SSmaEnv *pSmaEnv) {
  if (pSmaEnv == NULL) {
    return;
  }

  pSmaEnv->pStat = tdFreeSmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv));
}

/**
 * @brief Destroy the env's members via tdDestroySmaEnv and free the env itself.
 *
 * @param pSmaEnv env to free; NULL is ignored
 * @return void* always NULL
 */
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv) {
  if (pSmaEnv == NULL) {
    return NULL;
  }

  tdDestroySmaEnv(pSmaEnv);
  taosMemoryFreeClear(pSmaEnv);
  return NULL;
}

/**
 * @brief Increment the reference on an sma stat via T_REF_INC and log the resulting value.
 *
 * @param pSma  sma object, used only for the vgId in the debug log
 * @param pStat stat to reference; NULL is ignored
 * @return int32_t always 0
 */
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat) {
  if (pStat != NULL) {
    int refCnt = T_REF_INC(pStat);
    smaDebug("vgId:%d, ref sma stat:%p, val:%d", SMA_VID(pSma), pStat, refCnt);
  }
  return 0;
}

/**
 * @brief Decrement the reference on an sma stat via T_REF_DEC and log the resulting value.
 *
 * @param pSma  sma object, used only for the vgId in the debug log
 * @param pStat stat to unreference; NULL is ignored
 * @return int32_t always 0
 */
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
  if (pStat != NULL) {
    int refCnt = T_REF_DEC(pStat);
    smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, refCnt);
  }
  return 0;
}

C
Cary Xu 已提交
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
/**
 * @brief Increment the reference on an rsma info via T_REF_INC and log the resulting value.
 *
 * @param pSma      sma object, used only for the vgId in the debug log
 * @param pRSmaInfo rsma info to reference; NULL is ignored
 * @return int32_t always 0
 */
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
  if (pRSmaInfo != NULL) {
    int refCnt = T_REF_INC(pRSmaInfo);
    smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, refCnt);
  }
  return 0;
}

/**
 * @brief Decrement the reference on an rsma info; when it reaches 0, remove the info by its suid.
 *
 * @param pSma      sma object passed to tdRemoveRSmaInfoBySuid and used for the vgId in the log
 * @param pRSmaInfo rsma info to unreference; NULL is ignored
 * @return int32_t always 0
 */
int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
  if (pRSmaInfo == NULL) {
    return 0;
  }

  int refCnt = T_REF_DEC(pRSmaInfo);
  smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, refCnt);

  if (refCnt != 0) {
    return 0;
  }

  // last reference dropped
  tdRemoveRSmaInfoBySuid(pSma, pRSmaInfo->suid);
  return 0;
}

C
Cary Xu 已提交
192
/**
 * @brief Lazily allocate and initialize the SSmaStat behind *pSmaStat.
 *
 * For TSDB_SMA_TYPE_ROLLUP the stat is set up as an SRSmaStat: the global sma mgmt env is
 * initialized, the rsma info hash is created, and only then is the stat registered in the
 * sma mgmt ref set. Registration happens last so that a stat visible through the ref set is
 * always fully built, and so that no failure path frees memory the ref set still points to.
 *
 * On every failure path the stat is released and *pSmaStat is reset to NULL.
 *
 * @param pSmaStat in/out stat pointer, must not be NULL; an existing stat is left untouched
 * @param smaType  TSDB_SMA_TYPE_ROLLUP or TSDB_SMA_TYPE_TIME_RANGE
 * @param pSma     owning sma object, stored in the rsma stat
 * @return int32_t TSDB_CODE_SUCCESS or TSDB_CODE_FAILED
 */
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) {
  ASSERT(pSmaStat != NULL);

  if (*pSmaStat) {  // no lock
    return TSDB_CODE_SUCCESS;
  }

  /**
   *  1. Lazy mode utilized when init SSmaStat to update expire window(or hungry mode when tdNew).
   *  2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
   * tdInitSmaStat invoked in other multithread environment later.
   */
  *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat));
  if (!(*pSmaStat)) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }

  if (smaType == TSDB_SMA_TYPE_ROLLUP) {
    SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat);
    pRSmaStat->pSma = (SSma *)pSma;
    atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);

    // init smaMgmt; without it there is no valid ref set to register in
    if (smaInit() != TSDB_CODE_SUCCESS) {
      taosMemoryFreeClear(*pSmaStat);
      return TSDB_CODE_FAILED;
    }

    // init hash before the stat becomes reachable through the ref set
    RSMA_INFO_HASH(pRSmaStat) = taosHashInit(
        RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (!RSMA_INFO_HASH(pRSmaStat)) {
      taosMemoryFreeClear(*pSmaStat);
      return TSDB_CODE_FAILED;
    }

    int64_t refId = taosAddRef(smaMgmt.rsetId, pRSmaStat);
    if (refId < 0) {
      // log first so terrno still describes the taosAddRef failure
      smaError("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d failed since:%s", SMA_VID(pSma),
               refId, smaMgmt.rsetId, SMA_MGMT_REF_NUM, tstrerror(terrno));
      // the stat was never registered, so it has to be released here
      taosHashCleanup(RSMA_INFO_HASH(pRSmaStat));
      taosMemoryFreeClear(*pSmaStat);
      return TSDB_CODE_FAILED;
    }
    smaDebug("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d succeed", SMA_VID(pSma), refId,
             smaMgmt.rsetId, SMA_MGMT_REF_NUM);
    pRSmaStat->refId = refId;
  } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
    // TODO
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
246
/**
 * @brief Release the members of a tsma stat (pTSma and pTSchema), not the stat itself.
 *
 * @param pStat tsma stat to clean; NULL is ignored
 */
static void tdDestroyTSmaStat(STSmaStat *pStat) {
  if (pStat == NULL) {
    return;
  }

  smaDebug("destroy tsma stat");
  // destroy the tsma's content first, then free the tsma object and the schema
  tDestroyTSma(pStat->pTSma);
  taosMemoryFreeClear(pStat->pTSma);
  taosMemoryFreeClear(pStat->pTSchema);
}

/**
 * @brief Destroy a tsma stat's members via tdDestroyTSmaStat and free the stat itself.
 *
 * @param pStat tsma stat to free
 * @return void* always NULL
 */
static void *tdFreeTSmaStat(STSmaStat *pStat) {
  // members first, then the container
  tdDestroyTSmaStat(pStat);

  taosMemoryFreeClear(pStat);

  return NULL;
}

261 262 263
static void tdDestroyRSmaStat(void *pRSmaStat) {
  if (pRSmaStat) {
    SRSmaStat *pStat = (SRSmaStat *)pRSmaStat;
C
Cary Xu 已提交
264 265 266
    SSma      *pSma = pStat->pSma;
    smaDebug("vgId:%d, destroy rsma stat %p", SMA_VID(pSma), pRSmaStat);
    // step 1: set rsma trigger stat cancelled
C
Cary Xu 已提交
267 268
    atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);

269
    // step 2: destroy the rsma info and associated fetch tasks
C
Cary Xu 已提交
270
    // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
L
Liu Jicong 已提交
271
#if 1
272 273 274 275
    if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) {
      void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL);
      while (infoHash) {
        SRSmaInfo *pSmaInfo = *(SRSmaInfo **)infoHash;
C
Cary Xu 已提交
276
        tdFreeRSmaInfo(pSma, pSmaInfo, true);
277 278
        infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash);
      }
C
Cary Xu 已提交
279
    }
L
Liu Jicong 已提交
280
#endif
C
Cary Xu 已提交
281 282
    taosHashCleanup(RSMA_INFO_HASH(pStat));

283 284
    // step 3: wait all triggered fetch tasks finished
    int32_t nLoops = 0;
C
Cary Xu 已提交
285 286
    while (1) {
      if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
C
Cary Xu 已提交
287
        smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma));
C
Cary Xu 已提交
288
        break;
289
      } else {
C
Cary Xu 已提交
290
        smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma));
C
Cary Xu 已提交
291 292 293 294 295 296
      }
      ++nLoops;
      if (nLoops > 1000) {
        sched_yield();
        nLoops = 0;
      }
C
Cary Xu 已提交
297
    }
C
Cary Xu 已提交
298 299 300

    // step 4: free pStat
    taosMemoryFreeClear(pStat);
301
  }
302 303
}

C
Cary Xu 已提交
304
/**
 * @brief Destroy an sma stat via tdDestroySmaState and, for the time-range type, free it.
 *
 * For other types the stat memory is not freed here; per the original note, tref is used to
 * free the rsma stat.
 *
 * @param pSmaStat stat to release
 * @param smaType  sma type of the stat
 * @return void* always NULL
 */
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
  (void)tdDestroySmaState(pSmaStat, smaType);

  // tref used to free rsma stat
  if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
    taosMemoryFreeClear(pSmaStat);
  }

  return NULL;
}

314 315 316 317 318 319
/**
 * @brief Release resources allocated for its member fields, not including itself.
 *
 * @param pSmaStat
 * @return int32_t
 */
320

321 322 323
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
  if (pSmaStat) {
    if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
C
Cary Xu 已提交
324
      tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat));
325
    } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
326
      SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat);
C
Cary Xu 已提交
327 328
      int32_t    vid = SMA_VID(pRSmaStat->pSma);
      int64_t    refId = RSMA_REF_ID(pRSmaStat);
C
Cary Xu 已提交
329
      if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) {
C
Cary Xu 已提交
330 331
        smaError("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " failed since %s", vid, refId,
                 smaMgmt.rsetId, terrstr());
C
Cary Xu 已提交
332
      } else {
C
Cary Xu 已提交
333
        smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", vid, refId, smaMgmt.rsetId);
334
      }
335 336 337 338
    } else {
      ASSERT(0);
    }
  }
339
  return 0;
340 341 342 343 344
}

/**
 * @brief Lock the sma mutex and mark the sma as locked.
 *
 * @param pSma sma object whose mutex is taken
 * @return int32_t 0 on success; -1 if the mutex could not be locked, with terrno set to
 *         TAOS_SYSTEM_ERROR(code) where code is the value returned by taosThreadMutexLock
 */
int32_t tdLockSma(SSma *pSma) {
  int code = taosThreadMutexLock(&pSma->mutex);
  if (code != 0) {
    // The failure reason is the returned code (the same value stored into terrno below), so
    // describe that value instead of whatever errno currently holds.
    smaError("vgId:%d, failed to lock td since %s", SMA_VID(pSma), strerror(code));
    terrno = TAOS_SYSTEM_ERROR(code);
    return -1;
  }
  pSma->locked = true;
  return 0;
}

/**
 * @brief Clear the sma's locked flag and unlock its mutex.
 *
 * Asserts that the sma is currently marked as locked. The flag is cleared before the mutex is
 * released, i.e. while the mutex is still held.
 *
 * @param pSma sma object whose mutex is released
 * @return int32_t 0 on success; -1 if the mutex could not be unlocked, with terrno set to
 *         TAOS_SYSTEM_ERROR(code) where code is the value returned by taosThreadMutexUnlock
 */
int32_t tdUnLockSma(SSma *pSma) {
  ASSERT(SMA_LOCKED(pSma));
  pSma->locked = false;
  int code = taosThreadMutexUnlock(&pSma->mutex);
  if (code != 0) {
    // The failure reason is the returned code (the same value stored into terrno below), so
    // describe that value instead of whatever errno currently holds.
    smaError("vgId:%d, failed to unlock td since %s", SMA_VID(pSma), strerror(code));
    terrno = TAOS_SYSTEM_ERROR(code);
    return -1;
  }
  return 0;
}

C
Cary Xu 已提交
365
/**
 * @brief Make sure the sma env of the given type exists on pSma, creating it if necessary.
 *
 * The env pointer is first read without the sma lock; if it is still NULL the sma lock is
 * taken, the pointer is read again, and only then is a new env created and published with an
 * atomic store.
 *
 * @param pSma    sma object that owns the tsma/rsma env pointers
 * @param smaType TSDB_SMA_TYPE_TIME_RANGE or TSDB_SMA_TYPE_ROLLUP
 * @return int32_t TSDB_CODE_SUCCESS if the env exists or was created; TSDB_CODE_FAILED for an
 *         unknown smaType, when the sma lock cannot be taken, or when env creation fails
 */
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
  SSmaEnv *pEnv = NULL;

  // fast path: env already published
  switch (smaType) {
    case TSDB_SMA_TYPE_TIME_RANGE:
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_TSMA_ENV(pSma)))) {
        return TSDB_CODE_SUCCESS;
      }
      break;
    case TSDB_SMA_TYPE_ROLLUP:
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_RSMA_ENV(pSma)))) {
        return TSDB_CODE_SUCCESS;
      }
      break;
    default:
      smaError("vgId:%d, undefined smaType:%d", SMA_VID(pSma), smaType);
      return TSDB_CODE_FAILED;
  }

  // init sma env
  if (tdLockSma(pSma) < 0) {
    // tdLockSma has already logged the reason and set terrno
    return TSDB_CODE_FAILED;
  }

  // re-check under the lock: another thread may have created the env in the meantime
  pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&SMA_TSMA_ENV(pSma))
                                               : atomic_load_ptr(&SMA_RSMA_ENV(pSma));
  if (!pEnv) {
    char rname[TSDB_FILENAME_LEN] = {0};

    if (tdInitSmaEnv(pSma, smaType, rname, &pEnv) < 0) {
      tdUnLockSma(pSma);
      return TSDB_CODE_FAILED;
    }

    if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
      atomic_store_ptr(&SMA_TSMA_ENV(pSma), pEnv);
    } else {
      atomic_store_ptr(&SMA_RSMA_ENV(pSma), pEnv);
    }
  }
  tdUnLockSma(pSma);

  return TSDB_CODE_SUCCESS;
}