smaEnv.c 13.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "sma.h"

typedef struct SSmaStat SSmaStat;

C
Cary Xu 已提交
20
#define SMA_MGMT_REF_NUM 10240
21 22

extern SSmaMgmt smaMgmt;
23 24 25

// declaration of static functions

C
Cary Xu 已提交
26 27 28 29 30
static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv);
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv);
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma);
static int32_t tdRsmaStartExecutor(const SSma *pSma);
static int32_t tdRsmaStopExecutor(const SSma *pSma);
31 32
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
static void   *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
C
Cary Xu 已提交
33 34
static void   *tdFreeTSmaStat(STSmaStat *pStat);
static void    tdDestroyRSmaStat(void *pRSmaStat);
35

C
Cary Xu 已提交
36 37 38 39 40
/**
 * @brief rsma init
 *
 * @return int32_t
 */
41
// implementation
C
Cary Xu 已提交
42 43 44 45 46 47 48 49 50 51 52 53 54
int32_t smaInit() {
  int8_t  old;
  int32_t nLoops = 0;
  while (1) {
    old = atomic_val_compare_exchange_8(&smaMgmt.inited, 0, 2);
    if (old != 2) break;
    if (++nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }

  if (old == 0) {
55
    // init tref rset
C
Cary Xu 已提交
56 57 58
    smaMgmt.rsetId = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat);

    if (smaMgmt.rsetId < 0) {
59
      atomic_store_8(&smaMgmt.inited, 0);
C
Cary Xu 已提交
60
      smaError("failed to init sma rset since %s", terrstr());
61 62 63
      return TSDB_CODE_FAILED;
    }

64
    int32_t type = (8 == POINTER_BYTES) ? TSDB_DATA_TYPE_UBIGINT : TSDB_DATA_TYPE_UINT;
65
    smaMgmt.refHash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_ENTRY_LOCK);
66 67 68 69 70 71 72
    if (!smaMgmt.refHash) {
      taosCloseRef(smaMgmt.rsetId);
      atomic_store_8(&smaMgmt.inited, 0);
      smaError("failed to init sma tmr hanle since %s", terrstr());
      return TSDB_CODE_FAILED;
    }

73 74 75 76
    // init fetch timer handle
    smaMgmt.tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA");
    if (!smaMgmt.tmrHandle) {
      taosCloseRef(smaMgmt.rsetId);
77 78
      taosHashCleanup(smaMgmt.refHash);
      smaMgmt.refHash = NULL;
C
Cary Xu 已提交
79
      atomic_store_8(&smaMgmt.inited, 0);
80
      smaError("failed to init sma tmr handle since %s", terrstr());
C
Cary Xu 已提交
81 82 83 84
      return TSDB_CODE_FAILED;
    }

    atomic_store_8(&smaMgmt.inited, 1);
85
    smaInfo("sma mgmt env is initialized, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
C
Cary Xu 已提交
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Tear down the process-wide SMA management state created by smaInit().
 *
 * Only the caller that wins the 1 (initialized) -> 2 (busy) transition of smaMgmt.inited performs the
 * cleanup. It closes the tref set, frees refHash and the timer handle, then resets the state to 0 so that
 * smaInit() may run again. When smaMgmt is not initialized this is a no-op.
 */
void smaCleanUp() {
  int8_t  old;
  int32_t nLoops = 0;
  // wait while another thread is in the middle of init/cleanup (state 2)
  while (1) {
    old = atomic_val_compare_exchange_8(&smaMgmt.inited, 1, 2);
    if (old != 2) break;
    if (++nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }

  if (old == 1) {
    taosCloseRef(smaMgmt.rsetId);
    taosHashCleanup(smaMgmt.refHash);
    smaMgmt.refHash = NULL;
    taosTmrCleanUp(smaMgmt.tmrHandle);
    smaInfo("sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p", smaMgmt.rsetId, smaMgmt.tmrHandle);
    // the handle was released above; don't leave a dangling pointer in the global state
    smaMgmt.tmrHandle = NULL;
    atomic_store_8(&smaMgmt.inited, 0);
  }
}
116

C
Cary Xu 已提交
117
/**
 * @brief Allocate a new SSmaEnv of the given type, publish it on pSma and initialize its stat.
 *
 * @param pSma
 * @param smaType TSDB_SMA_TYPE_TIME_RANGE selects the TSMA slot of pSma; any other value the RSMA slot
 * @param ppEnv   receives the new env, or NULL on failure
 * @return int32_t TSDB_CODE_SUCCESS or TSDB_CODE_FAILED (terrno set on allocation failure)
 */
static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
  SSmaEnv *pNewEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv));

  // hand the allocation result to the caller right away (NULL when calloc failed)
  *ppEnv = pNewEnv;
  if (pNewEnv == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }

  SMA_ENV_TYPE(pNewEnv) = smaType;
  taosInitRWLatch(&(pNewEnv->lock));

  // Publish before initializing the stat: for rollup, tdInitSmaStat -> tdRsmaStartExecutor finds the
  // env through SMA_RSMA_ENV(pSma).
  if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
    atomic_store_ptr(&SMA_TSMA_ENV(pSma), *ppEnv);
  } else {
    atomic_store_ptr(&SMA_RSMA_ENV(pSma), *ppEnv);
  }

  if (tdInitSmaStat(&SMA_ENV_STAT(pNewEnv), smaType, pSma) == TSDB_CODE_SUCCESS) {
    return TSDB_CODE_SUCCESS;
  }

  // Roll back: free the env first (while still published, so teardown can still reach it through
  // pSma), then clear the caller's pointer and the published slot.
  tdFreeSmaEnv(pNewEnv);
  *ppEnv = NULL;
  if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
    atomic_store_ptr(&SMA_TSMA_ENV(pSma), NULL);
  } else {
    atomic_store_ptr(&SMA_RSMA_ENV(pSma), NULL);
  }
  return TSDB_CODE_FAILED;
}

C
Cary Xu 已提交
145 146
/**
 * @brief Make sure *ppEnv points to an SSmaEnv, creating one with tdNewSmaEnv when it is NULL.
 *
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED (terrno = TSDB_CODE_INVALID_PTR for NULL ppEnv)
 */
static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) {
  if (ppEnv == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
    return TSDB_CODE_FAILED;
  }

  // caller already holds an env: nothing to build
  if (*ppEnv != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  return (tdNewSmaEnv(pSma, smaType, ppEnv) == TSDB_CODE_SUCCESS) ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
}

/**
 * @brief Release what the members of an SSmaEnv hold (its stat), but not the SSmaEnv itself.
 *
 * A NULL env is ignored. Afterwards pSmaEnv->pStat is NULL.
 *
 * @param pSmaEnv
 */
void tdDestroySmaEnv(SSmaEnv *pSmaEnv) {
  if (pSmaEnv == NULL) {
    return;
  }
  // tdFreeSmaState returns NULL, which clears the member in the same statement
  pSmaEnv->pStat = tdFreeSmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv));
}

/**
 * @brief Destroy the members of pSmaEnv and free the env itself. NULL is ignored.
 *
 * @return void* always NULL, so callers can write `p = tdFreeSmaEnv(p);`
 */
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv) {
  if (pSmaEnv == NULL) {
    return NULL;
  }

  tdDestroySmaEnv(pSmaEnv);
  taosMemoryFreeClear(pSmaEnv);
  return NULL;
}

180 181 182 183 184 185 186 187 188 189 190
static void tRSmaInfoHashFreeNode(void *data) {
  SRSmaInfo     *pRSmaInfo = NULL;
  SRSmaInfoItem *pItem = NULL;

  if ((pRSmaInfo = *(SRSmaInfo **)data)) {
    if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 0)) && pItem->level) {
      taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES);
    }
    if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 1)) && pItem->level) {
      taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES);
    }
C
Cary Xu 已提交
191
    tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo, true);
192 193 194
  }
}

C
Cary Xu 已提交
195
/**
 * @brief Allocate and initialize the SSmaStat of an SSmaEnv if it does not exist yet.
 *
 * For TSDB_SMA_TYPE_ROLLUP the stat is additionally:
 *  - registered in the tref set smaMgmt.rsetId, which then owns it;
 *  - given an rsma info hash;
 *  - given running rsma executor threads;
 *  - given an rsma fs task-file array.
 *
 * Ownership on failure: if the stat could not be registered in the tref set, it is released here and
 * *pSmaStat is reset to NULL (taosRemoveRef could never free it). Once registered, a later failure
 * leaves *pSmaStat set, and the caller releases it through tdFreeSmaEnv -> taosRemoveRef.
 *
 * @param pSmaStat address of the env's stat pointer; must not be NULL
 * @param smaType  TSDB_SMA_TYPE_TIME_RANGE or TSDB_SMA_TYPE_ROLLUP
 * @param pSma
 * @return int32_t TSDB_CODE_SUCCESS or TSDB_CODE_FAILED
 */
static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) {
  ASSERT(pSmaStat != NULL);

  /**
   *  1. Lazy mode utilized when init SSmaStat to update expire window(or hungry mode when tdNew).
   *  2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
   * tdInitSmaStat invoked in other multithread environment later.
   */
  if (*pSmaStat) {  // already initialized; no lock (see note above)
    return TSDB_CODE_SUCCESS;
  }

  // the trailing TdThread slots hold the rsma executor thread handles (see tdRsmaStartExecutor)
  *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat) + sizeof(TdThread) * tsNumOfVnodeRsmaThreads);
  if (!(*pSmaStat)) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }

  if (smaType == TSDB_SMA_TYPE_ROLLUP) {
    SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat);
    pRSmaStat->pSma = (SSma *)pSma;
    atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
    tsem_init(&pRSmaStat->notEmpty, 0, 0);

    // init smaMgmt (no-op when already initialized); without it the stat cannot be registered
    if (smaInit() != TSDB_CODE_SUCCESS) {
      smaError("vgId:%d, failed to init sma mgmt since %s", SMA_VID(pSma), terrstr());
      // not owned by the tref set: release here, nothing else can free it
      tsem_destroy(&pRSmaStat->notEmpty);
      taosMemoryFreeClear(*pSmaStat);
      return TSDB_CODE_FAILED;
    }

    int64_t refId = taosAddRef(smaMgmt.rsetId, pRSmaStat);
    if (refId < 0) {
      smaError("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d failed since:%s", SMA_VID(pSma),
               refId, smaMgmt.rsetId, SMA_MGMT_REF_NUM, tstrerror(terrno));
      // not owned by the tref set: release here, nothing else can free it
      tsem_destroy(&pRSmaStat->notEmpty);
      taosMemoryFreeClear(*pSmaStat);
      return TSDB_CODE_FAILED;
    }
    smaDebug("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d succeed", SMA_VID(pSma), refId,
             smaMgmt.rsetId, SMA_MGMT_REF_NUM);
    pRSmaStat->refId = refId;

    // From here on the stat is owned by the tref set; failures below are cleaned up by the caller.

    // init hash
    RSMA_INFO_HASH(pRSmaStat) = taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT),
                                             true, HASH_ENTRY_LOCK);
    if (!RSMA_INFO_HASH(pRSmaStat)) {
      return TSDB_CODE_FAILED;
    }
    taosHashSetFreeFp(RSMA_INFO_HASH(pRSmaStat), tRSmaInfoHashFreeNode);

    if (tdRsmaStartExecutor(pSma) < 0) {
      return TSDB_CODE_FAILED;
    }

    if (!(RSMA_FS(pRSmaStat)->aQTaskInf = taosArrayInit(1, sizeof(SQTaskFile)))) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_FAILED;
    }
  } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
    // TODO
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
259
/**
 * @brief Release the members of a TSMA stat (tsma definition and schema), not the stat itself.
 * A NULL stat is ignored.
 */
static void tdDestroyTSmaStat(STSmaStat *pStat) {
  if (pStat == NULL) {
    return;
  }

  smaDebug("destroy tsma stat");
  // release what pTSma owns before freeing the pTSma object itself
  tDestroyTSma(pStat->pTSma);
  taosMemoryFreeClear(pStat->pTSma);
  taosMemoryFreeClear(pStat->pTSchema);
}

/**
 * @brief Destroy the members of a TSMA stat and free the stat. NULL is ignored.
 *
 * @return void* always NULL
 */
static void *tdFreeTSmaStat(STSmaStat *pStat) {
  if (pStat != NULL) {
    tdDestroyTSmaStat(pStat);
    taosMemoryFreeClear(pStat);
  }
  return NULL;
}

274 275 276
static void tdDestroyRSmaStat(void *pRSmaStat) {
  if (pRSmaStat) {
    SRSmaStat *pStat = (SRSmaStat *)pRSmaStat;
C
Cary Xu 已提交
277 278 279
    SSma      *pSma = pStat->pSma;
    smaDebug("vgId:%d, destroy rsma stat %p", SMA_VID(pSma), pRSmaStat);
    // step 1: set rsma trigger stat cancelled
C
Cary Xu 已提交
280
    atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
C
Cary Xu 已提交
281
    tsem_destroy(&(pStat->notEmpty));
C
Cary Xu 已提交
282

283
    // step 2: destroy the rsma info and associated fetch tasks
C
Cary Xu 已提交
284 285
    taosHashCleanup(RSMA_INFO_HASH(pStat));

C
Cary Xu 已提交
286
    // step 3: wait for all triggered fetch tasks to finish
287
    int32_t nLoops = 0;
C
Cary Xu 已提交
288 289
    while (1) {
      if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
C
Cary Xu 已提交
290
        smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma));
C
Cary Xu 已提交
291
        break;
292
      } else {
C
Cary Xu 已提交
293
        smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma));
C
Cary Xu 已提交
294 295 296 297 298 299
      }
      ++nLoops;
      if (nLoops > 1000) {
        sched_yield();
        nLoops = 0;
      }
C
Cary Xu 已提交
300
    }
C
Cary Xu 已提交
301

C
Cary Xu 已提交
302 303 304
    // step 4:
    tdRsmaStopExecutor(pSma);

305
    // step 5:
306
    tdRSmaFSClose(RSMA_FS(pStat));
307 308

    // step 6: free pStat
C
Cary Xu 已提交
309
    taosMemoryFreeClear(pStat);
310
  }
311 312
}

313
/**
 * @brief Destroy a stat's members and release the stat according to its type.
 *
 * A TSMA stat is freed directly here. An RSMA stat is owned by the tref set smaMgmt.rsetId; it is released
 * by that set's free callback once tdDestroySmaState removes its reference, so it must not be freed here.
 *
 * @return void* always NULL
 */
static void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
  tdDestroySmaState(pSmaStat, smaType);

  switch (smaType) {
    case TSDB_SMA_TYPE_TIME_RANGE:
      taosMemoryFreeClear(pSmaStat);
      break;
    default:
      // tref used to free rsma stat
      break;
  }

  return NULL;
}

323 324 325 326 327 328
/**
 * @brief Release resources allocated for its member fields, not including itself.
 *
 * @param pSmaStat
 * @return int32_t
 */
329

330
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
331 332
  if (pSmaStat) {
    if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
C
Cary Xu 已提交
333
      tdDestroyTSmaStat(SMA_STAT_TSMA(pSmaStat));
334
    } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
C
Cary Xu 已提交
335
      SRSmaStat *pRSmaStat = &pSmaStat->rsmaStat;
C
Cary Xu 已提交
336 337
      int32_t    vid = SMA_VID(pRSmaStat->pSma);
      int64_t    refId = RSMA_REF_ID(pRSmaStat);
338
      if (taosRemoveRef(smaMgmt.rsetId, refId) < 0) {
C
Cary Xu 已提交
339 340
        smaError("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " failed since %s", vid, refId,
                 smaMgmt.rsetId, terrstr());
C
Cary Xu 已提交
341
      } else {
C
Cary Xu 已提交
342
        smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", vid, refId, smaMgmt.rsetId);
343
      }
344 345 346 347
    } else {
      ASSERT(0);
    }
  }
348
  return 0;
349 350 351 352 353
}

/**
 * @brief Lock pSma->mutex and mark the SSma as locked.
 *
 * @return int32_t 0 on success, -1 on failure with terrno set
 */
int32_t tdLockSma(SSma *pSma) {
  int code = taosThreadMutexLock(&pSma->mutex);
  if (code != 0) {
    // the failure reason is the returned code (as used for terrno below), not errno
    smaError("vgId:%d, failed to lock td since %s", SMA_VID(pSma), strerror(code));
    terrno = TAOS_SYSTEM_ERROR(code);
    return -1;
  }
  pSma->locked = true;
  return 0;
}

/**
 * @brief Clear the locked flag and unlock pSma->mutex. The caller must hold the lock.
 *
 * @return int32_t 0 on success, -1 on failure with terrno set
 */
int32_t tdUnLockSma(SSma *pSma) {
  ASSERT(SMA_LOCKED(pSma));
  pSma->locked = false;
  int code = taosThreadMutexUnlock(&pSma->mutex);
  if (code != 0) {
    // the failure reason is the returned code (as used for terrno below), not errno
    smaError("vgId:%d, failed to unlock td since %s", SMA_VID(pSma), strerror(code));
    terrno = TAOS_SYSTEM_ERROR(code);
    return -1;
  }
  return 0;
}

C
Cary Xu 已提交
374
/**
 * @brief Ensure pSma has an SSmaEnv of the given type, creating it if needed.
 *
 * An already-published env is detected with a lock-free atomic load. Otherwise the slot is re-read under
 * pSma->mutex (another thread may have created it meanwhile) and the env is created while holding the lock.
 *
 * @param pSma
 * @param smaType TSDB_SMA_TYPE_TIME_RANGE or TSDB_SMA_TYPE_ROLLUP
 * @return int32_t TSDB_CODE_SUCCESS or TSDB_CODE_FAILED
 */
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
  SSmaEnv *pEnv = NULL;

  switch (smaType) {
    case TSDB_SMA_TYPE_TIME_RANGE:
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_TSMA_ENV(pSma)))) {
        return TSDB_CODE_SUCCESS;
      }
      break;
    case TSDB_SMA_TYPE_ROLLUP:
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&SMA_RSMA_ENV(pSma)))) {
        return TSDB_CODE_SUCCESS;
      }
      break;
    default:
      // smaType (int8_t) is promoted to int when passed through varargs
      smaError("vgId:%d, undefined smaType:%d", SMA_VID(pSma), smaType);
      return TSDB_CODE_FAILED;
  }

  // init sma env; without the lock, unlocking below would be invalid, so bail out (terrno already set)
  if (tdLockSma(pSma) < 0) {
    return TSDB_CODE_FAILED;
  }
  pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&SMA_TSMA_ENV(pSma))
                                               : atomic_load_ptr(&SMA_RSMA_ENV(pSma));
  if (!pEnv) {
    if (tdInitSmaEnv(pSma, smaType, &pEnv) != TSDB_CODE_SUCCESS) {
      tdUnLockSma(pSma);
      return TSDB_CODE_FAILED;
    }
  }
  tdUnLockSma(pSma);

  return TSDB_CODE_SUCCESS;
}
C
Cary Xu 已提交
407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461

void *tdRSmaExecutorFunc(void *param) {
  setThreadName("vnode-rsma");

  tdRSmaProcessExecImpl((SSma *)param, RSMA_EXEC_OVERFLOW);
  return NULL;
}

/**
 * @brief Start tsNumOfVnodeRsmaThreads joinable rsma executor threads for pSma.
 *
 * Thread handles are stored in the trailing storage of the RSMA env's stat (pStat->data), where
 * tdRsmaStopExecutor later finds and joins them.
 *
 * @return int32_t 0 on success, -1 on failure with terrno set. Threads started before the failure
 * keep running and are expected to be joined by tdRsmaStopExecutor.
 */
static int32_t tdRsmaStartExecutor(const SSma *pSma) {
  TdThreadAttr thAttr = {0};
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  SSmaEnv  *pEnv = SMA_RSMA_ENV(pSma);
  SSmaStat *pStat = SMA_ENV_STAT(pEnv);
  TdThread *pthread = (TdThread *)&pStat->data;

  int32_t ret = 0;
  for (int32_t i = 0; i < tsNumOfVnodeRsmaThreads; ++i) {
    // the create call reports failure through its return value, not errno
    int32_t code = taosThreadCreate(&pthread[i], &thAttr, tdRSmaExecutorFunc, (void *)pSma);
    if (code != 0) {
      // the handle is unspecified after a failed create: zero it so the stop path won't join it
      memset(&pthread[i], 0, sizeof(TdThread));
      terrno = TAOS_SYSTEM_ERROR(code);
      smaError("vgId:%d, failed to create pthread for rsma since %s", SMA_VID(pSma), terrstr());
      ret = -1;
      break;
    }
    smaDebug("vgId:%d, success to create pthread for rsma", SMA_VID(pSma));
  }

  // release the attr on both success and failure paths
  taosThreadAttrDestroy(&thAttr);
  return ret;
}

/**
 * @brief Stop the rsma executor threads of pSma's RSMA env.
 *
 * It does nothing when pSma is NULL, the vnode is not an rsma vnode, or the env/stat is missing.
 * Otherwise it sets SMA_ENV_FLG_CLOSE on the env, posts notEmpty once per executor thread, and joins
 * every valid thread handle.
 *
 * @return int32_t always 0
 */
static int32_t tdRsmaStopExecutor(const SSma *pSma) {
  if (!pSma || !(VND_IS_RSMA(pSma->pVnode))) {
    return 0;
  }

  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
  if (!pEnv) {
    return 0;
  }
  SSmaStat *pStat = SMA_ENV_STAT(pEnv);
  if (!pStat) {
    return 0;
  }

  pEnv->flag |= SMA_ENV_FLG_CLOSE;

  SRSmaStat *pRSmaStat = (SRSmaStat *)pStat;
  TdThread  *pThreads = (TdThread *)&pStat->data;

  // one post per thread, presumably so each executor blocked on notEmpty wakes up and sees the close
  // flag (executor loop not visible here — TODO confirm)
  for (int32_t n = 0; n < tsNumOfVnodeRsmaThreads; ++n) {
    tsem_post(&(pRSmaStat->notEmpty));
  }

  for (int32_t n = 0; n < tsNumOfVnodeRsmaThreads; ++n) {
    if (!taosCheckPthreadValid(pThreads[n])) {
      continue;
    }
    // NOTE(review): PRId64 assumes TdThread is a 64-bit integer; verify on platforms where it is not
    smaDebug("vgId:%d, start to join pthread for rsma:%" PRId64, SMA_VID(pSma), pThreads[n]);
    taosThreadJoin(pThreads[n], NULL);
  }

  smaInfo("vgId:%d, rsma executor stopped, number:%d", SMA_VID(pSma), tsNumOfVnodeRsmaThreads);
  return 0;
}