smaEnv.c 10.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "sma.h"

typedef struct SSmaStat SSmaStat;

#define RSMA_TASK_INFO_HASH_SLOT 8
C
Cary Xu 已提交
21
#define SMA_MGMT_REF_NUM         10240
22 23

extern SSmaMgmt smaMgmt;
24 25 26

// declaration of static functions

C
Cary Xu 已提交
27
static int32_t  tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma);
C
Cary Xu 已提交
28 29
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path);
static int32_t  tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv);
C
Cary Xu 已提交
30
static void    *tdFreeTSmaStat(STSmaStat *pStat);
31
static void     tdDestroyRSmaStat(void *pRSmaStat);
32

C
Cary Xu 已提交
33 34 35 36 37
/**
 * @brief rsma init
 *
 * @return int32_t
 */
38
// implementation
C
Cary Xu 已提交
39 40 41 42 43 44 45 46 47 48 49 50 51
int32_t smaInit() {
  int8_t  old;
  int32_t nLoops = 0;
  while (1) {
    old = atomic_val_compare_exchange_8(&smaMgmt.inited, 0, 2);
    if (old != 2) break;
    if (++nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }

  if (old == 0) {
52
    // init tref rset
C
Cary Xu 已提交
53 54 55
    smaMgmt.rsetId = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat);

    if (smaMgmt.rsetId < 0) {
56
      atomic_store_8(&smaMgmt.inited, 0);
C
Cary Xu 已提交
57
      smaError("failed to init sma rset since %s", terrstr());
58 59 60 61 62 63 64
      return TSDB_CODE_FAILED;
    }

    // init fetch timer handle
    smaMgmt.tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA");
    if (!smaMgmt.tmrHandle) {
      taosCloseRef(smaMgmt.rsetId);
C
Cary Xu 已提交
65
      atomic_store_8(&smaMgmt.inited, 0);
66
      smaError("failed to init sma tmr hanle since %s", terrstr());
C
Cary Xu 已提交
67 68 69 70
      return TSDB_CODE_FAILED;
    }

    atomic_store_8(&smaMgmt.inited, 1);
71
    smaInfo("sma mgmt env is initialized, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
C
Cary Xu 已提交
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Tear down the shared sma management env created by smaInit().
 *
 * Only the caller that moves smaMgmt.inited from 1 to 2 closes the ref set and the
 * timer; the flag is reset to 0 afterwards. A caller that observes 2 spins (yielding
 * every 1000 iterations) until the in-progress transition is over.
 */
void smaCleanUp() {
  int8_t  prev;
  int32_t spins = 0;

  for (;;) {
    prev = atomic_val_compare_exchange_8(&smaMgmt.inited, 1, 2);
    if (prev != 2) {
      break;
    }
    if (++spins > 1000) {
      sched_yield();
      spins = 0;
    }
  }

  // The flag was not 1, so this caller did not claim the cleanup.
  if (prev != 1) {
    return;
  }

  taosCloseRef(smaMgmt.rsetId);
  taosTmrCleanUp(smaMgmt.tmrHandle);
  smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
  atomic_store_8(&smaMgmt.inited, 0);
}
100

C
Cary Xu 已提交
101
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) {
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
  SSmaEnv *pEnv = NULL;

  pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv));
  if (!pEnv) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMA_ENV_TYPE(pEnv) = smaType;

  int code = taosThreadRwlockInit(&(pEnv->lock), NULL);
  if (code) {
    terrno = TAOS_SYSTEM_ERROR(code);
    taosMemoryFree(pEnv);
    return NULL;
  }

C
Cary Xu 已提交
119
  if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma) != TSDB_CODE_SUCCESS) {
120 121 122 123 124 125 126
    tdFreeSmaEnv(pEnv);
    return NULL;
  }

  return pEnv;
}

C
Cary Xu 已提交
127
/**
 * @brief Make sure *pEnv points to an SSmaEnv, creating one via tdNewSmaEnv() if it is NULL.
 *
 * @param pSma    Forwarded to tdNewSmaEnv().
 * @param smaType Forwarded to tdNewSmaEnv().
 * @param path    Forwarded to tdNewSmaEnv().
 * @param pEnv    In/out env pointer; an already non-NULL *pEnv is left untouched.
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED when pEnv is NULL
 *                 (terrno = TSDB_CODE_INVALID_PTR) or the env could not be created.
 */
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv) {
  if (pEnv == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
    return TSDB_CODE_FAILED;
  }

  if (*pEnv != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  *pEnv = tdNewSmaEnv(pSma, smaType, path);
  if (*pEnv == NULL) {
    return TSDB_CODE_FAILED;
  }
  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Release the resources held by the members of an SSmaEnv; the env struct
 *        itself is not freed.
 *
 * @param pSmaEnv Env to tear down; NULL is a no-op.
 */
void tdDestroySmaEnv(SSmaEnv *pSmaEnv) {
  if (pSmaEnv == NULL) {
    return;
  }

  // store the result of tdFreeSmaState() back so the env does not keep the old stat pointer
  pSmaEnv->pStat = tdFreeSmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv));
  taosThreadRwlockDestroy(&(pSmaEnv->lock));
}

/**
 * @brief Destroy the members of an SSmaEnv and free the env itself.
 *
 * @param pSmaEnv Env to free; NULL is a no-op.
 * @return void* Always NULL, so callers can write `p = tdFreeSmaEnv(p)`.
 */
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv) {
  if (pSmaEnv == NULL) {
    return NULL;
  }

  tdDestroySmaEnv(pSmaEnv);
  taosMemoryFreeClear(pSmaEnv);
  return NULL;
}

/**
 * @brief Apply T_REF_INC to an sma stat and log the value it yields.
 *
 * @param pSma  Used only for the vgId in the debug log.
 * @param pStat Stat to reference; NULL is a no-op.
 * @return int32_t Always 0.
 */
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat) {
  if (pStat != NULL) {
    int refCnt = T_REF_INC(pStat);
    smaDebug("vgId:%d, ref sma stat:%p, val:%d", SMA_VID(pSma), pStat, refCnt);
  }
  return 0;
}

/**
 * @brief Apply T_REF_DEC to an sma stat and log the value it yields.
 *
 * @param pSma  Used only for the vgId in the debug log.
 * @param pStat Stat to unreference; NULL is a no-op.
 * @return int32_t Always 0.
 */
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
  if (pStat != NULL) {
    int refCnt = T_REF_DEC(pStat);
    smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, refCnt);
  }
  return 0;
}

C
Cary Xu 已提交
179
/**
 * @brief Allocate and initialize the SSmaStat behind *pSmaStat if it does not exist yet.
 *
 * For TSDB_SMA_TYPE_ROLLUP the stat is additionally registered in the smaMgmt ref set
 * and its rsma info hash is created.
 *
 * Ownership on failure:
 *  - Before the stat is registered in the ref set (smaInit()/taosAddRef() failure) it is
 *    freed here and *pSmaStat is reset to NULL.
 *  - After registration (taosHashInit() failure) *pSmaStat is left in place on purpose:
 *    the caller's tdFreeSmaEnv() path reaches tdDestroySmaState(), which removes the
 *    ref. Freeing it directly here would leave a dangling pointer in the ref set.
 *
 * @param pSmaStat In/out stat pointer; must not be NULL. A non-NULL *pSmaStat is left untouched.
 * @param smaType  TSDB_SMA_TYPE_ROLLUP or TSDB_SMA_TYPE_TIME_RANGE.
 * @param pSma     Owning sma object, stored in the rsma stat.
 * @return int32_t TSDB_CODE_SUCCESS or TSDB_CODE_FAILED.
 */
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) {
  ASSERT(pSmaStat != NULL);

  if (*pSmaStat) {  // no lock
    return TSDB_CODE_SUCCESS;
  }

  /**
   *  1. Lazy mode utilized when init SSmaStat to update expire window(or hungry mode when tdNew).
   *  2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
   * tdInitSmaStat invoked in other multithread environment later.
   */
  *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat));
  if (!(*pSmaStat)) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }

  if (smaType == TSDB_SMA_TYPE_ROLLUP) {
    SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat);
    pRSmaStat->pSma = (SSma *)pSma;
    atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);

    // init smaMgmt; without it there is no valid ref set to register in
    if (smaInit() != TSDB_CODE_SUCCESS) {
      taosMemoryFreeClear(*pSmaStat);
      return TSDB_CODE_FAILED;
    }

    int64_t refId = taosAddRef(smaMgmt.rsetId, pRSmaStat);
    if (refId < 0) {
      smaError("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d failed since:%s", SMA_VID(pSma),
               refId, smaMgmt.rsetId, SMA_MGMT_REF_NUM, tstrerror(terrno));
      // not registered, so we still own the allocation; clear it so the caller's
      // cleanup does not try to remove a ref that was never added
      taosMemoryFreeClear(*pSmaStat);
      return TSDB_CODE_FAILED;
    }
    smaDebug("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d succeed", SMA_VID(pSma), refId,
             smaMgmt.rsetId, SMA_MGMT_REF_NUM);
    pRSmaStat->refId = refId;

    // init hash
    RSMA_INFO_HASH(pRSmaStat) = taosHashInit(
        RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (!RSMA_INFO_HASH(pRSmaStat)) {
      // registered in the ref set already: leave *pSmaStat for the caller's
      // tdFreeSmaEnv() -> tdDestroySmaState() path, which removes the ref
      return TSDB_CODE_FAILED;
    }
  } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
    // TODO
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
233
/**
 * @brief Release the members of a tsma stat (pTSma and pTSchema); the stat struct
 *        itself is not freed.
 *
 * @param pStat Stat to clean; NULL is a no-op.
 */
static void tdDestroyTSmaStat(STSmaStat *pStat) {
  if (pStat == NULL) {
    return;
  }

  smaDebug("destroy tsma stat");
  tDestroyTSma(pStat->pTSma);
  taosMemoryFreeClear(pStat->pTSma);
  taosMemoryFreeClear(pStat->pTSchema);
}

/**
 * @brief Destroy the members of a tsma stat and free the stat itself.
 *
 * @param pStat Stat to free; NULL is a no-op.
 * @return void* Always NULL.
 */
static void *tdFreeTSmaStat(STSmaStat *pStat) {
  if (pStat != NULL) {
    tdDestroyTSmaStat(pStat);
    taosMemoryFreeClear(pStat);
  }
  return NULL;
}

248 249 250
static void tdDestroyRSmaStat(void *pRSmaStat) {
  if (pRSmaStat) {
    SRSmaStat *pStat = (SRSmaStat *)pRSmaStat;
C
Cary Xu 已提交
251 252 253
    SSma      *pSma = pStat->pSma;
    smaDebug("vgId:%d, destroy rsma stat %p", SMA_VID(pSma), pRSmaStat);
    // step 1: set rsma trigger stat cancelled
C
Cary Xu 已提交
254 255
    atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);

256
    // step 2: destroy the rsma info and associated fetch tasks
C
Cary Xu 已提交
257
    // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
L
Liu Jicong 已提交
258
#if 1
259 260 261 262
    if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) {
      void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL);
      while (infoHash) {
        SRSmaInfo *pSmaInfo = *(SRSmaInfo **)infoHash;
263
        tdFreeRSmaInfo(pSma, pSmaInfo);
264 265
        infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash);
      }
C
Cary Xu 已提交
266
    }
L
Liu Jicong 已提交
267
#endif
C
Cary Xu 已提交
268 269
    taosHashCleanup(RSMA_INFO_HASH(pStat));

270 271
    // step 3: wait all triggered fetch tasks finished
    int32_t nLoops = 0;
C
Cary Xu 已提交
272 273
    while (1) {
      if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
C
Cary Xu 已提交
274
        smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma));
C
Cary Xu 已提交
275
        break;
276
      } else {
C
Cary Xu 已提交
277
        smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma));
C
Cary Xu 已提交
278 279 280 281 282 283
      }
      ++nLoops;
      if (nLoops > 1000) {
        sched_yield();
        nLoops = 0;
      }
C
Cary Xu 已提交
284
    }
285
  }
286 287
}

C
Cary Xu 已提交
288
/**
 * @brief Destroy an sma stat via tdDestroySmaState() and, for the time-range type,
 *        free the stat struct as well.
 *
 * @param pSmaStat Stat to release.
 * @param smaType  TSDB_SMA_TYPE_TIME_RANGE or TSDB_SMA_TYPE_ROLLUP.
 * @return void* Always NULL.
 */
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
  tdDestroySmaState(pSmaStat, smaType);

  // Only the tsma stat is freed here; tref used to free rsma stat.
  if (TSDB_SMA_TYPE_TIME_RANGE == smaType) {
    taosMemoryFreeClear(pSmaStat);
  }

  return NULL;
}

298 299 300 301 302 303
/**
 * @brief Release resources allocated for its member fields, not including itself.
 *
 * @param pSmaStat
 * @return int32_t
 */
304

305 306 307
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
  if (pSmaStat) {
    if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
C
Cary Xu 已提交
308
      tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat));
309
    } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
310
      SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat);
C
Cary Xu 已提交
311 312 313 314 315 316 317
      if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) {
        smaError("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " failed since %s", SMA_VID(pRSmaStat->pSma),
                 RSMA_REF_ID(pRSmaStat), smaMgmt.rsetId, terrstr());
        ASSERT(0);
      } else {
        smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", SMA_VID(pRSmaStat->pSma),
                 RSMA_REF_ID(pRSmaStat), smaMgmt.rsetId);
318
      }
319 320 321 322
    } else {
      ASSERT(0);
    }
  }
323
  return 0;
324 325 326 327 328
}

/**
 * @brief Lock pSma->mutex and mark the sma object as locked.
 *
 * @param pSma Sma object whose mutex is taken.
 * @return int32_t 0 on success; -1 on failure with terrno set from the lock's error code.
 */
int32_t tdLockSma(SSma *pSma) {
  int code = taosThreadMutexLock(&pSma->mutex);
  if (code != 0) {
    // the failure reason is the returned code (also used for terrno below), not errno
    smaError("vgId:%d, failed to lock td since %s", SMA_VID(pSma), strerror(code));
    terrno = TAOS_SYSTEM_ERROR(code);
    return -1;
  }
  pSma->locked = true;
  return 0;
}

/**
 * @brief Clear the locked mark and unlock pSma->mutex.
 *
 * Asserts that the sma object is currently marked as locked.
 *
 * @param pSma Sma object whose mutex is released.
 * @return int32_t 0 on success; -1 on failure with terrno set from the unlock's error code.
 */
int32_t tdUnLockSma(SSma *pSma) {
  ASSERT(SMA_LOCKED(pSma));
  // clear the mark before releasing the mutex, while it is still held
  pSma->locked = false;
  int code = taosThreadMutexUnlock(&pSma->mutex);
  if (code != 0) {
    // the failure reason is the returned code (also used for terrno below), not errno
    smaError("vgId:%d, failed to unlock td since %s", SMA_VID(pSma), strerror(code));
    terrno = TAOS_SYSTEM_ERROR(code);
    return -1;
  }
  return 0;
}

C
Cary Xu 已提交
349
/**
 * @brief Make sure the tsma or rsma env of pSma exists, creating it on first use.
 *
 * The env pointer is first read without the sma lock; if it is missing, the lock is
 * taken, the pointer is re-read, and the env is created and published only if it is
 * still missing.
 *
 * @param pSma    Sma object that owns the env pointers.
 * @param smaType TSDB_SMA_TYPE_TIME_RANGE or TSDB_SMA_TYPE_ROLLUP.
 * @return int32_t TSDB_CODE_SUCCESS if the env exists on return; TSDB_CODE_FAILED for
 *                 an unknown smaType, a lock failure, or an env creation failure.
 */
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
  SSmaEnv *pEnv = NULL;

  // check without the lock first
  switch (smaType) {
    case TSDB_SMA_TYPE_TIME_RANGE:
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_TSMA_ENV(pSma)))) {
        return TSDB_CODE_SUCCESS;
      }
      break;
    case TSDB_SMA_TYPE_ROLLUP:
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_RSMA_ENV(pSma)))) {
        return TSDB_CODE_SUCCESS;
      }
      break;
    default:
      // smaType is promoted to int when passed through varargs, so %d matches
      smaError("vgId:%d undefined smaType:%d", SMA_VID(pSma), smaType);
      return TSDB_CODE_FAILED;
  }

  // init sma env
  if (tdLockSma(pSma) != 0) {
    // terrno was set by tdLockSma(); do not touch the env without holding the lock
    return TSDB_CODE_FAILED;
  }

  // re-read under the lock: the env may have been published since the check above
  pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&SMA_TSMA_ENV(pSma))
                                               : atomic_load_ptr(&SMA_RSMA_ENV(pSma));
  if (!pEnv) {
    char rname[TSDB_FILENAME_LEN] = {0};

    if (tdInitSmaEnv(pSma, smaType, rname, &pEnv) < 0) {
      tdUnLockSma(pSma);
      return TSDB_CODE_FAILED;
    }

    if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
      atomic_store_ptr(&SMA_TSMA_ENV(pSma), pEnv);
    } else {
      atomic_store_ptr(&SMA_RSMA_ENV(pSma), pEnv);
    }
  }
  tdUnLockSma(pSma);

  return TSDB_CODE_SUCCESS;
}