smaCommit.c 12.6 KB
Newer Older
C
Cary Xu 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "sma.h"

18 19
extern SSmaMgmt smaMgmt;

C
Cary Xu 已提交
20 21 22
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma);
C
Cary Xu 已提交
23
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma);
C
Cary Xu 已提交
24
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma);
C
Cary Xu 已提交
25
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma);
26
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat);
C
Cary Xu 已提交
27 28 29 30 31 32 33

/**
 * @brief Only applicable to Rollup SMA
 *
 * @param pSma
 * @return int32_t
 */
C
Cary Xu 已提交
34
int32_t smaSyncPreCommit(SSma *pSma) { return tdProcessRSmaSyncPreCommitImpl(pSma); }
C
Cary Xu 已提交
35 36 37 38 39 40 41

/**
 * @brief Only applicable to Rollup SMA
 *
 * @param pSma
 * @return int32_t
 */
C
Cary Xu 已提交
42
int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaSyncCommitImpl(pSma); }
C
Cary Xu 已提交
43 44 45 46 47 48 49

/**
 * @brief Only applicable to Rollup SMA
 *
 * @param pSma
 * @return int32_t
 */
C
Cary Xu 已提交
50
int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(pSma); }
C
Cary Xu 已提交
51

C
Cary Xu 已提交
52 53 54 55 56 57
/**
 * @brief Only applicable to Rollup SMA
 *
 * @param pSma
 * @return int32_t
 */
C
Cary Xu 已提交
58 59
int32_t smaAsyncPreCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); }

C
Cary Xu 已提交
60 61 62 63 64 65 66 67
/**
 * @brief Public entry point of the async commit phase. Only applicable to Rollup SMA.
 *
 * Forwards directly to tdProcessRSmaAsyncCommitImpl().
 *
 * @param pSma SMA handle, passed through unchanged
 * @return int32_t result code of tdProcessRSmaAsyncCommitImpl()
 */
int32_t smaAsyncCommit(SSma *pSma) {
  return tdProcessRSmaAsyncCommitImpl(pSma);
}

C
Cary Xu 已提交
68 69 70 71 72 73 74
/**
 * @brief Public entry point of the async post-commit phase. Only applicable to Rollup SMA.
 *
 * Forwards directly to tdProcessRSmaAsyncPostCommitImpl().
 *
 * @param pSma SMA handle, passed through unchanged
 * @return int32_t result code of tdProcessRSmaAsyncPostCommitImpl()
 */
int32_t smaAsyncPostCommit(SSma *pSma) {
  return tdProcessRSmaAsyncPostCommitImpl(pSma);
}
C
Cary Xu 已提交
75

C
Cary Xu 已提交
76 77 78 79 80 81 82 83 84 85 86 87
/**
 * @brief Set the rsma trigger stat to TASK_TRIGGER_STAT_ACTIVE.
 *
 * Does nothing when the vnode has no rsma env. Otherwise the trigger stat always ends up
 * ACTIVE: a PAUSED stat is switched by the compare-exchange itself, any other previous value
 * is overwritten with a plain store. Only PAUSED and INIT are treated as expected previous
 * values; anything else is reported through smaError.
 *
 * @param pSma SMA handle
 * @return int32_t always TSDB_CODE_SUCCESS
 */
int32_t smaBegin(SSma *pSma) {
  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
  if (pEnv == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);

  // The returned value is the stat as it was BEFORE the exchange attempt.
  const int8_t prevStat =
      atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE);

  if (prevStat == TASK_TRIGGER_STAT_PAUSED) {
    // The compare-exchange already performed the paused -> active transition.
    smaDebug("vgId:%d, rsma trigger stat from paused to active", SMA_VID(pSma));
    return TSDB_CODE_SUCCESS;
  }

  // The exchange did not apply: force the stat to active, then report where it came from.
  atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
  if (prevStat == TASK_TRIGGER_STAT_INIT) {
    smaDebug("vgId:%d, rsma trigger stat from init to active", SMA_VID(pSma));
  } else {
    smaError("vgId:%d, rsma trigger stat %" PRIi8 " is unexpected", SMA_VID(pSma), prevStat);
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
111
/**
C
Cary Xu 已提交
112
 * @brief pre-commit for rollup sma(sync commit).
C
Cary Xu 已提交
113
 *  1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
C
Cary Xu 已提交
114
 *  2) wait for all triggered fetch tasks to finish
115
 *  3) perform persist task for qTaskInfo
C
Cary Xu 已提交
116 117 118 119
 *
 * @param pSma
 * @return int32_t
 */
C
Cary Xu 已提交
120
static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) {
C
Cary Xu 已提交
121 122 123 124 125
  SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
  if (!pSmaEnv) {
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
126
  SSmaStat  *pStat = SMA_ENV_STAT(pSmaEnv);
C
Cary Xu 已提交
127
  SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
C
Cary Xu 已提交
128

C
Cary Xu 已提交
129
  // step 1: set rsma stat paused
C
Cary Xu 已提交
130 131
  atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);

C
Cary Xu 已提交
132
  // step 2: wait for all triggered fetch tasks to finish
C
Cary Xu 已提交
133 134 135
  int32_t nLoops = 0;
  while (1) {
    if (T_REF_VAL_GET(pStat) == 0) {
C
Cary Xu 已提交
136
      smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma));
C
Cary Xu 已提交
137 138
      break;
    } else {
C
Cary Xu 已提交
139
      smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma));
C
Cary Xu 已提交
140 141 142 143 144 145 146
    }
    ++nLoops;
    if (nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }
C
Cary Xu 已提交
147

148
  // step 3: perform persist task for qTaskInfo
C
Cary Xu 已提交
149 150
  pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
  tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
151

C
Cary Xu 已提交
152
  smaDebug("vgId:%d, rsma pre commit success", SMA_VID(pSma));
C
Cary Xu 已提交
153

C
Cary Xu 已提交
154 155 156 157 158 159 160 161 162
  return TSDB_CODE_SUCCESS;
}

/**
 * @brief commit for rollup sma
 *
 * @param pSma
 * @return int32_t
 */
C
Cary Xu 已提交
163
static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) {
C
Cary Xu 已提交
164 165 166 167 168 169 170
  SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
  if (!pSmaEnv) {
    return TSDB_CODE_SUCCESS;
  }
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
171 172 173 174 175 176 177
// SQTaskFile ======================================================

/**
 * @brief At most time, there is only one qtaskinfo file committed latest in aTaskFile. Sometimes, there would be
 * multiple qtaskinfo files supporting snapshot replication.
 *
 * @param pSma
178
 * @param pStat
C
Cary Xu 已提交
179 180
 * @return int32_t
 */
181 182
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
  SVnode  *pVnode = pSma->pVnode;
183
  SRSmaFS *pFS = RSMA_FS(pStat);
184
  int64_t  committed = pStat->commitAppliedVer;
185
  int64_t  fsMaxVer = -1;
186 187
  char     qTaskInfoFullName[TSDB_FILENAME_LEN];

188 189
  taosWLockLatch(RSMA_FS_LOCK(pStat));

190 191
  for (int32_t i = 0; i < taosArrayGetSize(pFS->aQTaskInf);) {
    SQTaskFile *pTaskF = taosArrayGet(pFS->aQTaskInf, i);
C
Cary Xu 已提交
192 193
    int32_t     oldVal = atomic_fetch_sub_32(&pTaskF->nRef, 1);
    if ((oldVal <= 1) && (pTaskF->version < committed)) {
194 195
      tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
      if (taosRemoveFile(qTaskInfoFullName) < 0) {
C
Cary Xu 已提交
196 197
        smaWarn("vgId:%d, cleanup qinf, committed %" PRIi64 ", failed to remove %s since %s", TD_VID(pVnode), committed,
                qTaskInfoFullName, tstrerror(TAOS_SYSTEM_ERROR(errno)));
198
      } else {
C
Cary Xu 已提交
199 200
        smaDebug("vgId:%d, cleanup qinf, committed %" PRIi64 ", success to remove %s", TD_VID(pVnode), committed,
                 qTaskInfoFullName);
201 202 203 204 205 206
      }
      taosArrayRemove(pFS->aQTaskInf, i);
      continue;
    }
    ++i;
  }
C
Cary Xu 已提交
207

208 209 210 211 212
  if (taosArrayGetSize(pFS->aQTaskInf) > 0) {
    fsMaxVer = ((SQTaskFile *)taosArrayGetLast(pFS->aQTaskInf))->version;
  }

  if (fsMaxVer < committed) {
C
Cary Xu 已提交
213 214 215 216 217 218 219 220
    tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), committed, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
    if (taosCheckExistFile(qTaskInfoFullName)) {
      SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0};
      if (taosArrayPush(pFS->aQTaskInf, &qFile) < 0) {
        taosWUnLockLatch(RSMA_FS_LOCK(pStat));
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return TSDB_CODE_FAILED;
      }
221 222 223
    }
  } else {
    smaDebug("vgId:%d, update qinf, no need as committed %" PRIi64 " not larger than fsMaxVer %" PRIi64, TD_VID(pVnode),
C
Cary Xu 已提交
224
             committed, fsMaxVer);
225
  }
C
Cary Xu 已提交
226

227
  taosWUnLockLatch(RSMA_FS_LOCK(pStat));
C
Cary Xu 已提交
228 229 230
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243
/**
 * @brief post-commit for rollup sma
 *  1) clean up the outdated qtaskinfo files
 *
 * @param pSma SMA handle
 * @return int32_t TSDB_CODE_SUCCESS when the vnode is not an rsma vnode; otherwise the result
 *                 of tdUpdateQTaskInfoFiles()
 */
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
  SVnode *pVnode = pSma->pVnode;
  if (!VND_IS_RSMA(pVnode)) {
    return TSDB_CODE_SUCCESS;
  }

  SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);

  // Hand the result to the caller instead of dropping it, so a failed update is not
  // reported as a successful post-commit.
  return tdUpdateQTaskInfoFiles(pSma, pRSmaStat);
}
C
Cary Xu 已提交
250 251

/**
C
Cary Xu 已提交
252
 * @brief Rsma async commit implementation(only do some necessary light weighted task)
C
Cary Xu 已提交
253 254
 *  1) set rsma stat TASK_TRIGGER_STAT_PAUSED
 *  2) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write)
C
Cary Xu 已提交
255 256 257 258
 *
 * @param pSma
 * @return int32_t
 */
C
Cary Xu 已提交
259
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
C
Cary Xu 已提交
260 261
  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
  if (!pEnv) {
C
Cary Xu 已提交
262 263 264
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
265
  SSmaStat  *pStat = SMA_ENV_STAT(pEnv);
C
Cary Xu 已提交
266
  SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
C
Cary Xu 已提交
267
  int32_t    nLoops = 0;
C
Cary Xu 已提交
268

C
Cary Xu 已提交
269
  // step 1: set rsma stat
C
Cary Xu 已提交
270
  atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
271 272 273 274 275 276 277
  while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) {
    ++nLoops;
    if (nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }
C
Cary Xu 已提交
278 279
  pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
  ASSERT(pRSmaStat->commitAppliedVer > 0);
C
Cary Xu 已提交
280

C
Cary Xu 已提交
281
  // step 2: wait for all triggered fetch tasks to finish
C
Cary Xu 已提交
282
  nLoops = 0;
C
Cary Xu 已提交
283 284
  while (1) {
    if (T_REF_VAL_GET(pStat) == 0) {
C
Cary Xu 已提交
285
      smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma));
C
Cary Xu 已提交
286 287
      break;
    } else {
C
Cary Xu 已提交
288
      smaDebug("vgId:%d, rsma commit, fetch tasks are not all finished yet", SMA_VID(pSma));
C
Cary Xu 已提交
289 290 291 292 293 294 295 296
    }
    ++nLoops;
    if (nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }

C
Cary Xu 已提交
297
  /**
C
Cary Xu 已提交
298
   * @brief step 3: commit should wait for all SubmitReq in buffer be consumed
C
Cary Xu 已提交
299 300 301
   *  1) This is high cost task and should not put in asyncPreCommit originally.
   *  2) But, if put in asyncCommit, would trigger taskInfo cloning frequently.
   */
C
Cary Xu 已提交
302 303 304 305 306 307 308
  nLoops = 0;
  while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
    ++nLoops;
    if (nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
C
Cary Xu 已提交
309
  }
C
Cary Xu 已提交
310

C
Cary Xu 已提交
311 312
  smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma),
          (void *)taosGetSelfPthreadId());
C
Cary Xu 已提交
313 314 315 316
  nLoops = 0;
  while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
    ++nLoops;
    if (nLoops > 1000) {
C
Cary Xu 已提交
317 318 319 320
      sched_yield();
      nLoops = 0;
    }
  }
C
Cary Xu 已提交
321 322 323 324
  smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
  if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
    return TSDB_CODE_FAILED;
  }
325
  smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
C
Cary Xu 已提交
326

C
Cary Xu 已提交
327
#if 0  // consuming task of qTaskInfo clone 
C
Cary Xu 已提交
328
  // step 4:  swap queue/qall and iQueue/iQall
C
Cary Xu 已提交
329
  // lock
330
  taosWLockLatch(SMA_ENV_LOCK(pEnv));
C
Cary Xu 已提交
331

C
Cary Xu 已提交
332 333
  ASSERT(RSMA_INFO_HASH(pRSmaStat));

C
Cary Xu 已提交
334
  void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
C
Cary Xu 已提交
335

C
Cary Xu 已提交
336 337 338 339 340 341 342
  while (pIter) {
    SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
    TSWAP(pInfo->iQall, pInfo->qall);
    TSWAP(pInfo->iQueue, pInfo->queue);
    TSWAP(pInfo->iTaskInfo[0], pInfo->taskInfo[0]);
    TSWAP(pInfo->iTaskInfo[1], pInfo->taskInfo[1]);
    pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
C
Cary Xu 已提交
343 344
  }

C
Cary Xu 已提交
345
  // unlock
346
  taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
C
Cary Xu 已提交
347
#endif
C
Cary Xu 已提交
348 349 350 351 352 353 354 355 356 357 358 359 360 361 362

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Commit phase for rollup sma (async commit).
 *
 * The persist step of this phase is compiled out (#if 0), so the function only looks up the
 * rsma env and returns TSDB_CODE_SUCCESS whether or not it exists.
 *
 * @param pSma SMA handle
 * @return int32_t always TSDB_CODE_SUCCESS
 */
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
  if (pEnv == NULL) {
    return TSDB_CODE_SUCCESS;
  }

#if 0
  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);

  // perform persist task for qTaskInfo operator
  if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
    return TSDB_CODE_FAILED;
  }
#endif

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Post-commit for rollup sma (async commit).
 *  1) if delFlag is set, clear it and remove every rsma info that is marked deleted and has a
 *     ref value of 0 from the rsma info hash
 *  2) update the tracked qtaskinfo files
 *  3) reset the commit stat to 0
 *
 * The migration of iTaskInfo into taskInfo is compiled out (#if 0).
 *
 * @param pSma SMA handle
 * @return int32_t TSDB_CODE_SUCCESS when there is no rsma env; otherwise the result of
 *                 tdUpdateQTaskInfoFiles()
 */
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
  if (!pEnv) {
    return TSDB_CODE_SUCCESS;
  }

  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);

  // step 1: remove the deleted rsma infos; the compare-exchange consumes delFlag (1 -> 0)
  // NOTE(review): an info that is skipped because its ref value is not 0 is only revisited
  // after delFlag has been set again elsewhere - confirm that this is intended.
  if (1 == atomic_val_compare_exchange_8(&pRSmaStat->delFlag, 1, 0)) {
    // lock
    taosWLockLatch(SMA_ENV_LOCK(pEnv));

    void *pIter = NULL;
    while ((pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter))) {
      tb_uid_t  *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
      SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
      if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
        int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
        if (refVal == 0) {
          taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid));
        } else {
          smaDebug(
              "vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
              "table:%" PRIi64,
              SMA_VID(pSma), refVal, *pSuid);
        }

        continue;
      }
#if 0
    if (pRSmaInfo->taskInfo[0]) {
      if (pRSmaInfo->iTaskInfo[0]) {
        SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pRSmaInfo->iTaskInfo[0];
        tdFreeRSmaInfo(pSma, pRSmaInfo, false);
        pRSmaInfo->iTaskInfo[0] = NULL;
      }
    } else {
      TSWAP(pRSmaInfo->taskInfo[0], pRSmaInfo->iTaskInfo[0]);
    }

    taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
    smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid);
#endif
    }

    // unlock
    taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
  }

  // step 2: update the tracked qtaskinfo files; keep the result for the caller
  int32_t code = tdUpdateQTaskInfoFiles(pSma, pRSmaStat);

  // step 3: release the commit stat even when step 2 failed, because the async pre-commit
  // spins until it can switch the stat from 0 to 1
  atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);

  return code;
}