smaCommit.c 11.2 KB
Newer Older
C
Cary Xu 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "sma.h"

18 19
extern SSmaMgmt smaMgmt;

C
Cary Xu 已提交
20
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma);
H
Hongze Cheng 已提交
21
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo);
C
Cary Xu 已提交
22
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma);
23
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat);
C
Cary Xu 已提交
24

C
Cary Xu 已提交
25
/**
C
Cary Xu 已提交
26
 * @brief async commit, only applicable to Rollup SMA
C
Cary Xu 已提交
27 28 29 30
 *
 * @param pSma
 * @return int32_t
 */
K
kailixu 已提交
31
int32_t smaPrepareAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); }
C
Cary Xu 已提交
32

C
Cary Xu 已提交
33
/**
C
Cary Xu 已提交
34
 * @brief async commit, only applicable to Rollup SMA
C
Cary Xu 已提交
35 36 37 38
 *
 * @param pSma
 * @return int32_t
 */
H
Hongze Cheng 已提交
39
int32_t smaCommit(SSma *pSma, SCommitInfo *pInfo) { return tdProcessRSmaAsyncCommitImpl(pSma, pInfo); }
C
Cary Xu 已提交
40

C
Cary Xu 已提交
41
/**
C
Cary Xu 已提交
42
 * @brief async commit, only applicable to Rollup SMA
C
Cary Xu 已提交
43 44 45 46
 *
 * @param pSma
 * @return int32_t
 */
C
Cary Xu 已提交
47
int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaAsyncPostCommitImpl(pSma); }
C
Cary Xu 已提交
48

C
Cary Xu 已提交
49
/**
 * @brief Begin a new round for rollup SMA: begin the rsma1/rsma2 tsdbs, then switch the rsma trigger stat to active.
 *
 * @param pSma SMA handle
 * @return int32_t 0 on success (also when no rsma env exists), otherwise the tsdbBegin() error code.
 *         terrno is set to the returned code.
 */
int32_t smaBegin(SSma *pSma) {
  SVnode *pVnode = pSma->pVnode;
  int32_t code = tsdbBegin(VND_RSMA1(pVnode));

  if (code < 0) {
    smaError("vgId:%d, failed to begin rsma1 since %s", TD_VID(pVnode), tstrerror(code));
    goto _exit;
  }

  code = tsdbBegin(VND_RSMA2(pVnode));
  if (code < 0) {
    smaError("vgId:%d, failed to begin rsma2 since %s", TD_VID(pVnode), tstrerror(code));
    goto _exit;
  }

  SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
  if (pSmaEnv == NULL) {
    goto _exit;  // no rsma env: nothing to activate
  }

  // Atomically try paused -> active; the previous value tells which transition (if any) actually happened.
  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
  int8_t     prevStat =
      atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE);

  if (prevStat == TASK_TRIGGER_STAT_PAUSED) {
    smaDebug("vgId:%d, rsma trigger stat from paused to active", TD_VID(pVnode));
  } else if (prevStat == TASK_TRIGGER_STAT_INIT) {
    atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
    smaDebug("vgId:%d, rsma trigger stat from init to active", TD_VID(pVnode));
  } else {
    // any other state is forced to active as well, but reported
    atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
    smaWarn("vgId:%d, rsma trigger stat %" PRIi8 " is unexpected", TD_VID(pVnode), prevStat);
  }

_exit:
  terrno = code;
  return code;
}

/**
 * @brief Finish the rollup SMA commit: finish the rsma file-set commit, then the commit of each existing rsma tsdb
 *        (rsma1 first, then rsma2).
 *
 * @param pSma SMA handle
 * @return int32_t 0 on success, otherwise an error code (logged together with the failing line)
 */
int32_t smaFinishCommit(SSma *pSma) {
  SVnode *pVnode = pSma->pVnode;
  int32_t lino = 0;
  int32_t code = tdRSmaFSFinishCommit(pSma);
  TSDB_CHECK_CODE(code, lino, _exit);

  STsdb *pTsdb = VND_RSMA1(pVnode);
  if (pTsdb != NULL && (code = tsdbFinishCommit(pTsdb)) < 0) {
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pTsdb = VND_RSMA2(pVnode);
  if (pTsdb != NULL && (code = tsdbFinishCommit(pTsdb)) < 0) {
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code != 0) {
    smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

// SQTaskFile ======================================================

/**
 * @brief At most time, there is only one qtaskinfo file committed latest in aTaskFile. Sometimes, there would be
 * multiple qtaskinfo files supporting snapshot replication.
 *
 * @param pSma
126
 * @param pStat
C
Cary Xu 已提交
127 128
 * @return int32_t
 */
129
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
K
kailixu 已提交
130
#if 0
131
  SVnode  *pVnode = pSma->pVnode;
132
  SRSmaFS *pFS = RSMA_FS(pStat);
133
  int64_t  committed = pStat->commitAppliedVer;
134
  int64_t  fsMaxVer = -1;
135 136
  char     qTaskInfoFullName[TSDB_FILENAME_LEN];

137 138
  taosWLockLatch(RSMA_FS_LOCK(pStat));

139 140
  for (int32_t i = 0; i < taosArrayGetSize(pFS->aQTaskInf);) {
    SQTaskFile *pTaskF = taosArrayGet(pFS->aQTaskInf, i);
C
Cary Xu 已提交
141 142
    int32_t     oldVal = atomic_fetch_sub_32(&pTaskF->nRef, 1);
    if ((oldVal <= 1) && (pTaskF->version < committed)) {
143 144
      tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
      if (taosRemoveFile(qTaskInfoFullName) < 0) {
C
Cary Xu 已提交
145 146
        smaWarn("vgId:%d, cleanup qinf, committed %" PRIi64 ", failed to remove %s since %s", TD_VID(pVnode), committed,
                qTaskInfoFullName, tstrerror(TAOS_SYSTEM_ERROR(errno)));
147
      } else {
C
Cary Xu 已提交
148 149
        smaDebug("vgId:%d, cleanup qinf, committed %" PRIi64 ", success to remove %s", TD_VID(pVnode), committed,
                 qTaskInfoFullName);
150 151 152 153 154 155
      }
      taosArrayRemove(pFS->aQTaskInf, i);
      continue;
    }
    ++i;
  }
C
Cary Xu 已提交
156

157 158 159 160 161
  if (taosArrayGetSize(pFS->aQTaskInf) > 0) {
    fsMaxVer = ((SQTaskFile *)taosArrayGetLast(pFS->aQTaskInf))->version;
  }

  if (fsMaxVer < committed) {
C
Cary Xu 已提交
162 163 164
    tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), committed, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName);
    if (taosCheckExistFile(qTaskInfoFullName)) {
      SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0};
C
Cary Xu 已提交
165
      if (!taosArrayPush(pFS->aQTaskInf, &qFile)) {
C
Cary Xu 已提交
166 167 168 169
        taosWUnLockLatch(RSMA_FS_LOCK(pStat));
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return TSDB_CODE_FAILED;
      }
170 171 172
    }
  } else {
    smaDebug("vgId:%d, update qinf, no need as committed %" PRIi64 " not larger than fsMaxVer %" PRIi64, TD_VID(pVnode),
C
Cary Xu 已提交
173
             committed, fsMaxVer);
174
  }
C
Cary Xu 已提交
175

176
  taosWUnLockLatch(RSMA_FS_LOCK(pStat));
K
kailixu 已提交
177
#endif
C
Cary Xu 已提交
178 179 180
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
181
/**
C
Cary Xu 已提交
182
 * @brief Rsma async commit implementation(only do some necessary light weighted task)
C
Cary Xu 已提交
183 184
 *  1) set rsma stat TASK_TRIGGER_STAT_PAUSED
 *  2) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write)
C
Cary Xu 已提交
185 186 187 188
 *
 * @param pSma
 * @return int32_t
 */
C
Cary Xu 已提交
189
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
190 191
  int32_t  code = 0;

C
Cary Xu 已提交
192 193
  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
  if (!pEnv) {
194
    return code;
C
Cary Xu 已提交
195 196
  }

C
Cary Xu 已提交
197
  SSmaStat  *pStat = SMA_ENV_STAT(pEnv);
C
Cary Xu 已提交
198
  SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
C
Cary Xu 已提交
199
  int32_t    nLoops = 0;
C
Cary Xu 已提交
200

C
Cary Xu 已提交
201
  // step 1: set rsma stat
C
Cary Xu 已提交
202
  atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
203 204 205 206 207 208 209
  while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) {
    ++nLoops;
    if (nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }
C
Cary Xu 已提交
210 211
  pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
  ASSERT(pRSmaStat->commitAppliedVer > 0);
C
Cary Xu 已提交
212

C
Cary Xu 已提交
213
  // step 2: wait for all triggered fetch tasks to finish
C
Cary Xu 已提交
214
  nLoops = 0;
C
Cary Xu 已提交
215 216
  while (1) {
    if (T_REF_VAL_GET(pStat) == 0) {
C
Cary Xu 已提交
217
      smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma));
C
Cary Xu 已提交
218 219
      break;
    } else {
C
Cary Xu 已提交
220
      smaDebug("vgId:%d, rsma commit, fetch tasks are not all finished yet", SMA_VID(pSma));
C
Cary Xu 已提交
221 222 223 224 225 226 227 228
    }
    ++nLoops;
    if (nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }

C
Cary Xu 已提交
229
  /**
C
Cary Xu 已提交
230
   * @brief step 3: commit should wait for all SubmitReq in buffer be consumed
C
Cary Xu 已提交
231 232 233
   *  1) This is high cost task and should not put in asyncPreCommit originally.
   *  2) But, if put in asyncCommit, would trigger taskInfo cloning frequently.
   */
C
Cary Xu 已提交
234 235
  smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma),
          (void *)taosGetSelfPthreadId());
C
Cary Xu 已提交
236 237 238 239
  nLoops = 0;
  while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
    ++nLoops;
    if (nLoops > 1000) {
C
Cary Xu 已提交
240 241 242 243
      sched_yield();
      nLoops = 0;
    }
  }
C
Cary Xu 已提交
244
  smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
245 246
  if ((code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat))) != 0) {
    return code;
C
Cary Xu 已提交
247
  }
248
  smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
C
Cary Xu 已提交
249

C
Cary Xu 已提交
250
#if 0  // consuming task of qTaskInfo clone 
C
Cary Xu 已提交
251
  // step 4:  swap queue/qall and iQueue/iQall
C
Cary Xu 已提交
252
  // lock
253
  taosWLockLatch(SMA_ENV_LOCK(pEnv));
C
Cary Xu 已提交
254

C
Cary Xu 已提交
255 256
  ASSERT(RSMA_INFO_HASH(pRSmaStat));

C
Cary Xu 已提交
257
  void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
C
Cary Xu 已提交
258

C
Cary Xu 已提交
259 260 261 262 263 264 265
  while (pIter) {
    SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
    TSWAP(pInfo->iQall, pInfo->qall);
    TSWAP(pInfo->iQueue, pInfo->queue);
    TSWAP(pInfo->iTaskInfo[0], pInfo->taskInfo[0]);
    TSWAP(pInfo->iTaskInfo[1], pInfo->taskInfo[1]);
    pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
C
Cary Xu 已提交
266 267
  }

C
Cary Xu 已提交
268
  // unlock
269
  taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
C
Cary Xu 已提交
270
#endif
C
Cary Xu 已提交
271

K
kailixu 已提交
272 273 274 275 276
  // all rsma results are written completely
  STsdb *pTsdb = NULL;
  if ((pTsdb = VND_RSMA1(pSma->pVnode))) tsdbPrepareCommit(pTsdb);
  if ((pTsdb = VND_RSMA2(pSma->pVnode))) tsdbPrepareCommit(pTsdb);

277
  return code;
C
Cary Xu 已提交
278 279 280 281 282 283 284 285
}

/**
 * @brief commit for rollup sma
 *
 * @param pSma
 * @return int32_t
 */
H
Hongze Cheng 已提交
286 287
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) {
  int32_t code = 0;
K
kailixu 已提交
288
  int32_t lino = 0;
H
Hongze Cheng 已提交
289
  SVnode *pVnode = pSma->pVnode;
C
Cary Xu 已提交
290

K
kailixu 已提交
291 292
  code = tdRSmaFSCommit(pSma);
  TSDB_CHECK_CODE(code, lino, _exit);
K
kailixu 已提交
293

K
kailixu 已提交
294 295 296 297 298
  code = tsdbCommit(VND_RSMA1(pVnode), pInfo);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbCommit(VND_RSMA2(pVnode), pInfo);
  TSDB_CHECK_CODE(code, lino, _exit);
C
Cary Xu 已提交
299

C
Cary Xu 已提交
300
_exit:
K
kailixu 已提交
301 302 303
  if (code) {
    smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }
C
Cary Xu 已提交
304
  return code;
C
Cary Xu 已提交
305 306 307
}

/**
 * @brief Rsma async post-commit.
 *
 * If some rsma info was flagged as deleted (delFlag), it consumes the flag. Under the env write lock, it then drops
 * every deleted rsma info whose reference count is 0 from the rsma info hash. After that it updates the qtaskinfo
 * files and releases the rsma commit flag taken in the pre-commit phase.
 *
 * @param pSma SMA handle
 * @return int32_t always TSDB_CODE_SUCCESS
 */
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
  if (pEnv == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);

  // step 1: purge deleted rsma infos, only when a deletion was flagged (the flag is reset atomically here)
  if (atomic_val_compare_exchange_8(&pRSmaStat->delFlag, 1, 0) == 1) {
    taosWLockLatch(SMA_ENV_LOCK(pEnv));

    for (void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); pIter != NULL;
         pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter)) {
      tb_uid_t  *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
      SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
      if (!RSMA_INFO_IS_DEL(pRSmaInfo)) {
        continue;
      }

      int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
      if (refVal != 0) {
        // still referenced: keep it in the hash for now
        smaDebug(
            "vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
            "table:%" PRIi64,
            SMA_VID(pSma), refVal, *pSuid);
        continue;
      }

      taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid));
    }

    taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
  }

  // step 2: qtaskinfo file maintenance (its return value is not checked here)
  tdUpdateQTaskInfoFiles(pSma, pRSmaStat);

  // release the commit flag acquired during pre-commit
  atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);

  return TSDB_CODE_SUCCESS;
}