smaCommit.c 12.5 KB
Newer Older
C
Cary Xu 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "sma.h"

18 19
extern SSmaMgmt smaMgmt;

C
Cary Xu 已提交
20 21 22
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma);
C
Cary Xu 已提交
23
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma);
C
Cary Xu 已提交
24
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma);
C
Cary Xu 已提交
25
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma);
26
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat);
C
Cary Xu 已提交
27 28 29 30 31 32 33

/**
 * @brief Only applicable to Rollup SMA
 *
 * @param pSma
 * @return int32_t
 */
C
Cary Xu 已提交
34
int32_t smaSyncPreCommit(SSma *pSma) { return tdProcessRSmaSyncPreCommitImpl(pSma); }
C
Cary Xu 已提交
35 36 37 38 39 40 41

/**
 * @brief Only applicable to Rollup SMA
 *
 * @param pSma
 * @return int32_t
 */
C
Cary Xu 已提交
42
int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaSyncCommitImpl(pSma); }
C
Cary Xu 已提交
43 44 45 46 47 48 49

/**
 * @brief Only applicable to Rollup SMA
 *
 * @param pSma
 * @return int32_t
 */
C
Cary Xu 已提交
50
int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(pSma); }
C
Cary Xu 已提交
51

C
Cary Xu 已提交
52 53 54 55 56 57
/**
 * @brief Only applicable to Rollup SMA
 *
 * @param pSma
 * @return int32_t
 */
C
Cary Xu 已提交
58 59
int32_t smaAsyncPreCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); }

C
Cary Xu 已提交
60 61 62 63 64 65 66 67
/**
 * @brief Only applicable to Rollup SMA
 *
 * @param pSma
 * @return int32_t
 */
int32_t smaAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncCommitImpl(pSma); }

C
Cary Xu 已提交
68 69 70 71 72 73 74
/**
 * @brief Only applicable to Rollup SMA
 *
 * @param pSma
 * @return int32_t
 */
int32_t smaAsyncPostCommit(SSma *pSma) { return tdProcessRSmaAsyncPostCommitImpl(pSma); }
C
Cary Xu 已提交
75

C
Cary Xu 已提交
76 77 78 79 80 81 82 83 84 85 86 87
/**
 * @brief set rsma trigger stat active
 *
 * @param pSma
 * @return int32_t
 */
int32_t smaBegin(SSma *pSma) {
  SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
  if (!pSmaEnv) {
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
88
  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
C
Cary Xu 已提交
89 90 91 92 93

  int8_t rsmaTriggerStat =
      atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE);
  switch (rsmaTriggerStat) {
    case TASK_TRIGGER_STAT_PAUSED: {
C
Cary Xu 已提交
94
      smaDebug("vgId:%d, rsma trigger stat from paused to active", SMA_VID(pSma));
C
Cary Xu 已提交
95 96 97 98
      break;
    }
    case TASK_TRIGGER_STAT_INIT: {
      atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
C
Cary Xu 已提交
99
      smaDebug("vgId:%d, rsma trigger stat from init to active", SMA_VID(pSma));
C
Cary Xu 已提交
100 101 102 103
      break;
    }
    default: {
      atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
C
Cary Xu 已提交
104
      smaError("vgId:%d, rsma trigger stat %" PRIi8 " is unexpected", SMA_VID(pSma), rsmaTriggerStat);
C
Cary Xu 已提交
105 106 107 108 109 110
      break;
    }
  }
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
111
/**
C
Cary Xu 已提交
112
 * @brief pre-commit for rollup sma(sync commit).
C
Cary Xu 已提交
113
 *  1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
C
Cary Xu 已提交
114
 *  2) wait for all triggered fetch tasks to finish
115
 *  3) perform persist task for qTaskInfo
C
Cary Xu 已提交
116 117 118 119
 *
 * @param pSma
 * @return int32_t
 */
C
Cary Xu 已提交
120
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) {
C
Cary Xu 已提交
121 122 123 124 125
  SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
  if (!pSmaEnv) {
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
126
  SSmaStat  *pStat = SMA_ENV_STAT(pSmaEnv);
C
Cary Xu 已提交
127
  SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
C
Cary Xu 已提交
128

C
Cary Xu 已提交
129
  // step 1: set rsma stat paused
C
Cary Xu 已提交
130 131
  atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);

C
Cary Xu 已提交
132
  // step 2: wait for all triggered fetch tasks to finish
C
Cary Xu 已提交
133 134 135
  int32_t nLoops = 0;
  while (1) {
    if (T_REF_VAL_GET(pStat) == 0) {
C
Cary Xu 已提交
136
      smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma));
C
Cary Xu 已提交
137 138
      break;
    } else {
C
Cary Xu 已提交
139
      smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma));
C
Cary Xu 已提交
140 141 142 143 144 145 146
    }
    ++nLoops;
    if (nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }
C
Cary Xu 已提交
147

148
  // step 3: perform persist task for qTaskInfo
C
Cary Xu 已提交
149 150
  pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
  tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
151

C
Cary Xu 已提交
152
  smaDebug("vgId:%d, rsma pre commit success", SMA_VID(pSma));
C
Cary Xu 已提交
153

C
Cary Xu 已提交
154 155 156 157 158 159 160 161 162
  return TSDB_CODE_SUCCESS;
}

/**
 * @brief commit for rollup sma
 *
 * @param pSma
 * @return int32_t
 */
C
Cary Xu 已提交
163
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) {
C
Cary Xu 已提交
164 165 166 167 168 169 170
  SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
  if (!pSmaEnv) {
    return TSDB_CODE_SUCCESS;
  }
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
171 172 173 174 175 176 177
// SQTaskFile ======================================================

/**
 * @brief At most time, there is only one qtaskinfo file committed latest in aTaskFile. Sometimes, there would be
 * multiple qtaskinfo files supporting snapshot replication.
 *
 * @param pSma
178
 * @param pStat
C
Cary Xu 已提交
179 180
 * @return int32_t
 */
181 182
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
  SVnode  *pVnode = pSma->pVnode;
183
  SRSmaFS *pFS = RSMA_FS(pStat);
184
  int64_t  committed = pStat->commitAppliedVer;
185
  int64_t  fsMaxVer = -1;
186 187
  char     qTaskInfoFullName[TSDB_FILENAME_LEN];

188 189
  taosWLockLatch(RSMA_FS_LOCK(pStat));

190 191
  for (int32_t i = 0; i < taosArrayGetSize(pFS->aQTaskInf);) {
    SQTaskFile *pTaskF = taosArrayGet(pFS->aQTaskInf, i);
C
Cary Xu 已提交
192 193
    int32_t     oldVal = atomic_fetch_sub_32(&pTaskF->nRef, 1);
    if ((oldVal <= 1) && (pTaskF->version < committed)) {
194 195
      tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
      if (taosRemoveFile(qTaskInfoFullName) < 0) {
C
Cary Xu 已提交
196 197
        smaWarn("vgId:%d, cleanup qinf, committed %" PRIi64 ", failed to remove %s since %s", TD_VID(pVnode), committed,
                qTaskInfoFullName, tstrerror(TAOS_SYSTEM_ERROR(errno)));
198
      } else {
C
Cary Xu 已提交
199 200
        smaDebug("vgId:%d, cleanup qinf, committed %" PRIi64 ", success to remove %s", TD_VID(pVnode), committed,
                 qTaskInfoFullName);
201 202 203 204 205 206
      }
      taosArrayRemove(pFS->aQTaskInf, i);
      continue;
    }
    ++i;
  }
C
Cary Xu 已提交
207

208 209 210 211 212
  if (taosArrayGetSize(pFS->aQTaskInf) > 0) {
    fsMaxVer = ((SQTaskFile *)taosArrayGetLast(pFS->aQTaskInf))->version;
  }

  if (fsMaxVer < committed) {
C
Cary Xu 已提交
213 214 215 216 217 218 219 220
    tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), committed, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
    if (taosCheckExistFile(qTaskInfoFullName)) {
      SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0};
      if (taosArrayPush(pFS->aQTaskInf, &qFile) < 0) {
        taosWUnLockLatch(RSMA_FS_LOCK(pStat));
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return TSDB_CODE_FAILED;
      }
221 222 223
    }
  } else {
    smaDebug("vgId:%d, update qinf, no need as committed %" PRIi64 " not larger than fsMaxVer %" PRIi64, TD_VID(pVnode),
C
Cary Xu 已提交
224
             committed, fsMaxVer);
225
  }
C
Cary Xu 已提交
226

227
  taosWUnLockLatch(RSMA_FS_LOCK(pStat));
C
Cary Xu 已提交
228 229 230
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243
/**
 * @brief post-commit for rollup sma
 *  1) clean up the outdated qtaskinfo files
 *
 * @param pSma
 * @return int32_t
 */
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
  SVnode *pVnode = pSma->pVnode;
  if (!VND_IS_RSMA(pVnode)) {
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
244
  SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
C
Cary Xu 已提交
245

246
  tdUpdateQTaskInfoFiles(pSma, pRSmaStat);
C
Cary Xu 已提交
247

C
Cary Xu 已提交
248 249
  return TSDB_CODE_SUCCESS;
}
C
Cary Xu 已提交
250 251

/**
C
Cary Xu 已提交
252
 * @brief Rsma async commit implementation(only do some necessary light weighted task)
C
Cary Xu 已提交
253 254
 *  1) set rsma stat TASK_TRIGGER_STAT_PAUSED
 *  2) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write)
C
Cary Xu 已提交
255 256 257 258
 *
 * @param pSma
 * @return int32_t
 */
C
Cary Xu 已提交
259
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
C
Cary Xu 已提交
260 261
  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
  if (!pEnv) {
C
Cary Xu 已提交
262 263 264
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
265
  SSmaStat  *pStat = SMA_ENV_STAT(pEnv);
C
Cary Xu 已提交
266
  SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
C
Cary Xu 已提交
267
  int32_t    nLoops = 0;
C
Cary Xu 已提交
268

C
Cary Xu 已提交
269
  // step 1: set rsma stat
C
Cary Xu 已提交
270
  atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
271 272 273 274 275 276 277
  while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) {
    ++nLoops;
    if (nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }
C
Cary Xu 已提交
278 279
  pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
  ASSERT(pRSmaStat->commitAppliedVer > 0);
C
Cary Xu 已提交
280

C
Cary Xu 已提交
281
  // step 2: wait for all triggered fetch tasks to finish
C
Cary Xu 已提交
282
  nLoops = 0;
C
Cary Xu 已提交
283 284
  while (1) {
    if (T_REF_VAL_GET(pStat) == 0) {
C
Cary Xu 已提交
285
      smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma));
C
Cary Xu 已提交
286 287
      break;
    } else {
C
Cary Xu 已提交
288
      smaDebug("vgId:%d, rsma commit, fetch tasks are not all finished yet", SMA_VID(pSma));
C
Cary Xu 已提交
289 290 291 292 293 294 295 296
    }
    ++nLoops;
    if (nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }

C
Cary Xu 已提交
297 298 299 300 301
  /**
   * @brief step 3: consume the SubmitReq in buffer
   *  1) This is high cost task and should not put in asyncPreCommit originally.
   *  2) But, if put in asyncCommit, would trigger taskInfo cloning frequently.
   */
C
Cary Xu 已提交
302 303 304
  if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_COMMIT) < 0) {
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
305

C
Cary Xu 已提交
306 307
  smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma),
          (void *)taosGetSelfPthreadId());
C
Cary Xu 已提交
308 309 310 311
  nLoops = 0;
  while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
    ++nLoops;
    if (nLoops > 1000) {
C
Cary Xu 已提交
312 313 314 315
      sched_yield();
      nLoops = 0;
    }
  }
C
Cary Xu 已提交
316 317 318 319
  smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
  if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
    return TSDB_CODE_FAILED;
  }
320
  smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
C
Cary Xu 已提交
321

C
Cary Xu 已提交
322
#if 0  // consuming task of qTaskInfo clone 
C
Cary Xu 已提交
323
  // step 4:  swap queue/qall and iQueue/iQall
C
Cary Xu 已提交
324
  // lock
325
  taosWLockLatch(SMA_ENV_LOCK(pEnv));
C
Cary Xu 已提交
326

C
Cary Xu 已提交
327 328
  ASSERT(RSMA_INFO_HASH(pRSmaStat));

C
Cary Xu 已提交
329
  void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
C
Cary Xu 已提交
330

C
Cary Xu 已提交
331 332 333 334 335 336 337
  while (pIter) {
    SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
    TSWAP(pInfo->iQall, pInfo->qall);
    TSWAP(pInfo->iQueue, pInfo->queue);
    TSWAP(pInfo->iTaskInfo[0], pInfo->taskInfo[0]);
    TSWAP(pInfo->iTaskInfo[1], pInfo->taskInfo[1]);
    pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
C
Cary Xu 已提交
338 339
  }

C
Cary Xu 已提交
340
  // unlock
341
  taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
C
Cary Xu 已提交
342
#endif
C
Cary Xu 已提交
343 344 345 346 347 348 349 350 351 352 353 354 355 356 357

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief commit for rollup sma
 *
 * @param pSma
 * @return int32_t
 */
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
  SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
  if (!pSmaEnv) {
    return TSDB_CODE_SUCCESS;
  }
C
Cary Xu 已提交
358
#if 0
C
Cary Xu 已提交
359
  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
C
Cary Xu 已提交
360

C
Cary Xu 已提交
361
  // perform persist task for qTaskInfo operator
C
Cary Xu 已提交
362 363 364
  if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
365
#endif
C
Cary Xu 已提交
366

C
Cary Xu 已提交
367 368 369 370
  return TSDB_CODE_SUCCESS;
}

/**
C
Cary Xu 已提交
371
 * @brief Migrate rsmaInfo from iRsmaInfo to rsmaInfo if rsma infoHash not empty.
C
Cary Xu 已提交
372 373 374
 *
 * @param pSma
 * @return int32_t
C
Cary Xu 已提交
375 376
 */
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
C
Cary Xu 已提交
377 378
  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
  if (!pEnv) {
C
Cary Xu 已提交
379 380 381
    return TSDB_CODE_SUCCESS;
  }

382
  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
C
Cary Xu 已提交
383

C
Cary Xu 已提交
384
  // step 1: merge qTaskInfo and iQTaskInfo
C
Cary Xu 已提交
385
  // lock
386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404
  if (1 == atomic_val_compare_exchange_8(&pRSmaStat->delFlag, 1, 0)) {
    taosWLockLatch(SMA_ENV_LOCK(pEnv));

    void *pIter = NULL;
    while ((pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter))) {
      tb_uid_t  *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
      SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
      if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
        int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
        if (refVal == 0) {
          taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid));
        } else {
          smaDebug(
              "vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
              "table:%" PRIi64,
              SMA_VID(pSma), refVal, *pSuid);
        }

        continue;
C
Cary Xu 已提交
405
      }
C
Cary Xu 已提交
406
#if 0
C
Cary Xu 已提交
407 408 409
    if (pRSmaInfo->taskInfo[0]) {
      if (pRSmaInfo->iTaskInfo[0]) {
        SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pRSmaInfo->iTaskInfo[0];
C
Cary Xu 已提交
410
        tdFreeRSmaInfo(pSma, pRSmaInfo, false);
C
Cary Xu 已提交
411
        pRSmaInfo->iTaskInfo[0] = NULL;
C
Cary Xu 已提交
412 413
      }
    } else {
C
Cary Xu 已提交
414
      TSWAP(pRSmaInfo->taskInfo[0], pRSmaInfo->iTaskInfo[0]);
C
Cary Xu 已提交
415
    }
C
Cary Xu 已提交
416

C
Cary Xu 已提交
417 418
    taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
    smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid);
C
Cary Xu 已提交
419
#endif
420
    }
C
Cary Xu 已提交
421

422 423 424
    // unlock
    taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
  }
C
Cary Xu 已提交
425

426
  tdUpdateQTaskInfoFiles(pSma, pRSmaStat);
C
Cary Xu 已提交
427

C
Cary Xu 已提交
428 429
  atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);

C
Cary Xu 已提交
430 431
  return TSDB_CODE_SUCCESS;
}