smaEnv.c 11.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "sma.h"

typedef struct SSmaStat SSmaStat;

C
Cary Xu 已提交
20
#define SMA_MGMT_REF_NUM 10240
21 22

extern SSmaMgmt smaMgmt;
23 24 25

// declaration of static functions

C
Cary Xu 已提交
26
static int32_t  tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma);
C
Cary Xu 已提交
27 28
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path);
static int32_t  tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv);
C
Cary Xu 已提交
29
static void    *tdFreeTSmaStat(STSmaStat *pStat);
30
static void     tdDestroyRSmaStat(void *pRSmaStat);
31

C
Cary Xu 已提交
32 33 34 35 36
/**
 * @brief rsma init
 *
 * @return int32_t
 */
37
// implementation
C
Cary Xu 已提交
38 39 40 41 42 43 44 45 46 47 48 49 50
int32_t smaInit() {
  int8_t  old;
  int32_t nLoops = 0;
  while (1) {
    old = atomic_val_compare_exchange_8(&smaMgmt.inited, 0, 2);
    if (old != 2) break;
    if (++nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }

  if (old == 0) {
51
    // init tref rset
C
Cary Xu 已提交
52 53 54
    smaMgmt.rsetId = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat);

    if (smaMgmt.rsetId < 0) {
55
      atomic_store_8(&smaMgmt.inited, 0);
C
Cary Xu 已提交
56
      smaError("failed to init sma rset since %s", terrstr());
57 58 59 60 61 62 63
      return TSDB_CODE_FAILED;
    }

    // init fetch timer handle
    smaMgmt.tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA");
    if (!smaMgmt.tmrHandle) {
      taosCloseRef(smaMgmt.rsetId);
C
Cary Xu 已提交
64
      atomic_store_8(&smaMgmt.inited, 0);
65
      smaError("failed to init sma tmr hanle since %s", terrstr());
C
Cary Xu 已提交
66 67 68 69
      return TSDB_CODE_FAILED;
    }

    atomic_store_8(&smaMgmt.inited, 1);
70
    smaInfo("sma mgmt env is initialized, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
C
Cary Xu 已提交
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Tear down the process-wide sma management environment.
 *
 * Only the caller that moves smaMgmt.inited from 1 (initialized) to
 * 2 (transition in progress) performs the cleanup; afterwards the state is
 * reset to 0. Calls made while the environment is not initialized are no-ops.
 */
void smaCleanUp() {
  int8_t  prev;
  int32_t nSpins = 0;

  // Wait out any transition that is currently in progress on another thread.
  for (;;) {
    prev = atomic_val_compare_exchange_8(&smaMgmt.inited, 1, 2);
    if (prev != 2) {
      break;
    }
    if (++nSpins > 1000) {
      sched_yield();
      nSpins = 0;
    }
  }

  if (prev != 1) {
    return;  // nothing was initialized
  }

  taosCloseRef(smaMgmt.rsetId);
  taosTmrCleanUp(smaMgmt.tmrHandle);
  smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
  atomic_store_8(&smaMgmt.inited, 0);
}
99

C
Cary Xu 已提交
100
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) {
101 102 103 104 105 106 107 108 109 110
  SSmaEnv *pEnv = NULL;

  pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv));
  if (!pEnv) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMA_ENV_TYPE(pEnv) = smaType;

C
Cary Xu 已提交
111
  taosInitRWLatch(&(pEnv->lock));
112

C
Cary Xu 已提交
113
  if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma) != TSDB_CODE_SUCCESS) {
114 115 116 117 118 119 120
    tdFreeSmaEnv(pEnv);
    return NULL;
  }

  return pEnv;
}

C
Cary Xu 已提交
121
/**
 * @brief Make sure *pEnv points to an sma env, creating one if it is NULL.
 *
 * @param pSma    owning sma object
 * @param smaType sma type for a newly created env
 * @param path    forwarded to tdNewSmaEnv()
 * @param pEnv    in/out env pointer; must not be NULL itself
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED when pEnv is NULL
 *         (terrno = TSDB_CODE_INVALID_PTR) or the env could not be created
 */
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv) {
  if (pEnv == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
    return TSDB_CODE_FAILED;
  }

  if (*pEnv != NULL) {
    return TSDB_CODE_SUCCESS;  // already present, nothing to do
  }

  *pEnv = tdNewSmaEnv(pSma, smaType, path);
  if (*pEnv == NULL) {
    return TSDB_CODE_FAILED;
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Release the resources held by the env's member fields; the env
 *        structure itself is not freed.
 *
 * @param pSmaEnv env to destroy; NULL is accepted and ignored
 */
void tdDestroySmaEnv(SSmaEnv *pSmaEnv) {
  if (pSmaEnv == NULL) {
    return;
  }

  // tdFreeSmaState() returns NULL, which also clears the stat pointer.
  pSmaEnv->pStat = tdFreeSmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv));
}

/**
 * @brief Destroy the env's members and free the env itself.
 *
 * @param pSmaEnv env to free; NULL is accepted and ignored
 * @return void* always NULL, so callers can assign the result to their pointer
 */
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv) {
  if (pSmaEnv == NULL) {
    return NULL;
  }

  tdDestroySmaEnv(pSmaEnv);
  taosMemoryFreeClear(pSmaEnv);
  return NULL;
}

/**
 * @brief Increment the reference counter of an sma stat and log the new value.
 *
 * @param pSma  sma object, used only for the vgId in the debug log
 * @param pStat stat to reference; NULL is a no-op
 * @return int32_t always 0
 */
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat) {
  if (pStat != NULL) {
    int refCnt = T_REF_INC(pStat);
    smaDebug("vgId:%d, ref sma stat:%p, val:%d", SMA_VID(pSma), pStat, refCnt);
  }
  return 0;
}

/**
 * @brief Decrement the reference counter of an sma stat and log the new value.
 *
 * @param pSma  sma object, used only for the vgId in the debug log
 * @param pStat stat to unreference; NULL is a no-op
 * @return int32_t always 0
 */
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
  if (pStat != NULL) {
    int refCnt = T_REF_DEC(pStat);
    smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, refCnt);
  }
  return 0;
}

C
Cary Xu 已提交
172 173
/**
 * @brief Increment the reference counter of an rsma info and log the new value.
 *
 * @param pSma      sma object, used only for the vgId in the debug log
 * @param pRSmaInfo rsma info to reference; NULL is a no-op
 * @return int32_t always 0
 */
int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
  if (pRSmaInfo != NULL) {
    int refCnt = T_REF_INC(pRSmaInfo);
    smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, refCnt);
  }
  return 0;
}

/**
 * @brief Decrement the reference counter of an rsma info and log the new value.
 *
 * @param pSma      sma object, used only for the vgId in the debug log
 * @param pRSmaInfo rsma info to unreference; NULL is a no-op
 * @return int32_t always 0
 */
int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
  if (pRSmaInfo != NULL) {
    int refCnt = T_REF_DEC(pRSmaInfo);
    smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, refCnt);
  }
  return 0;
}

C
Cary Xu 已提交
189
/**
 * @brief Allocate and initialize the SSmaStat of an sma env if it does not exist yet.
 *
 * For TSDB_SMA_TYPE_ROLLUP the stat is additionally wired up: back pointer to
 * the sma object, trigger state, the notEmpty semaphore, registration in the
 * sma reference set (smaMgmt.rsetId) and the rsma info hash.
 *
 * Failure handling:
 *  - If smaInit() or taosAddRef() fails, the stat is not yet owned by the
 *    reference set, so it is released here and *pSmaStat is reset to NULL.
 *  - If the hash cannot be created, the stat is already registered in the
 *    reference set; it is left in *pSmaStat so that the caller's cleanup
 *    (tdFreeSmaEnv -> tdDestroySmaState -> taosRemoveRef) releases it.
 *
 * @param pSmaStat in/out stat pointer; must not be NULL. A non-NULL *pSmaStat
 *                 is treated as already initialized.
 * @param smaType  TSDB_SMA_TYPE_ROLLUP or TSDB_SMA_TYPE_TIME_RANGE
 * @param pSma     owning sma object
 * @return int32_t TSDB_CODE_SUCCESS or TSDB_CODE_FAILED
 */
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) {
  ASSERT(pSmaStat != NULL);

  if (*pSmaStat) {  // no lock
    return TSDB_CODE_SUCCESS;
  }

  /**
   *  1. Lazy mode utilized when init SSmaStat to update expire window(or hungry mode when tdNew).
   *  2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
   * tdInitSmaStat invoked in other multithread environment later.
   */
  *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat));
  if (!(*pSmaStat)) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }

  if (smaType == TSDB_SMA_TYPE_ROLLUP) {
    SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat);
    pRSmaStat->pSma = (SSma *)pSma;
    atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
    tsem_init(&pRSmaStat->notEmpty, 0, 0);

    // init smaMgmt; without it there is no valid reference set to register in
    if (smaInit() != TSDB_CODE_SUCCESS) {
      tsem_destroy(&pRSmaStat->notEmpty);
      taosMemoryFreeClear(*pSmaStat);
      return TSDB_CODE_FAILED;
    }

    int64_t refId = taosAddRef(smaMgmt.rsetId, pRSmaStat);
    if (refId < 0) {
      smaError("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d failed since:%s", SMA_VID(pSma),
               refId, smaMgmt.rsetId, SMA_MGMT_REF_NUM, tstrerror(terrno));
      // Not registered, so nobody else will release the stat: do it here.
      tsem_destroy(&pRSmaStat->notEmpty);
      taosMemoryFreeClear(*pSmaStat);
      return TSDB_CODE_FAILED;
    }
    smaDebug("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d succeed", SMA_VID(pSma), refId,
             smaMgmt.rsetId, SMA_MGMT_REF_NUM);
    pRSmaStat->refId = refId;

    // init hash
    RSMA_INFO_HASH(pRSmaStat) = taosHashInit(
        RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (!RSMA_INFO_HASH(pRSmaStat)) {
      // The stat is owned by the reference set now; the caller's cleanup removes the ref.
      return TSDB_CODE_FAILED;
    }
  } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
    // TODO
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
243
/**
 * @brief Release the members of a tsma stat (pTSma and pTSchema); the stat
 *        structure itself is not freed.
 *
 * @param pStat tsma stat to destroy; NULL is accepted and ignored
 */
static void tdDestroyTSmaStat(STSmaStat *pStat) {
  if (pStat == NULL) {
    return;
  }

  smaDebug("destroy tsma stat");
  tDestroyTSma(pStat->pTSma);
  taosMemoryFreeClear(pStat->pTSma);
  taosMemoryFreeClear(pStat->pTSchema);
}

/**
 * @brief Destroy a tsma stat's members and then free the stat itself.
 *
 * @param pStat tsma stat to free
 * @return void* always NULL
 */
static void *tdFreeTSmaStat(STSmaStat *pStat) {
  tdDestroyTSmaStat(pStat);  // members first
  taosMemoryFreeClear(pStat);

  return NULL;
}

258 259 260
static void tdDestroyRSmaStat(void *pRSmaStat) {
  if (pRSmaStat) {
    SRSmaStat *pStat = (SRSmaStat *)pRSmaStat;
C
Cary Xu 已提交
261 262 263
    SSma      *pSma = pStat->pSma;
    smaDebug("vgId:%d, destroy rsma stat %p", SMA_VID(pSma), pRSmaStat);
    // step 1: set rsma trigger stat cancelled
C
Cary Xu 已提交
264
    atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
C
Cary Xu 已提交
265
    tsem_destroy(&(pStat->notEmpty));
C
Cary Xu 已提交
266

267
    // step 2: destroy the rsma info and associated fetch tasks
268 269 270 271
    if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) {
      void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL);
      while (infoHash) {
        SRSmaInfo *pSmaInfo = *(SRSmaInfo **)infoHash;
C
Cary Xu 已提交
272
        tdFreeRSmaInfo(pSma, pSmaInfo, true);
273 274
        infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash);
      }
C
Cary Xu 已提交
275 276 277
    }
    taosHashCleanup(RSMA_INFO_HASH(pStat));

C
Cary Xu 已提交
278
    // step 3: wait all triggered fetch tasks finished
279
    int32_t nLoops = 0;
C
Cary Xu 已提交
280 281
    while (1) {
      if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
C
Cary Xu 已提交
282
        smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma));
C
Cary Xu 已提交
283
        break;
284
      } else {
C
Cary Xu 已提交
285
        smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma));
C
Cary Xu 已提交
286 287 288 289 290 291
      }
      ++nLoops;
      if (nLoops > 1000) {
        sched_yield();
        nLoops = 0;
      }
C
Cary Xu 已提交
292
    }
C
Cary Xu 已提交
293 294

    // step 5: free pStat
C
Cary Xu 已提交
295
    taosMemoryFreeClear(pStat);
296
  }
297 298
}

C
Cary Xu 已提交
299
/**
 * @brief Destroy an sma stat and, for the time-range type, free its memory.
 *
 * A rollup stat is not freed here: tdDestroySmaState() only removes its
 * reference from the sma reference set, and tref is used to free it.
 *
 * @param pSmaStat stat to release
 * @param smaType  sma type of the stat
 * @return void* always NULL
 */
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
  tdDestroySmaState(pSmaStat, smaType);

  if (smaType != TSDB_SMA_TYPE_TIME_RANGE) {
    // tref used to free rsma stat
    return NULL;
  }

  taosMemoryFreeClear(pSmaStat);
  return NULL;
}

309 310 311 312 313 314
/**
 * @brief Release resources allocated for its member fields, not including itself.
 *
 * @param pSmaStat
 * @return int32_t
 */
315

316 317 318
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
  if (pSmaStat) {
    if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
C
Cary Xu 已提交
319
      tdDestroyTSmaStat(SMA_STAT_TSMA(pSmaStat));
320
    } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
C
Cary Xu 已提交
321
      SRSmaStat *pRSmaStat = &pSmaStat->rsmaStat;
C
Cary Xu 已提交
322 323
      int32_t    vid = SMA_VID(pRSmaStat->pSma);
      int64_t    refId = RSMA_REF_ID(pRSmaStat);
C
Cary Xu 已提交
324
      if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) {
C
Cary Xu 已提交
325 326
        smaError("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " failed since %s", vid, refId,
                 smaMgmt.rsetId, terrstr());
C
Cary Xu 已提交
327
      } else {
C
Cary Xu 已提交
328
        smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", vid, refId, smaMgmt.rsetId);
329
      }
330 331 332 333
    } else {
      ASSERT(0);
    }
  }
334
  return 0;
335 336 337 338 339
}

/**
 * @brief Lock the sma mutex and mark the sma object as locked.
 *
 * @param pSma sma object whose mutex is locked
 * @return int32_t 0 on success; -1 on failure, with terrno set to
 *         TAOS_SYSTEM_ERROR(code)
 */
int32_t tdLockSma(SSma *pSma) {
  int code = taosThreadMutexLock(&pSma->mutex);
  if (code != 0) {
    // The failure is reported through the return code (which is also what is
    // stored in terrno below), so describe that code rather than errno.
    smaError("vgId:%d, failed to lock td since %s", SMA_VID(pSma), strerror(code));
    terrno = TAOS_SYSTEM_ERROR(code);
    return -1;
  }
  pSma->locked = true;
  return 0;
}

/**
 * @brief Clear the locked flag and unlock the sma mutex.
 *
 * The sma object must currently be marked as locked (asserted).
 *
 * @param pSma sma object whose mutex is unlocked
 * @return int32_t 0 on success; -1 on failure, with terrno set to
 *         TAOS_SYSTEM_ERROR(code)
 */
int32_t tdUnLockSma(SSma *pSma) {
  ASSERT(SMA_LOCKED(pSma));
  // Clear the flag while the mutex is still held.
  pSma->locked = false;
  int code = taosThreadMutexUnlock(&pSma->mutex);
  if (code != 0) {
    // The failure is reported through the return code (which is also what is
    // stored in terrno below), so describe that code rather than errno.
    smaError("vgId:%d, failed to unlock td since %s", SMA_VID(pSma), strerror(code));
    terrno = TAOS_SYSTEM_ERROR(code);
    return -1;
  }
  return 0;
}

C
Cary Xu 已提交
360
/**
 * @brief Make sure the sma env of the given type exists, creating it if needed.
 *
 * The env pointer is first read without the sma lock; if it is still unset,
 * the lock is taken, the pointer is re-read, and the env is created and
 * published only if it is still missing.
 *
 * @param pSma    sma object that owns the tsma/rsma env pointers
 * @param smaType TSDB_SMA_TYPE_TIME_RANGE or TSDB_SMA_TYPE_ROLLUP
 * @return int32_t TSDB_CODE_SUCCESS when the env exists on return;
 *         TSDB_CODE_FAILED for an unknown smaType, when the sma lock cannot be
 *         taken, or when env creation fails
 */
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
  SSmaEnv *pEnv = NULL;

  // fast path: env already published
  switch (smaType) {
    case TSDB_SMA_TYPE_TIME_RANGE:
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_TSMA_ENV(pSma)))) {
        return TSDB_CODE_SUCCESS;
      }
      break;
    case TSDB_SMA_TYPE_ROLLUP:
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_RSMA_ENV(pSma)))) {
        return TSDB_CODE_SUCCESS;
      }
      break;
    default:
      smaError("vgId:%d, undefined smaType:%d", SMA_VID(pSma), smaType);
      return TSDB_CODE_FAILED;
  }

  // init sma env
  if (tdLockSma(pSma) != 0) {
    // terrno has been set by tdLockSma(); do not touch the env without the lock
    return TSDB_CODE_FAILED;
  }

  // Re-check under the lock: another thread may have created the env meanwhile.
  pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&SMA_TSMA_ENV(pSma))
                                               : atomic_load_ptr(&SMA_RSMA_ENV(pSma));
  if (!pEnv) {
    char rname[TSDB_FILENAME_LEN] = {0};

    if (tdInitSmaEnv(pSma, smaType, rname, &pEnv) < 0) {
      tdUnLockSma(pSma);
      return TSDB_CODE_FAILED;
    }

    if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
      atomic_store_ptr(&SMA_TSMA_ENV(pSma), pEnv);
    } else {
      atomic_store_ptr(&SMA_RSMA_ENV(pSma), pEnv);
    }
  }
  tdUnLockSma(pSma);

  return TSDB_CODE_SUCCESS;
}