smaCommit.c 9.8 KB
Newer Older
C
Cary Xu 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "sma.h"

18 19
extern SSmaMgmt smaMgmt;

K
kailixu 已提交
20
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit);
H
Hongze Cheng 已提交
21
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo);
C
Cary Xu 已提交
22
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma);
23
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat);
C
Cary Xu 已提交
24

K
kailixu 已提交
25 26 27 28 29 30 31 32
/**
 * @brief Quiesce rollup SMA (RSMA) before the vnode is closed.
 *
 * Runs the RSMA pre-commit routine in non-commit mode: the rsma trigger is paused and the call waits
 * until running fetch tasks and buffered items have drained. Unlike the commit path, it does not take
 * the rsma commit flag and does not persist any state.
 *
 * @param pSma  SMA handle
 * @return 0 on success (or when no RSMA env exists), otherwise an error code
 */
int32_t smaPreClose(SSma *pSma) {
  const bool isCommit = false;
  return tdProcessRSmaAsyncPreCommitImpl(pSma, isCommit);
}

/**
C
Cary Xu 已提交
34
 * @brief async commit, only applicable to Rollup SMA
C
Cary Xu 已提交
35 36 37 38
 *
 * @param pSma
 * @return int32_t
 */
K
kailixu 已提交
39
int32_t smaPrepareAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma, true); }
C
Cary Xu 已提交
40

C
Cary Xu 已提交
41
/**
C
Cary Xu 已提交
42
 * @brief async commit, only applicable to Rollup SMA
C
Cary Xu 已提交
43 44 45 46
 *
 * @param pSma
 * @return int32_t
 */
H
Hongze Cheng 已提交
47
int32_t smaCommit(SSma *pSma, SCommitInfo *pInfo) { return tdProcessRSmaAsyncCommitImpl(pSma, pInfo); }
C
Cary Xu 已提交
48

C
Cary Xu 已提交
49
/**
C
Cary Xu 已提交
50
 * @brief async commit, only applicable to Rollup SMA
C
Cary Xu 已提交
51 52 53 54
 *
 * @param pSma
 * @return int32_t
 */
C
Cary Xu 已提交
55
int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaAsyncPostCommitImpl(pSma); }
C
Cary Xu 已提交
56

C
Cary Xu 已提交
57
/**
 * @brief Begin a new write round on rsma1/rsma2 and switch the rsma trigger stat to active.
 *
 *  1) Call tsdbBegin() on the rsma1 and then the rsma2 tsdb instance. A negative code is logged and
 *     returned immediately.
 *  2) If an RSMA env exists, move the rsma trigger stat to TASK_TRIGGER_STAT_ACTIVE:
 *     - PAUSED -> ACTIVE happens through the compare-and-swap itself;
 *     - INIT -> ACTIVE is done with an explicit store;
 *     - any other previous value is forced to ACTIVE and a warning is logged.
 *
 * @param pSma  SMA handle
 * @return the code of the last tsdbBegin() call made (0 on success); terrno is set to the same value
 *         on every path, including success
 */
int32_t smaBegin(SSma *pSma) {
  SVnode *pVnode = pSma->pVnode;
  int32_t code = tsdbBegin(VND_RSMA1(pVnode));

  if (code < 0) {
    smaError("vgId:%d, failed to begin rsma1 since %s", TD_VID(pVnode), tstrerror(code));
    goto _exit;
  }

  code = tsdbBegin(VND_RSMA2(pVnode));
  if (code < 0) {
    smaError("vgId:%d, failed to begin rsma2 since %s", TD_VID(pVnode), tstrerror(code));
    goto _exit;
  }

  // switch the rsma trigger stat to active; there is nothing to switch without an RSMA env
  SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
  if (pSmaEnv != NULL) {
    SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
    int8_t     prevStat =
        atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE);

    if (prevStat == TASK_TRIGGER_STAT_PAUSED) {
      // the compare-and-swap above has already stored ACTIVE
      smaDebug("vgId:%d, rsma trigger stat from paused to active", TD_VID(pVnode));
    } else if (prevStat == TASK_TRIGGER_STAT_INIT) {
      atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
      smaDebug("vgId:%d, rsma trigger stat from init to active", TD_VID(pVnode));
    } else {
      atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
      smaWarn("vgId:%d, rsma trigger stat %" PRIi8 " is unexpected", TD_VID(pVnode), prevStat);
    }
  }

_exit:
  terrno = code;
  return code;
}

/**
 * @brief Finish a rollup SMA (RSMA) commit.
 *
 * Calls tdRSmaFSFinishCommit() first. It then calls tsdbFinishCommit() on each rsma tsdb instance
 * (rsma1, rsma2) that is non-NULL. The sequence stops at the first failure: a non-zero code from
 * tdRSmaFSFinishCommit() or a negative code from tsdbFinishCommit(). That failure is logged with the
 * line number where it was detected.
 *
 * @param pSma  SMA handle
 * @return 0 on success, otherwise an error code
 */
int32_t smaFinishCommit(SSma *pSma) {
  SVnode *pVnode = pSma->pVnode;
  STsdb  *pTsdb = NULL;
  int32_t lino = 0;
  int32_t code = tdRSmaFSFinishCommit(pSma);
  TSDB_CHECK_CODE(code, lino, _exit);

  // rsma1, if it exists
  if ((pTsdb = VND_RSMA1(pVnode)) != NULL) {
    code = tsdbFinishCommit(pTsdb);
    if (code < 0) {
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // rsma2, if it exists
  if ((pTsdb = VND_RSMA2(pVnode)) != NULL) {
    code = tsdbFinishCommit(pTsdb);
    if (code < 0) {
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
    smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}

/**
C
Cary Xu 已提交
128
 * @brief Rsma async commit implementation(only do some necessary light weighted task)
C
Cary Xu 已提交
129 130
 *  1) set rsma stat TASK_TRIGGER_STAT_PAUSED
 *  2) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write)
C
Cary Xu 已提交
131 132
 *
 * @param pSma
K
kailixu 已提交
133
 * @param isCommit
C
Cary Xu 已提交
134 135
 * @return int32_t
 */
K
kailixu 已提交
136
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
K
kailixu 已提交
137 138
  int32_t code = 0;
  int32_t lino = 0;
139

C
Cary Xu 已提交
140 141
  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
  if (!pEnv) {
142
    return code;
C
Cary Xu 已提交
143 144
  }

C
Cary Xu 已提交
145
  SSmaStat  *pStat = SMA_ENV_STAT(pEnv);
C
Cary Xu 已提交
146
  SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
C
Cary Xu 已提交
147
  int32_t    nLoops = 0;
C
Cary Xu 已提交
148

C
Cary Xu 已提交
149
  // step 1: set rsma stat
C
Cary Xu 已提交
150
  atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
K
kailixu 已提交
151 152 153 154 155 156 157
  if (isCommit) {
    while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) {
      ++nLoops;
      if (nLoops > 1000) {
        sched_yield();
        nLoops = 0;
      }
158
    }
K
kailixu 已提交
159

K
kailixu 已提交
160 161 162 163 164 165
    pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
    if (ASSERTS(pRSmaStat->commitAppliedVer >= -1, "commit applied version %" PRIi64 " < -1",
                pRSmaStat->commitAppliedVer)) {
      code = TSDB_CODE_APP_ERROR;
      TSDB_CHECK_CODE(code, lino, _exit);
    }
K
kailixu 已提交
166
  }
C
Cary Xu 已提交
167
  // step 2: wait for all triggered fetch tasks to finish
C
Cary Xu 已提交
168
  nLoops = 0;
C
Cary Xu 已提交
169
  while (1) {
K
kailixu 已提交
170
    if (atomic_load_32(&pRSmaStat->nFetchAll) <= 0) {
K
kailixu 已提交
171
      smaDebug("vgId:%d, rsma commit:%d, fetch tasks are all finished", SMA_VID(pSma), isCommit);
C
Cary Xu 已提交
172 173
      break;
    } else {
K
kailixu 已提交
174
      smaDebug("vgId:%d, rsma commit%d, fetch tasks are not all finished yet", SMA_VID(pSma), isCommit);
C
Cary Xu 已提交
175 176 177 178 179 180 181 182
    }
    ++nLoops;
    if (nLoops > 1000) {
      sched_yield();
      nLoops = 0;
    }
  }

C
Cary Xu 已提交
183
  /**
C
Cary Xu 已提交
184
   * @brief step 3: commit should wait for all SubmitReq in buffer be consumed
C
Cary Xu 已提交
185 186 187
   *  1) This is high cost task and should not put in asyncPreCommit originally.
   *  2) But, if put in asyncCommit, would trigger taskInfo cloning frequently.
   */
K
kailixu 已提交
188
  smaInfo("vgId:%d, rsma commit:%d, wait for all items to be consumed, TID:%p", SMA_VID(pSma), isCommit,
C
Cary Xu 已提交
189
          (void *)taosGetSelfPthreadId());
C
Cary Xu 已提交
190 191 192 193
  nLoops = 0;
  while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
    ++nLoops;
    if (nLoops > 1000) {
C
Cary Xu 已提交
194 195 196 197
      sched_yield();
      nLoops = 0;
    }
  }
K
kailixu 已提交
198 199 200

  if (!isCommit) goto _exit;

C
Cary Xu 已提交
201
  smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
K
kailixu 已提交
202 203
  code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
  TSDB_CHECK_CODE(code, lino, _exit);
K
kailixu 已提交
204

205
  smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
C
Cary Xu 已提交
206

C
Cary Xu 已提交
207
#if 0  // consuming task of qTaskInfo clone 
C
Cary Xu 已提交
208
  // step 4:  swap queue/qall and iQueue/iQall
C
Cary Xu 已提交
209
  // lock
210
  taosWLockLatch(SMA_ENV_LOCK(pEnv));
C
Cary Xu 已提交
211

C
Cary Xu 已提交
212
  void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
C
Cary Xu 已提交
213

C
Cary Xu 已提交
214 215 216 217 218 219 220
  while (pIter) {
    SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
    TSWAP(pInfo->iQall, pInfo->qall);
    TSWAP(pInfo->iQueue, pInfo->queue);
    TSWAP(pInfo->iTaskInfo[0], pInfo->taskInfo[0]);
    TSWAP(pInfo->iTaskInfo[1], pInfo->taskInfo[1]);
    pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
C
Cary Xu 已提交
221 222
  }

C
Cary Xu 已提交
223
  // unlock
224
  taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
C
Cary Xu 已提交
225
#endif
C
Cary Xu 已提交
226

K
kailixu 已提交
227 228
  // all rsma results are written completely
  STsdb *pTsdb = NULL;
K
kailixu 已提交
229 230 231 232 233 234 235 236
  if ((pTsdb = VND_RSMA1(pSma->pVnode))) {
    code = tsdbPrepareCommit(pTsdb);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  if ((pTsdb = VND_RSMA2(pSma->pVnode))) {
    code = tsdbPrepareCommit(pTsdb);
    TSDB_CHECK_CODE(code, lino, _exit);
  }
K
kailixu 已提交
237

K
kailixu 已提交
238 239
_exit:
  if (code) {
K
kailixu 已提交
240
    smaError("vgId:%d, %s failed at line %d since %s(%d)", SMA_VID(pSma), __func__, lino, tstrerror(code), isCommit);
K
kailixu 已提交
241
  }
242
  return code;
C
Cary Xu 已提交
243 244 245 246 247 248 249 250
}

/**
 * @brief commit for rollup sma
 *
 * @param pSma
 * @return int32_t
 */
H
Hongze Cheng 已提交
251 252
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) {
  int32_t code = 0;
K
kailixu 已提交
253
  int32_t lino = 0;
H
Hongze Cheng 已提交
254
  SVnode *pVnode = pSma->pVnode;
C
Cary Xu 已提交
255

K
kailixu 已提交
256 257 258 259
  SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
  if (!pSmaEnv) {
    goto _exit;
  }
K
kailixu 已提交
260

K
kailixu 已提交
261 262
  code = tdRSmaFSCommit(pSma);
  TSDB_CHECK_CODE(code, lino, _exit);
C
Cary Xu 已提交
263

K
kailixu 已提交
264 265 266 267 268
  code = tsdbCommit(VND_RSMA1(pVnode), pInfo);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbCommit(VND_RSMA2(pVnode), pInfo);
  TSDB_CHECK_CODE(code, lino, _exit);
C
Cary Xu 已提交
269

C
Cary Xu 已提交
270
_exit:
K
kailixu 已提交
271 272 273
  if (code) {
    smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  }
C
Cary Xu 已提交
274
  return code;
C
Cary Xu 已提交
275 276 277
}

/**
 * @brief Post-commit phase for rollup SMA (RSMA).
 *
 *  1) If the rsma delete flag is set (it is atomically cleared here), walk the rsma info hash under the
 *     env write latch. Remove every entry that is marked deleted and has a zero reference count.
 *     Deleted entries that are still referenced stay in the hash and are only logged.
 *  2) Release the rsma commit flag (RSMA_COMMIT_STAT = 0) that the commit-mode pre-commit acquired.
 *
 * @param pSma  SMA handle
 * @return TSDB_CODE_SUCCESS (always)
 */
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
  if (pEnv == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);

  // step 1: purge deleted rsma infos, only when a deletion has been flagged since the last purge
  if (atomic_val_compare_exchange_8(&pRSmaStat->delFlag, 1, 0) == 1) {
    taosWLockLatch(SMA_ENV_LOCK(pEnv));

    for (void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); pIter != NULL;
         pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter)) {
      tb_uid_t  *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
      SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
      if (!RSMA_INFO_IS_DEL(pRSmaInfo)) {
        continue;
      }

      int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
      if (refVal != 0) {
        // still referenced: keep the entry
        smaDebug(
            "vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
            "table:%" PRIi64,
            SMA_VID(pSma), refVal, *pSuid);
        continue;
      }

      taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid));
    }

    taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
  }

  // step 2: release the commit flag so the next commit-mode pre-commit can acquire it
  atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);

  return TSDB_CODE_SUCCESS;
}