smaRollup.c 64.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "sma.h"

C
Cary Xu 已提交
18 19 20 21
// Tunables for qTaskInfo buffering, batch execution and the fetch timer.
#define RSMA_QTASKINFO_BUFSIZE     (32768)                                               // size
#define RSMA_QTASKINFO_HEAD_LEN    (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t))  // len + type + suid
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100)                                                 // cnt
#define RSMA_SUBMIT_BATCH_SIZE     (1024)                                                // cnt
#define RSMA_FETCH_DELAY_MAX       (120000)                                              // ms
#define RSMA_FETCH_ACTIVE_MAX      (1000)                                                // ms
#define RSMA_FETCH_INTERVAL        (5000)                                                // ms
C
Cary Xu 已提交
25

26
SSmaMgmt smaMgmt = {
C
Cary Xu 已提交
27 28
    .inited = 0,
    .rsetId = -1,
29 30
};

C
Cary Xu 已提交
31 32
// File-name prefix handed to tdGetVndFileName() by the tdRSmaQTaskInfoGet*Name helpers below.
#define TD_QTASKINFO_FNAME_PREFIX "qinf.v"

33
// Forward type names; both structs are defined further down in this file.
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
35

36
static int32_t    tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
37
static int32_t    tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd);
38
static int32_t    tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
L
Liu Jicong 已提交
39
                                          int8_t idx);
C
Cary Xu 已提交
40
static int32_t    tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
C
Cary Xu 已提交
41
                                    ERsmaExecType type, int8_t level);
C
Cary Xu 已提交
42 43
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
static void       tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
C
Cary Xu 已提交
44
static void       tdFreeRSmaSubmitItems(SArray *pItems);
45
static int32_t    tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
C
Cary Xu 已提交
46 47
static int32_t    tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
                                            int64_t suid);
C
Cary Xu 已提交
48
static void       tdRSmaFetchTrigger(void *param, void *tmrId);
49 50
static int32_t    tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo);
static void       tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
C
Cary Xu 已提交
51 52 53 54 55 56 57
static int32_t    tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
static int32_t    tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish);
static int32_t    tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter);
static int32_t    tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
static int32_t    tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
static int32_t    tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer);
static int32_t    tdRSmaRestoreTSDataReload(SSma *pSma);
C
Cary Xu 已提交
58

59 60
/**
 * @brief Map an rsma info item back to the SRSmaInfo it belongs to.
 *
 * The owner address is computed by stepping back from the item by (level - 1) items plus
 * RSMA_INFO_HEAD_LEN, so this must be adapted whenever the definition of SRSmaInfo changes.
 *
 * @param pItem item whose level is TSDB_RETENTION_L1 or TSDB_RETENTION_L2
 * @return SRSmaInfo* the computed owner
 */
static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) {
  ASSERT(pItem->level == TSDB_RETENTION_L1 || pItem->level == TSDB_RETENTION_L2);

  SRSmaInfo *pOwner =
      (SRSmaInfo *)POINTER_SHIFT(pItem, -(sizeof(SRSmaInfoItem) * (pItem->level - 1) + RSMA_INFO_HEAD_LEN));

  ASSERT(pOwner->pTSchema->numOfCols > 1);
  return pOwner;
}
67

68 69 70 71 72 73 74
// One qTaskInfo record. The first three fields match the "len + type + suid" head
// described next to RSMA_QTASKINFO_HEAD_LEN.
struct SRSmaQTaskInfoItem {
  int32_t len;  // NOTE(review): presumably the record length including the head — confirm against the writer
  int8_t  type;
  int64_t suid;
  void   *qTaskInfo;  // NOTE(review): presumably the serialized task payload following the head — confirm
};

C
Cary Xu 已提交
75
struct SRSmaQTaskInfoIter {
76 77 78 79 80
  STFile *pTFile;
  int64_t offset;
  int64_t fsize;
  int32_t nBytes;
  int32_t nAlloc;
C
Cary Xu 已提交
81
  char   *pBuf;
82
  // ------------
C
Cary Xu 已提交
83
  char   *qBuf;  // for iterator
84 85 86
  int32_t nBufPos;
};

C
Cary Xu 已提交
87
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName) {
C
Cary Xu 已提交
88
  tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
89 90
}

91
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, char *outputName) {
C
Cary Xu 已提交
92 93 94
  tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
}

95 96 97 98 99 100 101 102 103 104 105 106
/**
 * @brief Write the rsma directory of a vnode into outputName and append the level number to it.
 *
 * @param vgId
 * @param level appended in decimal right after the directory name
 * @param path base path forwarded to tdGetVndDirName()
 * @param outputName output buffer; writes are bounded by TSDB_FILENAME_LEN
 */
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) {
  tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);

  int32_t dirLen = strlen(outputName);
  char   *pTail = outputName + dirLen;
  snprintf(pTail, TSDB_FILENAME_LEN - dirLen, "%" PRIi8, level);
}

/**
 * @brief Write the rsma directory of a vnode into outputName and append "<suid><TD_DIRSEP><level>".
 *
 * @param vgId
 * @param suid super table uid used as the first appended component
 * @param level appended after the directory separator
 * @param path base path forwarded to tdGetVndDirName()
 * @param outputName output buffer; writes are bounded by TSDB_FILENAME_LEN
 */
void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) {
  tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);

  int32_t dirLen = strlen(outputName);
  char   *pTail = outputName + dirLen;
  snprintf(pTail, TSDB_FILENAME_LEN - dirLen, "%" PRIi64 "%s%" PRIi8, suid, TD_DIRSEP, level);
}

C
Cary Xu 已提交
107
/**
 * @brief Content length of a qTaskInfo record, i.e. its length minus RSMA_QTASKINFO_HEAD_LEN.
 *
 * @param lenWithHead record length including the head
 * @return int32_t length without the head
 */
static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
  int32_t contLen = lenWithHead - RSMA_QTASKINFO_HEAD_LEN;
  return contLen;
}

C
Cary Xu 已提交
111
/**
 * @brief Release the buffer owned by the iterator (pBuf); the iterator itself is not freed.
 *
 * @param pIter
 */
static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) {
  taosMemoryFreeClear(pIter->pBuf);
}
C
Cary Xu 已提交
112

113
/**
 * @brief Detach the task stored in *taskHandle and destroy it.
 *
 * The handle is loaded atomically and swapped to NULL with a compare-exchange; the task is only
 * destroyed when that exchange yields a non-NULL value.
 *
 * @param taskHandle address of the task handle; NULL or an empty handle is a no-op
 * @param vgId used for logging only
 * @param level used for logging only
 */
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
  // Note: free/kill may in RC
  if (taskHandle == NULL || *taskHandle == NULL) {
    return;
  }

  qTaskInfo_t pTask = atomic_load_ptr(taskHandle);
  if (pTask == NULL || !atomic_val_compare_exchange_ptr(taskHandle, pTask, NULL)) {
    smaDebug("vgId:%d, not free qTaskInfo_t %p of level %d", vgId, pTask, level);
    return;
  }

  smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, pTask, level);
  qDestroyTask(pTask);
  // TODO: clear files related to qTaskInfo?
}

C
Cary Xu 已提交
126 127 128 129 130 131 132 133 134
/**
 * @brief General routine to release an rsmaInfo.
 *
 * Timers, stream states, taskInfo handles, the schema and the queues are only released for a deep
 * free; iTaskInfo handles and the pInfo allocation itself are always released.
 *
 * @param pSma
 * @param pInfo may be NULL, in which case nothing happens
 * @param isDeepFree see above
 * @return void* always NULL
 */
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
  if (pInfo == NULL) {
    return NULL;
  }

  for (int32_t lvl = 0; lvl < TSDB_RETENTION_L2; ++lvl) {
    SRSmaInfoItem *pItem = &pInfo->items[lvl];

    if (isDeepFree) {
      if (pItem->tmrId) {
        smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId,
                 pInfo->suid, lvl + 1);
        taosTmrStopA(&pItem->tmrId);
      }

      if (pItem->pStreamState) {
        streamStateClose(pItem->pStreamState);
      }
    }

    if (isDeepFree && pInfo->taskInfo[lvl]) {
      tdRSmaQTaskInfoFree(&pInfo->taskInfo[lvl], SMA_VID(pSma), lvl + 1);
    } else {
      smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma),
               pInfo->suid, lvl + 1);
    }

    if (pInfo->iTaskInfo[lvl]) {
      tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[lvl], SMA_VID(pSma), lvl + 1);
    } else {
      smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo",
               SMA_VID(pSma), pInfo->suid, lvl + 1);
    }
  }

  if (isDeepFree) {
    taosMemoryFreeClear(pInfo->pTSchema);

    if (pInfo->queue) taosCloseQueue(pInfo->queue);
    if (pInfo->qall) taosFreeQall(pInfo->qall);
    if (pInfo->iQueue) taosCloseQueue(pInfo->iQueue);
    if (pInfo->iQall) taosFreeQall(pInfo->iQall);
    pInfo->queue = NULL;
    pInfo->qall = NULL;
    pInfo->iQueue = NULL;
    pInfo->iQall = NULL;
  }

  taosMemoryFree(pInfo);
  return NULL;
}

/**
 * @brief Allocate a zero-initialised STbUidStore into *pStore.
 *
 * @param pStore must point to a NULL store pointer on entry
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED with terrno = TSDB_CODE_OUT_OF_MEMORY
 */
static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
  ASSERT(*pStore == NULL);

  STbUidStore *pNew = taosMemoryCalloc(1, sizeof(STbUidStore));
  *pStore = pNew;
  if (pNew == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }

  return TSDB_CODE_SUCCESS;
}

194
/**
 * @brief Add or remove a list of child table uids on every qTaskInfo of one rollup super table.
 *
 * @param pSma
 * @param suid pointer to the super table uid; must not be NULL
 * @param tbUids child table uids; must not be NULL, an empty array is a successful no-op
 * @param isAdd forwarded to qUpdateQualifiedTableId()
 * @return int32_t TSDB_CODE_SUCCESS or TSDB_CODE_FAILED (terrno set)
 */
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd) {
  SRSmaInfo *pRSmaInfo = NULL;

  if (!suid || !tbUids) {
    terrno = TSDB_CODE_INVALID_PTR;
    // suid itself may be the NULL argument on this path, so it must not be dereferenced blindly
    smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64 " since %s", SMA_VID(pSma),
             suid ? *suid : (tb_uid_t)0, terrstr());
    return TSDB_CODE_FAILED;
  }

  if (!taosArrayGetSize(tbUids)) {
    smaDebug("vgId:%d, no need to update tbUidList for suid:%" PRIi64 " since Empty tbUids", SMA_VID(pSma), *suid);
    return TSDB_CODE_SUCCESS;
  }

  pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, *suid);

  if (!pRSmaInfo) {
    smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid);
    terrno = TSDB_CODE_RSMA_INVALID_STAT;
    return TSDB_CODE_FAILED;
  }

  for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
    if (pRSmaInfo->taskInfo[i]) {
      if (((terrno = qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0)) {
        tdReleaseRSmaInfo(pSma, pRSmaInfo);
        smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i,
                 terrstr());
        return TSDB_CODE_FAILED;
      } else {
        // log the task that was actually updated (level i), not always the first one
        smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 " uid:%" PRIi64 " level %d",
                 SMA_VID(pSma), pRSmaInfo->taskInfo[i], *suid, *(int64_t *)taosArrayGet(tbUids, 0), i);
      }
    }
  }

  tdReleaseRSmaInfo(pSma, pRSmaInfo);
  return TSDB_CODE_SUCCESS;
}

234
/**
 * @brief Apply every suid -> uids mapping held by the store to the matching rsma tasks.
 *
 * The primary suid/tbUids pair is handled first, then each entry of uidHash.
 *
 * @param pSma
 * @param pStore NULL or a store with an empty tbUids array is a successful no-op
 * @param isAdd forwarded to tdUpdateTbUidListImpl()
 * @return int32_t TSDB_CODE_SUCCESS or TSDB_CODE_FAILED
 */
int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) {
  if (pStore == NULL || taosArrayGetSize(pStore->tbUids) == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (tdUpdateTbUidListImpl(pSma, &pStore->suid, pStore->tbUids, isAdd) != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_FAILED;
  }

  for (void *pEntry = taosHashIterate(pStore->uidHash, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pStore->uidHash, pEntry)) {
    tb_uid_t *pKeySuid = (tb_uid_t *)taosHashGetKey(pEntry, NULL);
    SArray   *pValUids = *(SArray **)pEntry;

    if (tdUpdateTbUidListImpl(pSma, pKeySuid, pValUids, isAdd) != TSDB_CODE_SUCCESS) {
      taosHashCancelIterate(pStore->uidHash, pEntry);
      return TSDB_CODE_FAILED;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Record suid/uid in the store when a child table of a rollup SMA super table is created.
 *
 * Returns success without touching the store when there is no rsma env or when suid is not
 * registered in the rsma info hash.
 *
 * @param pSma
 * @param ppStore store to fill; allocated on demand and freed again if the put fails
 * @param suid
 * @param uid
 * @return int32_t
 */
int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_uid_t uid) {
  // only applicable to rollup SMA ctables
  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
  if (pEnv == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
  SHashObj  *infoHash = pStat ? RSMA_INFO_HASH(pStat) : NULL;
  if (infoHash == NULL) {
    terrno = TSDB_CODE_RSMA_INVALID_STAT;
    return TSDB_CODE_FAILED;
  }

  // info cached when create rsma stable and return directly for non-rsma ctables
  if (taosHashGet(infoHash, &suid, sizeof(tb_uid_t)) == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  ASSERT(ppStore != NULL);

  if (*ppStore == NULL && tdUidStoreInit(ppStore) < 0) {
    return TSDB_CODE_FAILED;
  }

  if (tdUidStorePut(*ppStore, suid, &uid) < 0) {
    *ppStore = tdUidStoreFree(*ppStore);
    return TSDB_CODE_FAILED;
  }

  return TSDB_CODE_SUCCESS;
}

303
/**
 * @brief Set up one rollup level (items[idx]) of an rsma info.
 *
 * Opens the stream state backend of the level, creates the stream exec task from the level's
 * query message, derives the effective max delay, registers the item in smaMgmt.refHash and
 * arms the fetch timer. Returns success without doing anything when the level has no query
 * message.
 *
 * On failure the resources already attached to pRSmaInfo (stream state, task) are left in place
 * for the caller to release, e.g. via tdFreeRSmaInfo(..., true).
 *
 * @param pSma
 * @param param rollup parameters (qmsg/qmsgLen/maxdelay/watermark per level)
 * @param pStat rsma stat whose refId is stored for the item
 * @param pRSmaInfo rsma info to populate
 * @param idx 0-based level index (0 -> TSDB_RETENTION_L1, otherwise TSDB_RETENTION_L2)
 * @return int32_t TSDB_CODE_SUCCESS or TSDB_CODE_FAILED (terrno set)
 */
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
                                       int8_t idx) {
  // qmsgLen is indexed per level elsewhere in this file, so test this level's length rather than
  // the array address (which is never 0)
  if ((param->qmsgLen[idx] <= 0) || !param->qmsg[idx]) {
    return TSDB_CODE_SUCCESS;
  }

  SRetention    *pRetention = SMA_RETENTION(pSma);
  STsdbCfg      *pTsdbCfg = SMA_TSDB_CFG(pSma);
  SVnode        *pVnode = pSma->pVnode;
  SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
  char           taskInfDir[TSDB_FILENAME_LEN] = {0};
  void          *pStreamState = NULL;

  // set the backend of stream state
  tdRSmaQTaskInfoGetFullPathEx(TD_VID(pVnode), pRSmaInfo->suid, idx + 1, tfsGetPrimaryPath(pVnode->pTfs), taskInfDir);
  if (!taosCheckExistFile(taskInfDir)) {
    char *s = strdup(taskInfDir);
    if (!s) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_FAILED;
    }
    if (taosMulMkDir(taosDirName(s)) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      taosMemoryFree(s);
      return TSDB_CODE_FAILED;
    }
    taosMemoryFree(s);
  }
  pStreamState = streamStateOpen(taskInfDir, NULL, true);
  if (!pStreamState) {
    terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
    return TSDB_CODE_FAILED;
  }
  // attach immediately so that the owner's cleanup closes it even if a later step fails
  pItem->pStreamState = pStreamState;

  SReadHandle handle = {
      .meta = pVnode->pMeta,
      .vnode = pVnode,
      .initTqReader = 1,
      .pStateBackend = pStreamState,
  };
  pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
  if (!pRSmaInfo->taskInfo[idx]) {
    terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
    return TSDB_CODE_FAILED;
  }

  pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE;  // fetch the data when reboot

  // fall back to the retention frequency (in ms) when the configured max delay is too small
  if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
    int64_t msInterval =
        convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND);
    pItem->maxDelay = (int32_t)msInterval;
  } else {
    pItem->maxDelay = (int32_t)param->maxdelay[idx];
  }
  if (pItem->maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) {
    pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY;
  }

  pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2;
  ASSERT(pItem->level > 0);

  // keyed by the item pointer itself
  SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid};
  taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef));

  pItem->fetchLevel = pItem->level;
  taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);

  smaInfo("vgId:%d, item:%p table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64
          ", finally maxdelay:%" PRIi32,
          TD_VID(pVnode), pItem, pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx],
          pItem->maxDelay);

  return TSDB_CODE_SUCCESS;
}

372
/**
373
 * @brief for rsam create or restore
374
 *
375 376 377 378
 * @param pSma
 * @param param
 * @param suid
 * @param tbName
379 380
 * @return int32_t
 */
381
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) {
382
  if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) {
383
    smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid);
384 385 386
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
387
  if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) {
388 389 390 391 392
    terrno = TSDB_CODE_TDB_INIT_FAILED;
    return TSDB_CODE_FAILED;
  }

  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
C
Cary Xu 已提交
393
  SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
394 395
  SRSmaInfo *pRSmaInfo = NULL;

396
  pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
397
  if (pRSmaInfo) {
398 399 400 401 402 403 404
    // TODO: free original pRSmaInfo if exists abnormally
    tdFreeRSmaInfo(pSma, *(SRSmaInfo **)pRSmaInfo, true);
    if (taosHashRemove(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)) < 0) {
      terrno = TSDB_CODE_RSMA_REMOVE_EXISTS;
      goto _err;
    }
    smaWarn("vgId:%d, remove the rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid);
405 406
  }

407
  // from write queue: single thead
408 409 410 411 412 413
  pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo));
  if (!pRSmaInfo) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }

414
  STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1);
415 416 417 418
  if (!pTSchema) {
    terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
    goto _err;
  }
C
Cary Xu 已提交
419
  pRSmaInfo->pSma = pSma;
420
  pRSmaInfo->pTSchema = pTSchema;
C
Cary Xu 已提交
421 422
  pRSmaInfo->suid = suid;
  T_REF_INIT_VAL(pRSmaInfo, 1);
C
Cary Xu 已提交
423 424 425
  if (!(pRSmaInfo->queue = taosOpenQueue())) {
    goto _err;
  }
C
Cary Xu 已提交
426

C
Cary Xu 已提交
427 428 429
  if (!(pRSmaInfo->qall = taosAllocateQall())) {
    goto _err;
  }
C
Cary Xu 已提交
430 431 432 433 434 435
  if (!(pRSmaInfo->iQueue = taosOpenQueue())) {
    goto _err;
  }
  if (!(pRSmaInfo->iQall = taosAllocateQall())) {
    goto _err;
  }
436

L
Liu Jicong 已提交
437
  if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) {
C
Cary Xu 已提交
438 439
    goto _err;
  }
C
Cary Xu 已提交
440

L
Liu Jicong 已提交
441
  if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1) < 0) {
C
Cary Xu 已提交
442 443
    goto _err;
  }
444

445
  if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) {
446
    goto _err;
447 448
  }

C
Cary Xu 已提交
449
  smaDebug("vgId:%d, register rsma info succeed for table %" PRIi64, SMA_VID(pSma), suid);
C
Cary Xu 已提交
450

451
  return TSDB_CODE_SUCCESS;
452
_err:
C
Cary Xu 已提交
453
  tdFreeRSmaInfo(pSma, pRSmaInfo, true);
454
  return TSDB_CODE_FAILED;
455 456
}

457
/**
C
Cary Xu 已提交
458
 * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam currently
459
 *
460
 * @param pSma
461 462 463
 * @param pReq
 * @return int32_t
 */
464 465
int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) {
  SVnode *pVnode = pSma->pVnode;
466
  if (!pReq->rollup) {
467 468 469 470 471 472 473 474
    smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since no rollup in req", TD_VID(pVnode), pReq->name,
             pReq->suid);
    return TSDB_CODE_SUCCESS;
  }

  if (!VND_IS_RSMA(pVnode)) {
    smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
             pReq->suid);
475 476 477
    return TSDB_CODE_SUCCESS;
  }

478
  return tdRSmaProcessCreateImpl(pSma, &pReq->rsmaParam, pReq->suid, pReq->name);
479 480
}

481 482 483 484 485 486 487
/**
 * @brief drop cache for stb
 *
 * @param pSma
 * @param pReq
 * @return int32_t
 */
L
Liu Jicong 已提交
488
int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
489 490
  SVnode *pVnode = pSma->pVnode;
  if (!VND_IS_RSMA(pVnode)) {
C
Cary Xu 已提交
491
    smaTrace("vgId:%d, not drop rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
492 493 494 495
             pReq->suid);
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
496 497 498 499 500
  SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
  if (!pSmaEnv) {
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
501
  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
C
Cary Xu 已提交
502

C
Cary Xu 已提交
503
  SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pReq->suid);
C
Cary Xu 已提交
504 505 506 507 508 509 510 511

  if (!pRSmaInfo) {
    smaWarn("vgId:%d, drop rsma for stable %s %" PRIi64 " failed no rsma in hash", TD_VID(pVnode), pReq->name,
            pReq->suid);
    return TSDB_CODE_SUCCESS;
  }

  // set del flag for data in mem
512
  atomic_store_8(&pRSmaStat->delFlag, 1);
C
Cary Xu 已提交
513 514 515
  RSMA_INFO_SET_DEL(pRSmaInfo);
  tdUnRefRSmaInfo(pSma, pRSmaInfo);

C
Cary Xu 已提交
516
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
C
Cary Xu 已提交
517

C
Cary Xu 已提交
518 519
  // save to file
  // TODO
520 521
  smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
  return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
522
}
523

524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570
/**
 * @brief Store suid/[uids]; the first suid goes into the inline array, other suids into the hash.
 *
 * @param pStore
 * @param suid super table uid
 * @param uid child table uid to append, or NULL to record the suid only
 * @return int32_t TSDB_CODE_SUCCESS or TSDB_CODE_FAILED; a uid array created here is destroyed
 *         again when it could not be handed over to the hash
 */
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) {
  // prefer to store suid/uids in array
  if ((suid == pStore->suid) || (pStore->suid == 0)) {
    if (pStore->suid == 0) {
      pStore->suid = suid;
    }
    if (uid) {
      if (!pStore->tbUids) {
        if (!(pStore->tbUids = taosArrayInit(1, sizeof(tb_uid_t)))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return TSDB_CODE_FAILED;
        }
      }
      if (!taosArrayPush(pStore->tbUids, uid)) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return TSDB_CODE_FAILED;
      }
    }
    return TSDB_CODE_SUCCESS;
  }

  // store other suid/uids in hash when multiple stable/table included in 1 batch of request
  if (!pStore->uidHash) {
    pStore->uidHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    if (!pStore->uidHash) {
      return TSDB_CODE_FAILED;
    }
  }

  if (!uid) {
    // key only, no uid array attached
    if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0) < 0) {
      return TSDB_CODE_FAILED;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray *uidArray = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t));
  if (uidArray && ((uidArray = *(SArray **)uidArray))) {
    if (!taosArrayPush(uidArray, uid)) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_FAILED;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray *pUidArray = taosArrayInit(1, sizeof(tb_uid_t));
  if (!pUidArray) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }
  if (!taosArrayPush(pUidArray, uid)) {
    taosArrayDestroy(pUidArray);  // not yet owned by the hash
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }
  if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)) < 0) {
    taosArrayDestroy(pUidArray);  // the hash did not take it over
    return TSDB_CODE_FAILED;
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Release everything owned by the store (hash values, hash, uid array) but not the store itself.
 *
 * @param pStore may be NULL
 */
void tdUidStoreDestory(STbUidStore *pStore) {
  if (pStore == NULL) {
    return;
  }

  if (pStore->uidHash != NULL) {
    // When pStore->tbUids not NULL, the pStore->uidHash has k/v; otherwise pStore->uidHash only has keys.
    if (pStore->tbUids != NULL) {
      for (void *pEntry = taosHashIterate(pStore->uidHash, NULL); pEntry != NULL;
           pEntry = taosHashIterate(pStore->uidHash, pEntry)) {
        taosArrayDestroy(*(SArray **)pEntry);
      }
    }
    taosHashCleanup(pStore->uidHash);
  }

  taosArrayDestroy(pStore->tbUids);
}

/**
 * @brief Destroy the store contents and free the store itself.
 *
 * @param pStore may be NULL
 * @return void* always NULL, so callers can reset their pointer in one statement
 */
void *tdUidStoreFree(STbUidStore *pStore) {
  if (pStore == NULL) {
    return NULL;
  }

  tdUidStoreDestory(pStore);
  taosMemoryFree(pStore);
  return NULL;
}

C
Cary Xu 已提交
610 611 612 613 614 615 616 617
/**
 * @brief The SubmitReq for rsma L2/L3 is inserted by tsdbInsertData method directly while not by WriteQ, as the queue
 * would be freed when close Vnode, thus lock should be used if with race condition.
 * @param pTsdb
 * @param version
 * @param pReq
 * @return int32_t
 */
618 619 620 621 622 623 624
static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
  if (!pReq) {
    terrno = TSDB_CODE_INVALID_PTR;
    return TSDB_CODE_FAILED;
  }

  SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
C
Cary Xu 已提交
625
  // TODO: spin lock for race conditiond
626 627 628 629 630 631 632 633 634 635 636 637 638 639 640
  if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) {
    return TSDB_CODE_FAILED;
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Collect the suid of every block of a submit request into the store (suids only, no uids).
 *
 * @param pMsg submit request to walk
 * @param pStore receives the suids
 * @return int32_t 0 on success; -1 when iteration or storing a suid fails, or when terrno is set afterwards
 */
static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock = NULL;

  terrno = TSDB_CODE_SUCCESS;

  if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) {
    return -1;
  }
  while (true) {
    if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) {
      return -1;
    }

    if (!pBlock) break;
    // not every failure path of tdUidStorePut sets terrno, so its result must be checked directly
    if (tdUidStorePut(pStore, msgIter.suid, NULL) < 0) {
      return -1;
    }
  }

  if (terrno != TSDB_CODE_SUCCESS) {
    return -1;
  }
  return 0;
}

C
Cary Xu 已提交
659 660
/**
 * @brief Run retention on the rsma1/rsma2 tsdb instances of an rsma vnode.
 *
 * @param pSma
 * @param now forwarded to tsdbDoRetention()
 * @return int32_t TSDB_CODE_SUCCESS, or the first non-zero code returned by tsdbDoRetention()
 */
int32_t smaDoRetention(SSma *pSma, int64_t now) {
  if (!VND_IS_RSMA(pSma->pVnode)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t lvl = 0; lvl < TSDB_RETENTION_L2; ++lvl) {
    if (!pSma->pRSmaTsdb[lvl]) {
      continue;
    }

    code = tsdbDoRetention(pSma->pRSmaTsdb[lvl], now);
    if (code) {
      break;  // stop at the first failing level
    }
  }

  return code;
}

C
Cary Xu 已提交
683 684
/**
 * @brief Run the given rsma task until it yields no more result blocks and insert each block
 *        into the sink tsdb of the item's level.
 *
 * @param pSma
 * @param taskInfo task to execute
 * @param pItem rsma info item; its level selects pRSmaTsdb[0] (L1) or pRSmaTsdb[1] (otherwise)
 * @param pTSchema schema used to build the submit requests
 * @param suid super table uid
 * @return int32_t TSDB_CODE_SUCCESS or TSDB_CODE_FAILED (terrno set)
 */
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
                                         int64_t suid) {
  SArray *pResList = taosArrayInit(1, POINTER_BYTES);
  if (pResList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  while (1) {
    uint64_t ts;
    int32_t  code = qExecTaskOpt(taskInfo, pResList, &ts, NULL);
    if (code < 0) {
      if (code == TSDB_CODE_QRY_IN_EXEC) {
        break;
      } else {
        // publish the failure through terrno so both the log and the caller see this code
        terrno = code;
        smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid,
                 pItem->level, terrstr());
        goto _err;
      }
    }

    if (taosArrayGetSize(pResList) == 0) {
      if (terrno == 0) {
        // smaDebug("vgId:%d, no rsma level %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level);
      } else {
        smaDebug("vgId:%d, no rsma level %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr());
        goto _err;
      }

      break;
    } else {
      smaDebug("vgId:%d, rsma level %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level);
    }
#if 0
    char flag[10] = {0};
    snprintf(flag, 10, "level %" PRIi8, pItem->level);
    blockDebugShowDataBlocks(pResList, flag);
#endif
    for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
      SSDataBlock *output = taosArrayGetP(pResList, i);
      STsdb       *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
      SSubmitReq  *pReq = NULL;

      // TODO: the schema update should be handled later(TD-17965)
      if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) {
        smaError("vgId:%d, build submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma),
                 suid, pItem->level, terrstr());
        goto _err;
      }

      // nothing was built for this block: there is nothing to insert, log or free
      if (pReq == NULL) {
        continue;
      }

      if (tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
        taosMemoryFreeClear(pReq);
        smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s",
                 SMA_VID(pSma), suid, pItem->level, terrstr());
        goto _err;
      }

      smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " ver %" PRIi64 " len %" PRIu32,
               SMA_VID(pSma), suid, pItem->level, output->info.version, htonl(pReq->header.contLen));

      taosMemoryFreeClear(pReq);
    }
  }

  taosArrayDestroy(pResList);
  return TSDB_CODE_SUCCESS;

_err:
  taosArrayDestroy(pResList);
  return TSDB_CODE_FAILED;
}

C
Cary Xu 已提交
755
/**
 * @brief Copy msg to rsmaQueueBuffer for batch process
 *
 * The message is copied into a newly allocated queue item and written to pInfo->queue; the
 * producer is then slowed down with a short sleep that grows with the number of buffered items.
 *
 * @param pSma
 * @param pMsg submit request to copy (header.contLen bytes)
 * @param inputType not used in this function
 * @param pInfo rsma info whose queue receives the copy
 * @param suid not used in this function
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED when the queue item cannot be allocated
 */
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo,
                                      tb_uid_t suid) {
  const SSubmitReq *pSubmit = (const SSubmitReq *)pMsg;

  void *pQItem = taosAllocateQitem(pSubmit->header.contLen, DEF_QITEM);
  if (pQItem == NULL) {
    return TSDB_CODE_FAILED;
  }
  memcpy(pQItem, pMsg, pSubmit->header.contLen);
  taosWriteQitem(pInfo->queue, pQItem);

  pInfo->lastRecv = taosGetTimestampMs();

  SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
  int64_t    nBuffered = atomic_fetch_add_64(&pRSmaStat->nBufItems, 1);

  if (atomic_load_8(&pInfo->assigned) == 0) {
    tsem_post(&(pRSmaStat->notEmpty));
  }

  // smoothing consume
  int32_t nSteps = nBuffered / RSMA_QTASKEXEC_SMOOTH_SIZE;
  if (nSteps <= 1) {
    return TSDB_CODE_SUCCESS;
  }
  if (nSteps > 10) {
    nSteps = 10;
  }

  taosMsleep(nSteps << 3);
  if (nSteps > 5) {
    smaWarn("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
            taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), nSteps << 3);
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
804 805 806 807 808 809 810 811 812 813 814
/**
 * @brief Debug helper that logs one line for every row contained in a submit request.
 *
 * @param pSma
 * @param pReq
 * @return int32_t 0 when all blocks were walked, -1 when the message iterator fails
 */
static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
  SSubmitMsgIter msgIter = {0};
  SSubmitBlkIter blkIter = {0};

  if (tInitSubmitMsgIter(pReq, &msgIter) < 0) {
    return -1;
  }

  for (;;) {
    SSubmitBlk *pBlk = NULL;
    if (tGetSubmitMsgNext(&msgIter, &pBlk) < 0) {
      return -1;
    }
    if (!pBlk) {
      break;
    }

    tInitSubmitBlkIter(&msgIter, pBlk, &blkIter);
    for (STSRow *pRow = tGetSubmitBlkNext(&blkIter); pRow != NULL; pRow = tGetSubmitBlkNext(&blkIter)) {
      smaDebug("vgId:%d, numOfRows:%d, suid:%" PRIi64 ", uid:%" PRIi64 ", version:%" PRIi64 ", ts:%" PRIi64,
               SMA_VID(pSma), msgIter.numOfRows, msgIter.suid, msgIter.uid, pReq->version, pRow->ts);
    }
  }

  return 0;
}

C
Cary Xu 已提交
822 823 824 825 826 827 828 829
/**
 * @brief sync mode
 *
 * @param pSma
 * @param pMsg
 * @param msgSize
 * @param inputType
 * @param pInfo
C
Cary Xu 已提交
830
 * @param type
C
Cary Xu 已提交
831 832 833 834
 * @param level
 * @return int32_t
 */
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
C
Cary Xu 已提交
835
                                 ERsmaExecType type, int8_t level) {
C
Cary Xu 已提交
836
  int32_t idx = level - 1;
C
Cary Xu 已提交
837

C
Cary Xu 已提交
838
  void *qTaskInfo = (type == RSMA_EXEC_COMMIT) ? RSMA_INFO_IQTASK(pInfo, idx) : RSMA_INFO_QTASK(pInfo, idx);
C
Cary Xu 已提交
839 840 841
  if (!qTaskInfo) {
    smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
             pInfo->suid);
842 843
    return TSDB_CODE_SUCCESS;
  }
C
Cary Xu 已提交
844
  if (!pInfo->pTSchema) {
C
Cary Xu 已提交
845
    smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid);
846 847
    return TSDB_CODE_FAILED;
  }
848 849

  smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level,
C
Cary Xu 已提交
850
           RSMA_INFO_QTASK(pInfo, idx), pInfo->suid);
851

C
Cary Xu 已提交
852 853 854
#if 0
  for (int32_t i = 0; i < msgSize; ++i) {
    SSubmitReq *pReq = *(SSubmitReq **)((char *)pMsg + i * sizeof(void *));
C
Cary Xu 已提交
855
    smaDebug("vgId:%d, [%d][%d] version %" PRIi64, SMA_VID(pSma), msgSize, i, pReq->version);
C
Cary Xu 已提交
856 857 858 859
    tdRsmaPrintSubmitReq(pSma, pReq);
  }
#endif
  if (qSetMultiStreamInput(qTaskInfo, pMsg, msgSize, inputType) < 0) {
860
    smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
861 862 863
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
864
  SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
C
Cary Xu 已提交
865
  tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid);
866 867 868 869

  return TSDB_CODE_SUCCESS;
}

870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958
/**
 * @brief Create a stream exec task from param->qmsg[idx] and load the serialized status of srcTaskInfo into it.
 *
 * NOTE(review): dstTaskInfo is received by value, so the task created here is reachable only through the local
 * parameter and is not visible to the caller after return - confirm whether an out-parameter was intended.
 * NOTE(review): confirm that tdRSmaQTaskInfoFree expects the handle itself and not the address of the handle.
 *
 * @param pSma
 * @param dstTaskInfo must be NULL on entry (asserted); see the first note above
 * @param srcTaskInfo task whose status is serialized
 * @param param       rsma parameters; qmsg[idx] is used to create the new task
 * @param suid        super table uid, used in log messages
 * @param idx         0-based level index; idx + 1 is passed as the level to tdRSmaQTaskInfoFree
 * @return int32_t    TSDB_CODE_SUCCESS on success, TSDB_CODE_FAILED otherwise
 */
static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t srcTaskInfo, SRSmaParam *param,
                                tb_uid_t suid, int8_t idx) {
  SVnode *pVnode = pSma->pVnode;
  char   *pOutput = NULL;
  int32_t len = 0;

  if ((terrno = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len)) < 0) {
    smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid,
             terrstr());
    goto _err;
  }

  SReadHandle handle = {
      .meta = pVnode->pMeta,
      .vnode = pVnode,
      .initTqReader = 1,
  };
  ASSERT(!dstTaskInfo);
  dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
  if (!dstTaskInfo) {
    terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
    goto _err;
  }

  if (qDeserializeTaskStatus(dstTaskInfo, pOutput, len) < 0) {
    // the identical failure message is emitted once at _err; do not log it twice
    goto _err;
  }

  smaDebug("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid);

  taosMemoryFreeClear(pOutput);
  return TSDB_CODE_SUCCESS;
_err:
  taosMemoryFreeClear(pOutput);
  tdRSmaQTaskInfoFree(dstTaskInfo, TD_VID(pVnode), idx + 1);
  smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid,
           terrstr());
  return TSDB_CODE_FAILED;
}

/**
 * @brief Clone the query tasks of an SRSmaInfo: for every level with a non-NULL iTaskInfo slot,
 *        tdCloneQTaskInfo() is called with the matching taskInfo slot as destination.
 *
 * The rsma parameters are read from the meta entry of pInfo->suid, which must be a rollup super table.
 *
 * @param pSma
 * @param pInfo     nothing is done when NULL
 * @return int32_t  TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED when the meta entry cannot be read, the table is
 *                  not a rollup table, or a clone fails
 */
static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) {
  if (pInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t     code = TSDB_CODE_FAILED;
  SRSmaParam *pParam = NULL;
  SMetaReader reader = {0};

  metaReaderInit(&reader, SMA_META(pSma), 0);
  smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid);

  if (metaGetTableEntryByUid(&reader, pInfo->suid) < 0) {
    smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid,
             terrstr());
    goto _exit;
  }
  ASSERT(reader.me.type == TSDB_SUPER_TABLE);
  ASSERT(reader.me.uid == pInfo->suid);

  if (!(TABLE_IS_ROLLUP(reader.me.flags))) {
    terrno = TSDB_CODE_RSMA_INVALID_SCHEMA;
    goto _exit;
  }

  pParam = &reader.me.stbEntry.rsmaParam;
  for (int32_t level = 0; level < TSDB_RETENTION_L2; ++level) {
    if (pInfo->iTaskInfo[level] &&
        tdCloneQTaskInfo(pSma, pInfo->taskInfo[level], pInfo->iTaskInfo[level], pParam, pInfo->suid, level) < 0) {
      goto _exit;
    }
  }
  smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid);
  code = TSDB_CODE_SUCCESS;

_exit:
  metaReaderClear(&reader);
  if (code != TSDB_CODE_SUCCESS) {
    smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, terrstr());
  }
  return code;
}

C
Cary Xu 已提交
959 960
/**
 * @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied.
C
Cary Xu 已提交
961 962 963 964
 *
 * @param pSma
 * @param suid
 * @return SRSmaInfo*
C
Cary Xu 已提交
965
 */
C
Cary Xu 已提交
966
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
967 968
  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
  SRSmaStat *pStat = NULL;
C
Cary Xu 已提交
969 970
  SRSmaInfo *pRSmaInfo = NULL;

971
  if (!pEnv) {
C
Cary Xu 已提交
972
    terrno = TSDB_CODE_RSMA_INVALID_ENV;
973
    return NULL;
974 975
  }

976 977
  pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
  if (!pStat || !RSMA_INFO_HASH(pStat)) {
C
Cary Xu 已提交
978
    terrno = TSDB_CODE_RSMA_INVALID_STAT;
979 980
    return NULL;
  }
981

982
  taosRLockLatch(SMA_ENV_LOCK(pEnv));
C
Cary Xu 已提交
983 984
  pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
  if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
C
Cary Xu 已提交
985
    if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
986
      taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
C
Cary Xu 已提交
987 988
      return NULL;
    }
C
Cary Xu 已提交
989
    if (!pRSmaInfo->taskInfo[0]) {
990
      if (tdRSmaInfoClone(pSma, pRSmaInfo) < 0) {
991
        taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
C
Cary Xu 已提交
992 993 994
        return NULL;
      }
    }
C
Cary Xu 已提交
995
    tdRefRSmaInfo(pSma, pRSmaInfo);
996
    taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
C
Cary Xu 已提交
997
    ASSERT(pRSmaInfo->suid == suid);
C
Cary Xu 已提交
998 999
    return pRSmaInfo;
  }
1000
  taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
C
Cary Xu 已提交
1001

C
Cary Xu 已提交
1002
  return NULL;
1003 1004
}

C
Cary Xu 已提交
1005 1006 1007
/**
 * @brief Drop the reference taken by tdAcquireRSmaInfoBySuid(); a NULL pInfo is ignored.
 */
static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
  if (pInfo == NULL) {
    return;
  }
  tdUnRefRSmaInfo(pSma, pInfo);
}

C
Cary Xu 已提交
1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031
/**
 * @brief async mode: queue a submit message for the rsma info of one super table.
 *
 * When the message was queued and the sma timer handle exists, the trigger state of every item whose level is
 * greater than 0 is set to TASK_TRIGGER_STAT_ACTIVE.
 *
 * @param pSma
 * @param pMsg
 * @param inputType only STREAM_INPUT__DATA_SUBMIT is expected; anything else hits ASSERT(0)
 * @param suid
 * @return int32_t  TSDB_CODE_SUCCESS (also when no rsma info exists for suid), TSDB_CODE_FAILED when queueing fails
 */
static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
  SRSmaInfo *pInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
  if (pInfo == NULL) {
    smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  if (inputType != STREAM_INPUT__DATA_SUBMIT) {
    ASSERT(0);
  } else if (tdExecuteRSmaImplAsync(pSma, pMsg, inputType, pInfo, suid) < 0) {
    code = TSDB_CODE_FAILED;
  } else if (smaMgmt.tmrHandle) {
    for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
      SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i);
      if (pItem->level > 0) {
        atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
      }
    }
  }

  tdReleaseRSmaInfo(pSma, pInfo);
  return code;
}

1050 1051 1052 1053 1054 1055
/**
 * @brief Feed a submit message to rsma for every super table it touches.
 *
 * Nothing is done when the rsma env does not exist or retention level 1 is invalid. Otherwise the suids of
 * the message are collected into a STbUidStore and tdExecuteRSmaAsync() is called once for uidStore.suid and
 * once for every key of uidStore.uidHash.
 *
 * @param pSma
 * @param pMsg
 * @param inputType only STREAM_INPUT__DATA_SUBMIT is processed; other values are ignored
 * @return int32_t  TSDB_CODE_SUCCESS or TSDB_CODE_FAILED
 */
int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
  SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
  if (!pEnv) {
    // only applicable when rsma env exists
    return TSDB_CODE_SUCCESS;
  }
  STbUidStore uidStore = {0};
  SRetention *pRetention = SMA_RETENTION(pSma);
  if (!RETENTION_VALID(pRetention + 1)) {
    // return directly if retention level 1 is invalid
    return TSDB_CODE_SUCCESS;
  }

  if (inputType == STREAM_INPUT__DATA_SUBMIT) {
    if (tdFetchSubmitReqSuids(pMsg, &uidStore) < 0) {
      goto _err;
    }

    if (uidStore.suid != 0) {
      if (tdExecuteRSmaAsync(pSma, pMsg, inputType, uidStore.suid) < 0) {
        goto _err;
      }

      void *pIter = NULL;
      while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) {
        tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
        if (tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid) < 0) {
          // the iteration is abandoned before reaching the end, so release the iterator explicitly
          taosHashCancelIterate(uidStore.uidHash, pIter);
          goto _err;
        }
      }
    }
  }
  tdUidStoreDestory(&uidStore);
  return TSDB_CODE_SUCCESS;
_err:
  tdUidStoreDestory(&uidStore);
  smaError("vgId:%d, failed to process rsma submit since: %s", SMA_VID(pSma), terrstr());
  return TSDB_CODE_FAILED;
}
C
Cary Xu 已提交
1089

C
Cary Xu 已提交
1090 1091 1092 1093 1094 1095 1096
/**
 * @brief retrieve rsma meta and init
 *
 * @param pSma
 * @param nTables number of tables of rsma
 * @return int32_t
 */
C
Cary Xu 已提交
1097
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
C
Cary Xu 已提交
1098 1099 1100 1101
  SVnode     *pVnode = pSma->pVnode;
  SArray     *suidList = NULL;
  STbUidStore uidStore = {0};
  SMetaReader mr = {0};
1102

C
Cary Xu 已提交
1103 1104 1105 1106 1107 1108
  if (!(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  if (vnodeGetStbIdList(pSma->pVnode, 0, suidList) < 0) {
C
Cary Xu 已提交
1109
    smaError("vgId:%d, failed to restore rsma env since get stb id list error: %s", TD_VID(pVnode), terrstr());
C
Cary Xu 已提交
1110
    goto _err;
1111 1112
  }

C
Cary Xu 已提交
1113 1114
  int64_t arrSize = taosArrayGetSize(suidList);

C
Cary Xu 已提交
1115
  if (arrSize == 0) {
C
Cary Xu 已提交
1116 1117 1118
    if (nTables) {
      *nTables = 0;
    }
C
Cary Xu 已提交
1119 1120
    taosArrayDestroy(suidList);
    smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode));
1121 1122 1123
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
1124
  int64_t nRsmaTables = 0;
1125
  metaReaderInit(&mr, SMA_META(pSma), 0);
C
Cary Xu 已提交
1126 1127 1128
  if (!(uidStore.tbUids = taosArrayInit(1024, sizeof(tb_uid_t)))) {
    goto _err;
  }
C
Cary Xu 已提交
1129
  for (int64_t i = 0; i < arrSize; ++i) {
1130
    tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
C
Cary Xu 已提交
1131
    smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid);
1132
    if (metaGetTableEntryByUid(&mr, suid) < 0) {
C
Cary Xu 已提交
1133 1134
      smaError("vgId:%d, rsma restore, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid,
               terrstr());
1135 1136
      goto _err;
    }
1137
    tDecoderClear(&mr.coder);
1138 1139 1140
    ASSERT(mr.me.type == TSDB_SUPER_TABLE);
    ASSERT(mr.me.uid == suid);
    if (TABLE_IS_ROLLUP(mr.me.flags)) {
C
Cary Xu 已提交
1141
      ++nRsmaTables;
1142
      SRSmaParam *param = &mr.me.stbEntry.rsmaParam;
C
Cary Xu 已提交
1143 1144 1145 1146
      for (int i = 0; i < TSDB_RETENTION_L2; ++i) {
        smaDebug("vgId:%d, rsma restore, table:%" PRIi64 " level:%d, maxdelay:%" PRIi64 " watermark:%" PRIi64
                 " qmsgLen:%" PRIi32,
                 TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]);
1147
      }
1148
      if (tdRSmaProcessCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) {
C
Cary Xu 已提交
1149
        smaError("vgId:%d, rsma restore env failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr());
1150 1151
        goto _err;
      }
C
Cary Xu 已提交
1152 1153 1154 1155 1156 1157 1158 1159 1160

      // reload all ctbUids for suid
      uidStore.suid = suid;
      if (vnodeGetCtbIdList(pVnode, suid, uidStore.tbUids) < 0) {
        smaError("vgId:%d, rsma restore, get ctb idlist failed for %" PRIi64 " since %s", TD_VID(pVnode), suid,
                 terrstr());
        goto _err;
      }

1161
      if (tdUpdateTbUidList(pVnode->pSma, &uidStore, true) < 0) {
C
Cary Xu 已提交
1162 1163 1164 1165 1166 1167 1168
        smaError("vgId:%d, rsma restore, update tb uid list failed for %" PRIi64 " since %s", TD_VID(pVnode), suid,
                 terrstr());
        goto _err;
      }

      taosArrayClear(uidStore.tbUids);

C
Cary Xu 已提交
1169
      smaDebug("vgId:%d, rsma restore env success for %" PRIi64, TD_VID(pVnode), suid);
1170 1171 1172
    }
  }

C
Cary Xu 已提交
1173 1174
  metaReaderClear(&mr);
  taosArrayDestroy(suidList);
C
Cary Xu 已提交
1175 1176 1177 1178 1179
  tdUidStoreDestory(&uidStore);

  if (nTables) {
    *nTables = nRsmaTables;
  }
C
Cary Xu 已提交
1180 1181 1182 1183 1184

  return TSDB_CODE_SUCCESS;
_err:
  metaReaderClear(&mr);
  taosArrayDestroy(suidList);
C
Cary Xu 已提交
1185
  tdUidStoreDestory(&uidStore);
C
Cary Xu 已提交
1186 1187 1188 1189

  return TSDB_CODE_FAILED;
}

1190
/**
 * @brief Reload the serialized qTaskInfo items of one qtaskinfo file version and restore them.
 *
 * A missing file is not an error: the function logs it and returns success. Every resource acquired on the
 * way (file descriptor object, open file, iterator) is released on all exit paths.
 *
 * @param pSma
 * @param type          used in log messages and forwarded to tdRSmaQTaskInfoRestore
 * @param qTaskFileVer  version used to build the qtaskinfo file name
 * @return int32_t      TSDB_CODE_SUCCESS or TSDB_CODE_FAILED
 */
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer) {
  SVnode            *pVnode = pSma->pVnode;
  STFile             tFile = {0};
  STFInfo            tFileInfo = {0};
  SRSmaQTaskInfoIter fIter = {0};
  char               qTaskInfoFName[TSDB_FILENAME_LEN] = {0};
  bool               fileInited = false;
  bool               fileOpened = false;
  bool               iterTouched = false;
  int32_t            code = TSDB_CODE_FAILED;

  tdRSmaQTaskInfoGetFileName(TD_VID(pVnode), qTaskFileVer, qTaskInfoFName);
  if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
    goto _exit;
  }
  fileInited = true;

  if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) {
    if (qTaskFileVer > 0) {
      smaWarn("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", not start as %s not exist",
              TD_VID(pVnode), type, qTaskFileVer, TD_TFILE_FULL_NAME(&tFile));
    } else {
      smaDebug("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", no need as %s not exist", TD_VID(pVnode),
               type, qTaskFileVer, TD_TFILE_FULL_NAME(&tFile));
    }
    code = TSDB_CODE_SUCCESS;
    goto _exit;
  }

  if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) {
    goto _exit;
  }
  fileOpened = true;

  if (tdLoadTFileHeader(&tFile, &tFileInfo) < 0) {
    goto _exit;
  }

  // the iterator is destroyed even when its init fails half-way
  iterTouched = true;
  if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
    goto _exit;
  }

  if (tdRSmaQTaskInfoRestore(pSma, type, &fIter) < 0) {
    goto _exit;
  }

  // restored successfully from committed or sync
  smaInfo("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", qtaskinfo reload succeed", TD_VID(pVnode),
          type, qTaskFileVer);
  code = TSDB_CODE_SUCCESS;

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    // log before the cleanup so that terrstr() still reflects the original failure
    smaError("vgId:%d, restore rsma task %" PRIi8 " for version %" PRIi64 ", qtaskinfo reload failed since %s",
             TD_VID(pVnode), type, qTaskFileVer, terrstr());
  }
  if (iterTouched) {
    tdRSmaQTaskInfoIterDestroy(&fIter);
  }
  if (fileOpened) {
    tdCloseTFile(&tFile);
  }
  if (fileInited) {
    tdDestroyTFile(&tFile);
  }
  return code;
}

/**
 * @brief reload ts data from checkpoint
C
Cary Xu 已提交
1251 1252 1253
 *
 * @param pSma
 * @return int32_t
C
Cary Xu 已提交
1254
 */
1255
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) {
C
Cary Xu 已提交
1256
  // NOTHING TODO: the data would be restored from the unified WAL replay procedure
C
Cary Xu 已提交
1257 1258 1259
  return TSDB_CODE_SUCCESS;
}

1260
/**
 * @brief Restore the rsma environment of a vnode.
 *
 * @param pSma
 * @param type          used in log messages
 * @param qtaskFileVer  qtaskinfo file version handed to tdRSmaFSOpen
 * @return int32_t      TSDB_CODE_SUCCESS (also when there is no rsma table), or TSDB_CODE_FAILED
 */
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) {
  // step 1: iterate all stables to restore the rsma env
  int64_t nTables = 0;
  if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) {
    goto _err;
  }
  if (nTables <= 0) {
    smaDebug("vgId:%d, no need to restore rsma task %" PRIi8 " since no tables", SMA_VID(pSma), type);
    return TSDB_CODE_SUCCESS;
  }

#if 0
  // step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
  if (tdRSmaRestoreQTaskInfoReload(pSma, type, qtaskFileVer) < 0) {
    goto _err;
  }
#endif

  // step 3: reload ts data from checkpoint
  if (tdRSmaRestoreTSDataReload(pSma) < 0) {
    goto _err;
  }

  // step 4: open SRSmaFS for qTaskFiles
  if (tdRSmaFSOpen(pSma, qtaskFileVer) < 0) {
    goto _err;
  }

  smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed", SMA_VID(pSma), type, qtaskFileVer);
  return TSDB_CODE_SUCCESS;
_err:
  // note the space before "from": without it the task type and the word run together in the log
  smaError("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " failed since %s", SMA_VID(pSma), type,
           qtaskFileVer, terrstr());
  return TSDB_CODE_FAILED;
}

C
Cary Xu 已提交
1296 1297
/**
 * @brief Restore from SRSmaQTaskInfoItem
C
Cary Xu 已提交
1298 1299 1300 1301
 *
 * @param pSma
 * @param pItem
 * @return int32_t
C
Cary Xu 已提交
1302
 */
C
Cary Xu 已提交
1303
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *pItem) {
1304 1305 1306
  SRSmaInfo *pRSmaInfo = NULL;
  void      *qTaskInfo = NULL;

C
Cary Xu 已提交
1307
  pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pItem->suid);
1308
  if (!pRSmaInfo) {
C
Cary Xu 已提交
1309
    smaDebug("vgId:%d, no restore as no rsma info for table:%" PRIu64, SMA_VID(pSma), pItem->suid);
1310 1311 1312
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
1313
  if (pItem->type == TSDB_RETENTION_L1) {
C
Cary Xu 已提交
1314
    qTaskInfo = RSMA_INFO_QTASK(pRSmaInfo, 0);
C
Cary Xu 已提交
1315
  } else if (pItem->type == TSDB_RETENTION_L2) {
C
Cary Xu 已提交
1316
    qTaskInfo = RSMA_INFO_QTASK(pRSmaInfo, 1);
1317 1318 1319 1320 1321
  } else {
    ASSERT(0);
  }

  if (!qTaskInfo) {
C
Cary Xu 已提交
1322
    tdReleaseRSmaInfo(pSma, pRSmaInfo);
C
Cary Xu 已提交
1323
    smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for table:%" PRIu64, SMA_VID(pSma), pItem->suid);
1324 1325 1326
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
1327
  if (qDeserializeTaskStatus(qTaskInfo, pItem->qTaskInfo, pItem->len) < 0) {
C
Cary Xu 已提交
1328
    tdReleaseRSmaInfo(pSma, pRSmaInfo);
C
Cary Xu 已提交
1329
    smaError("vgId:%d, restore rsma task failed for table:%" PRIi64 " level %d since %s", SMA_VID(pSma), pItem->suid,
C
Cary Xu 已提交
1330
             pItem->type, terrstr());
1331 1332
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
1333 1334
  smaDebug("vgId:%d, restore rsma task success for table:%" PRIi64 " level %d", SMA_VID(pSma), pItem->suid,
           pItem->type);
1335

C
Cary Xu 已提交
1336
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
1337 1338 1339
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1340
/**
 * @brief Prepare an iterator over a qtaskinfo file.
 *
 * The iterator starts right after the file head. Its buffer is sized to the payload of the file, limited to
 * RSMA_QTASKINFO_BUFSIZE and never smaller than TD_FILE_HEAD_SIZE.
 *
 * @param pIter     iterator to initialise; it is zeroed first
 * @param pTFile    file to iterate
 * @return int32_t  TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED when the file size cannot be read or the buffer
 *                  cannot be allocated
 */
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile) {
  memset(pIter, 0, sizeof(*pIter));
  pIter->pTFile = pTFile;
  pIter->offset = TD_FILE_HEAD_SIZE;

  if (tdGetTFileSize(pTFile, &pIter->fsize) < 0) {
    return TSDB_CODE_FAILED;
  }

  // payload size, limited to the default buffer size
  pIter->nAlloc = ((pIter->fsize - TD_FILE_HEAD_SIZE) < RSMA_QTASKINFO_BUFSIZE) ? (pIter->fsize - TD_FILE_HEAD_SIZE)
                                                                                 : RSMA_QTASKINFO_BUFSIZE;
  if (pIter->nAlloc < TD_FILE_HEAD_SIZE) {
    pIter->nAlloc = TD_FILE_HEAD_SIZE;
  }

  pIter->pBuf = taosMemoryMalloc(pIter->nAlloc);
  if (pIter->pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }
  pIter->qBuf = pIter->pBuf;

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1369
/**
 * @brief Load the next block of the qtaskinfo file into the iterator buffer.
 *
 * Up to RSMA_QTASKINFO_BUFSIZE bytes are read at pIter->offset. When the length prefix of the first item says
 * the item is larger than what was read, the buffer is grown if needed and the item is read again in full.
 *
 * @param pIter
 * @param isFinish  set to true when pIter->offset has reached the end of the file (nothing is read then)
 * @return int32_t  TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED on seek/read failure, corrupted length or out of memory
 */
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish) {
  STFile *pTFile = pIter->pTFile;
  int64_t nBytes = RSMA_QTASKINFO_BUFSIZE;

  if (pIter->offset >= pIter->fsize) {
    *isFinish = true;
    return TSDB_CODE_SUCCESS;
  }

  if ((pIter->fsize - pIter->offset) < RSMA_QTASKINFO_BUFSIZE) {
    nBytes = pIter->fsize - pIter->offset;
  }

  if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) {
    return TSDB_CODE_FAILED;
  }

  if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) {
    return TSDB_CODE_FAILED;
  }

  int32_t infoLen = 0;
  taosDecodeFixedI32(pIter->pBuf, &infoLen);
  if (infoLen > nBytes) {
    // an item that would fit into the default buffer but was not fully read means the file is truncated
    if (infoLen <= RSMA_QTASKINFO_BUFSIZE) {
      terrno = TSDB_CODE_RSMA_FILE_CORRUPTED;
      smaError("iterate rsma qtaskinfo file %s failed since %s", TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
      return TSDB_CODE_FAILED;
    }
    if (pIter->nAlloc < infoLen) {
      void *pBuf = taosMemoryRealloc(pIter->pBuf, infoLen);
      if (!pBuf) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return TSDB_CODE_FAILED;
      }
      // record the new capacity only after the buffer has really grown
      pIter->pBuf = pBuf;
      pIter->nAlloc = infoLen;
    }

    nBytes = infoLen;

    if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) {
      return TSDB_CODE_FAILED;
    }

    if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) {
      return TSDB_CODE_FAILED;
    }
  }

  pIter->qBuf = pIter->pBuf;
  pIter->offset += nBytes;
  pIter->nBytes = nBytes;
  pIter->nBufPos = 0;

  return TSDB_CODE_SUCCESS;
}

1427
/**
 * @brief Walk a qtaskinfo file block by block and restore every item found.
 *
 * Each item is laid out as: int32 length (including the head), int8 type, int64 suid, serialized task status.
 * When the remaining bytes of a block cannot hold a complete item, the file offset is moved back by the
 * unconsumed bytes so that the next block starts at that item.
 *
 * @param pSma
 * @param type      used in log messages
 * @param pIter     iterator prepared by tdRSmaQTaskInfoIterInit
 * @return int32_t  TSDB_CODE_SUCCESS when the end of the file is reached, TSDB_CODE_FAILED when a block cannot
 *                  be loaded or an item length is smaller than the head length
 */
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter) {
  while (1) {
    // block iter
    bool isFinish = false;
    if (tdRSmaQTaskInfoIterNextBlock(pIter, &isFinish) < 0) {
      return TSDB_CODE_FAILED;
    }
    if (isFinish) {
      return TSDB_CODE_SUCCESS;
    }

    // consume the block
    int32_t qTaskInfoLenWithHead = 0;
    pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
    if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) {
      // same corruption code as the one reported by tdRSmaQTaskInfoIterNextBlock
      terrno = TSDB_CODE_RSMA_FILE_CORRUPTED;
      smaError("vgId:%d, restore rsma task %" PRIi8 " from qtaskinfo file %s failed since %s", SMA_VID(pSma), type,
               TD_TFILE_FULL_NAME(pIter->pTFile), terrstr());
      return TSDB_CODE_FAILED;
    }

    while (1) {
      if ((pIter->nBufPos + qTaskInfoLenWithHead) <= pIter->nBytes) {
        SRSmaQTaskInfoItem infoItem = {0};
        pIter->qBuf = taosDecodeFixedI8(pIter->qBuf, &infoItem.type);
        pIter->qBuf = taosDecodeFixedI64(pIter->qBuf, &infoItem.suid);
        infoItem.qTaskInfo = pIter->qBuf;
        infoItem.len = tdRSmaQTaskInfoContLen(qTaskInfoLenWithHead);
        // do the restore job
        smaDebug("vgId:%d, restore rsma task %" PRIi8 " from qtaskinfo file %s offset:%" PRIi64, SMA_VID(pSma), type,
                 TD_TFILE_FULL_NAME(pIter->pTFile), pIter->offset - pIter->nBytes + pIter->nBufPos);
        // the result is intentionally not checked: the walk continues with the next item
        tdRSmaQTaskInfoItemRestore(pSma, &infoItem);

        pIter->qBuf = POINTER_SHIFT(pIter->qBuf, infoItem.len);
        pIter->nBufPos += qTaskInfoLenWithHead;

        if ((pIter->nBufPos + RSMA_QTASKINFO_HEAD_LEN) >= pIter->nBytes) {
          // prepare and load next block in the file
          pIter->offset -= (pIter->nBytes - pIter->nBufPos);
          break;
        }

        pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead);
        continue;
      }
      // prepare and load next block in the file
      pIter->offset -= (pIter->nBytes - pIter->nBufPos);
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}
C
Cary Xu 已提交
1480

1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524
/**
 * @brief Commit the stream state of every rsma item contained in pInfoHash.
 *
 * Nothing is done when the hash is empty or when commitAppliedVer is not larger than the max version
 * reported by tdRSmaFSMaxVer(). Entries flagged as deleted are skipped.
 *
 * @param pRSmaStat
 * @param pInfoHash hash whose values are SRSmaInfo pointers
 * @return int32_t  TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED when a stream state commit fails
 *                  (terrno is set to TSDB_CODE_RSMA_STREAM_STATE_COMMIT)
 */
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
  SSma   *pSma = pRSmaStat->pSma;
  int32_t vid = SMA_VID(pSma);

  if (taosHashGetSize(pInfoHash) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat);
  if (pRSmaStat->commitAppliedVer <= fsMaxVer) {
    smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid,
             pRSmaStat->commitAppliedVer, fsMaxVer);
    return TSDB_CODE_SUCCESS;
  }

  void *infoHash = NULL;
  while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
    SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;

    if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
      continue;
    }

    for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
      SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i);
      if (pItem && pItem->pStreamState) {
        if (streamStateCommit(pItem->pStreamState) < 0) {
          terrno = TSDB_CODE_RSMA_STREAM_STATE_COMMIT;
          // the iteration is abandoned before reaching the end, so release the iterator explicitly
          taosHashCancelIterate(pInfoHash, infoHash);
          goto _err;
        }
        smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 " level %d", vid, pRSmaInfo->suid,
                 i + 1);
      }
    }
  }

  return TSDB_CODE_SUCCESS;
_err:
  smaError("vgId:%d, rsma persist failed since %s", vid, terrstr());
  return TSDB_CODE_FAILED;
}

#if 0
C
Cary Xu 已提交
1525
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
C
Cary Xu 已提交
1526 1527 1528 1529 1530
  SSma   *pSma = pRSmaStat->pSma;
  SVnode *pVnode = pSma->pVnode;
  int32_t vid = SMA_VID(pSma);
  int64_t toffset = 0;
  bool    isFileCreated = false;
C
Cary Xu 已提交
1531

C
Cary Xu 已提交
1532
  if (taosHashGetSize(pInfoHash) <= 0) {
1533 1534 1535
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
1536
  void *infoHash = taosHashIterate(pInfoHash, NULL);
C
Cary Xu 已提交
1537
  if (!infoHash) {
C
Cary Xu 已提交
1538
    return TSDB_CODE_SUCCESS;
C
Cary Xu 已提交
1539 1540
  }

1541 1542 1543
  int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat);
  if (pRSmaStat->commitAppliedVer <= fsMaxVer) {
    smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid,
1544
             pRSmaStat->commitAppliedVer, fsMaxVer);
1545 1546 1547
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
1548
  STFile tFile = {0};
C
Cary Xu 已提交
1549 1550
#if 0
  if (pRSmaStat->commitAppliedVer > 0) {
1551
    char qTaskInfoFName[TSDB_FILENAME_LEN];
C
Cary Xu 已提交
1552
    tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
1553
    if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
1554
      smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr());
1555 1556 1557
      goto _err;
    }
    if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
1558
      smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
1559 1560 1561 1562 1563 1564
      goto _err;
    }
    smaDebug("vgId:%d, rsma, serialize qTaskInfo, file %s created", vid, TD_TFILE_FULL_NAME(&tFile));

    isFileCreated = true;
  }
C
Cary Xu 已提交
1565
#endif
1566

C
Cary Xu 已提交
1567 1568
  while (infoHash) {
    SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
C
Cary Xu 已提交
1569 1570 1571 1572 1573 1574

    if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
      infoHash = taosHashIterate(pInfoHash, infoHash);
      continue;
    }

C
Cary Xu 已提交
1575
    for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
C
Cary Xu 已提交
1576
#if 0
C
Cary Xu 已提交
1577
      qTaskInfo_t taskInfo = RSMA_INFO_IQTASK(pRSmaInfo, i);
C
Cary Xu 已提交
1578 1579
#endif
      qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i);
C
Cary Xu 已提交
1580
      if (!taskInfo) {
1581
        smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1);
C
Cary Xu 已提交
1582 1583 1584 1585 1586 1587 1588
        continue;
      }

      char   *pOutput = NULL;
      int32_t len = 0;
      int8_t  type = (int8_t)(i + 1);
      if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) {
1589
        smaError("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo failed since %s", vid, pRSmaInfo->suid,
C
Cary Xu 已提交
1590
                 i + 1, terrstr());
C
Cary Xu 已提交
1591 1592 1593
        goto _err;
      }
      if (!pOutput || len <= 0) {
1594 1595
        smaDebug("vgId:%d, rsma, table %" PRIi64
                 " level %d serialize qTaskInfo success but no output(len %d), not persist",
C
Cary Xu 已提交
1596 1597 1598 1599 1600
                 vid, pRSmaInfo->suid, i + 1, len);
        taosMemoryFreeClear(pOutput);
        continue;
      }

1601
      smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo success with len %d, need persist", vid,
C
Cary Xu 已提交
1602 1603 1604 1605
               pRSmaInfo->suid, i + 1, len);

      if (!isFileCreated) {
        char qTaskInfoFName[TSDB_FILENAME_LEN];
C
Cary Xu 已提交
1606
        tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
C
Cary Xu 已提交
1607
        if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
1608
          smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr());
C
Cary Xu 已提交
1609 1610
          goto _err;
        }
1611
        if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
1612
          smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
C
Cary Xu 已提交
1613 1614
          goto _err;
        }
1615 1616
        smaDebug("vgId:%d, rsma, table %" PRIi64 " serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid,
                 TD_TFILE_FULL_NAME(&tFile));
C
Cary Xu 已提交
1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629

        isFileCreated = true;
      }

      char    tmpBuf[RSMA_QTASKINFO_HEAD_LEN] = {0};
      void   *pTmpBuf = &tmpBuf;
      int32_t headLen = 0;
      headLen += taosEncodeFixedI32(&pTmpBuf, len + RSMA_QTASKINFO_HEAD_LEN);
      headLen += taosEncodeFixedI8(&pTmpBuf, type);
      headLen += taosEncodeFixedI64(&pTmpBuf, pRSmaInfo->suid);

      ASSERT(headLen <= RSMA_QTASKINFO_HEAD_LEN);
      tdAppendTFile(&tFile, (void *)&tmpBuf, headLen, &toffset);
1630
      smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d head part(len:%d) appended to offset:%" PRIi64, vid,
1631
               pRSmaInfo->suid, i + 1, headLen, toffset);
C
Cary Xu 已提交
1632
      tdAppendTFile(&tFile, pOutput, len, &toffset);
1633 1634
      smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid,
               pRSmaInfo->suid, i + 1, len, toffset);
C
Cary Xu 已提交
1635 1636 1637 1638

      taosMemoryFree(pOutput);
    }

C
Cary Xu 已提交
1639
    infoHash = taosHashIterate(pInfoHash, infoHash);
C
Cary Xu 已提交
1640
  }
C
Cary Xu 已提交
1641

C
Cary Xu 已提交
1642 1643
  if (isFileCreated) {
    if (tdUpdateTFileHeader(&tFile) < 0) {
1644
      smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
C
Cary Xu 已提交
1645 1646 1647
               tstrerror(terrno));
      goto _err;
    } else {
1648
      smaDebug("vgId:%d, rsma, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile));
C
Cary Xu 已提交
1649 1650 1651
    }

    tdCloseTFile(&tFile);
C
Cary Xu 已提交
1652
    tdDestroyTFile(&tFile);
C
Cary Xu 已提交
1653
  }
C
Cary Xu 已提交
1654
  return TSDB_CODE_SUCCESS;
C
Cary Xu 已提交
1655
_err:
1656
  smaError("vgId:%d, rsma persist failed since %s", vid, terrstr());
C
Cary Xu 已提交
1657 1658
  if (isFileCreated) {
    tdRemoveTFile(&tFile);
C
Cary Xu 已提交
1659 1660 1661 1662 1663
    tdDestroyTFile(&tFile);
  }
  return TSDB_CODE_FAILED;
}

1664 1665
#endif

C
Cary Xu 已提交
1666
/**
C
Cary Xu 已提交
1667
 * @brief trigger to get rsma result in async mode
C
Cary Xu 已提交
1668 1669 1670 1671
 *
 * @param param
 * @param tmrId
 */
C
Cary Xu 已提交
1672
static void tdRSmaFetchTrigger(void *param, void *tmrId) {
1673
  SRSmaRef      *pRSmaRef = NULL;
C
Cary Xu 已提交
1674
  SSma          *pSma = NULL;
1675 1676 1677
  SRSmaStat     *pStat = NULL;
  SRSmaInfo     *pRSmaInfo = NULL;
  SRSmaInfoItem *pItem = NULL;
C
Cary Xu 已提交
1678

1679 1680 1681
  if (!(pRSmaRef = taosHashGet(smaMgmt.refHash, &param, POINTER_BYTES))) {
    smaDebug("rsma fetch task not start since rsma info item:%p not exist in refHash:%p, rsetId:%" PRIi64, param,
             *(int64_t *)&param, smaMgmt.refHash, smaMgmt.rsetId);
C
Cary Xu 已提交
1682 1683 1684
    return;
  }

1685
  if (!(pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaRef->refId))) {
C
Cary Xu 已提交
1686
    smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)",
C
Cary Xu 已提交
1687 1688
             smaMgmt.rsetId, pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
    taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES);
C
Cary Xu 已提交
1689
    return;
C
Cary Xu 已提交
1690 1691
  }

C
Cary Xu 已提交
1692 1693
  pSma = pStat->pSma;

1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712
  if (!(pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaRef->suid))) {
    smaDebug("rsma fetch task not start since rsma info not exist, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
             pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
    tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
    taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES);
    return;
  }

  if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
    smaDebug("rsma fetch task not start since rsma info already deleted, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
             pRSmaRef->refId);  // pRSmaRef freed in taosHashRemove
    tdReleaseRSmaInfo(pSma, pRSmaInfo);
    tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
    taosHashRemove(smaMgmt.refHash, &param, POINTER_BYTES);
    return;
  }

  pItem = *(SRSmaInfoItem **)&param;

C
Cary Xu 已提交
1713 1714 1715 1716 1717
  // if rsma trigger stat in paused, cancelled or finished, not start fetch task
  int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
  switch (rsmaTriggerStat) {
    case TASK_TRIGGER_STAT_PAUSED:
    case TASK_TRIGGER_STAT_CANCELLED: {
C
Cary Xu 已提交
1718 1719
      smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8
               ", rsetId rsetId:%" PRIi64 " refId:%d",
1720
               SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaRef->refId);
C
Cary Xu 已提交
1721
      if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
1722
        taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
C
Cary Xu 已提交
1723
      }
1724 1725
      tdReleaseRSmaInfo(pSma, pRSmaInfo);
      tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
C
Cary Xu 已提交
1726
      return;
C
Cary Xu 已提交
1727
    }
C
Cary Xu 已提交
1728 1729 1730 1731 1732 1733 1734 1735
    default:
      break;
  }

  int8_t fetchTriggerStat =
      atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
  switch (fetchTriggerStat) {
    case TASK_TRIGGER_STAT_ACTIVE: {
1736
      smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
C
Cary Xu 已提交
1737
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
C
Cary Xu 已提交
1738
      // async process
C
Cary Xu 已提交
1739
      pItem->fetchLevel = pItem->level;
C
Cary Xu 已提交
1740
#if 0
C
Cary Xu 已提交
1741 1742 1743 1744
      SRSmaInfo     *qInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaInfo->suid);
      SRSmaInfoItem *qItem = RSMA_INFO_ITEM(qInfo, pItem->level - 1);
      ASSERT(qItem->level == pItem->level);
      ASSERT(qItem->fetchLevel == pItem->fetchLevel);
C
Cary Xu 已提交
1745
#endif
1746 1747 1748
      if (atomic_load_8(&pRSmaInfo->assigned) == 0) {
        tsem_post(&(pStat->notEmpty));
      }
C
Cary Xu 已提交
1749 1750
    } break;
    case TASK_TRIGGER_STAT_PAUSED: {
C
Cary Xu 已提交
1751
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is paused",
C
Cary Xu 已提交
1752 1753 1754
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
    } break;
    case TASK_TRIGGER_STAT_INACTIVE: {
C
Cary Xu 已提交
1755
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is inactive",
C
Cary Xu 已提交
1756 1757 1758
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
    } break;
    case TASK_TRIGGER_STAT_INIT: {
C
Cary Xu 已提交
1759 1760
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid::%" PRIi64 " since stat is init",
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
C
Cary Xu 已提交
1761 1762
    } break;
    default: {
C
Cary Xu 已提交
1763 1764
      smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is unknown",
               SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
C
Cary Xu 已提交
1765
    } break;
C
Cary Xu 已提交
1766 1767 1768
  }

_end:
C
Cary Xu 已提交
1769
  taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
1770 1771
  tdReleaseRSmaInfo(pSma, pRSmaInfo);
  tdReleaseSmaRef(smaMgmt.rsetId, pRSmaRef->refId);
C
Cary Xu 已提交
1772
}
C
Cary Xu 已提交
1773

C
Cary Xu 已提交
1774 1775 1776 1777
/**
 * @brief free every queue item referenced by the array, then clear the array
 *
 * Each element of pItems is read as a pointer and passed to taosFreeQitem(); the array itself is
 * only cleared (taosArrayClear), not destroyed.
 *
 * @param pItems array of queue-item pointers
 */
static void tdFreeRSmaSubmitItems(SArray *pItems) {
  int32_t idx = 0;
  while (idx < taosArrayGetSize(pItems)) {
    void **ppItem = (void **)taosArrayGet(pItems, idx);
    taosFreeQitem(*ppItem);
    ++idx;
  }
  taosArrayClear(pItems);
}

C
Cary Xu 已提交
1781 1782 1783 1784 1785 1786 1787
/**
 * @brief fetch rsma result(consider the efficiency and functionality)
 *
 * @param pSma
 * @param pInfo
 * @return int32_t
 */
1788
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
C
Cary Xu 已提交
1789 1790
  SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
  for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
C
Cary Xu 已提交
1791 1792 1793
    SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
    if (pItem->fetchLevel) {
      pItem->fetchLevel = 0;
C
Cary Xu 已提交
1794 1795 1796 1797
      qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1);
      if (!taskInfo) {
        continue;
      }
C
Cary Xu 已提交
1798

C
Cary Xu 已提交
1799 1800 1801
      if ((++pItem->nScanned * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) {
        smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi8 " maxDelay:%d, fetch executed",
                 SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
C
Cary Xu 已提交
1802
      } else {
C
Cary Xu 已提交
1803 1804 1805 1806 1807 1808 1809 1810 1811 1812
        int64_t curMs = taosGetTimestampMs();
        if ((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX) {
          smaTrace("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ",
                   SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
          atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);  // restore the active stat
          continue;
        } else {
          smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ",
                   SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
        }
C
Cary Xu 已提交
1813 1814
      }

C
Cary Xu 已提交
1815
      pItem->nScanned = 0;
C
Cary Xu 已提交
1816

C
Cary Xu 已提交
1817 1818 1819
      if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
        goto _err;
      }
C
Cary Xu 已提交
1820
      if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) {
C
Cary Xu 已提交
1821 1822 1823 1824 1825
        tdCleanupStreamInputDataBlock(taskInfo);
        goto _err;
      }

      tdCleanupStreamInputDataBlock(taskInfo);
C
Cary Xu 已提交
1826 1827
      smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi8 " maxDelay:%d, fetch finished",
               SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
C
Cary Xu 已提交
1828
    } else {
C
Cary Xu 已提交
1829
      smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi8
C
Cary Xu 已提交
1830
               " maxDelay:%d, fetch not executed as fetch level is %" PRIi8,
C
Cary Xu 已提交
1831
               SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay, pItem->fetchLevel);
C
Cary Xu 已提交
1832 1833 1834 1835 1836 1837 1838 1839 1840
    }
  }

_end:
  return TSDB_CODE_SUCCESS;
_err:
  return TSDB_CODE_FAILED;
}

C
Cary Xu 已提交
1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879
/**
 * @brief drain qall into pSubmitArr and run the rsma executor once per retention level on the batch
 *
 * pSubmitArr is cleared on entry. Every item taken from qall is freed before returning, on both the
 * success and the failure path; on failure the items still left in qall are drained and freed too.
 *
 * @param pSma
 * @param pInfo
 * @param qall       source of submit messages
 * @param pSubmitArr scratch array of message pointers, reused by the caller between calls
 * @param type       passed through to tdExecuteRSmaImpl
 * @return int32_t TSDB_CODE_SUCCESS or TSDB_CODE_FAILED
 */
static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) {
  taosArrayClear(pSubmitArr);
  while (1) {
    void *msg = NULL;
    taosGetQitem(qall, (void **)&msg);
    if (!msg) {
      break;
    }
    // taosArrayPush reports failure with a NULL result (it returns a pointer, not a status code),
    // so the former "< 0" test could never fire.
    if (!taosArrayPush(pSubmitArr, &msg)) {
      // msg has left qall but never reached pSubmitArr: free it here or it is lost
      taosFreeQitem(msg);
      tdFreeRSmaSubmitItems(pSubmitArr);
      goto _err;
    }
  }

  int32_t size = taosArrayGetSize(pSubmitArr);
  if (size > 0) {
    for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
      if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) {
        tdFreeRSmaSubmitItems(pSubmitArr);
        goto _err;
      }
    }
    tdFreeRSmaSubmitItems(pSubmitArr);
  }
  return TSDB_CODE_SUCCESS;
_err:
  // release whatever is still queued so the items are not leaked
  while (1) {
    void *msg = NULL;
    taosGetQitem(qall, (void **)&msg);
    if (msg) {
      taosFreeQitem(msg);
    } else {
      break;
    }
  }
  return TSDB_CODE_FAILED;
}

C
Cary Xu 已提交
1880 1881 1882 1883
/**
 * @brief
 *
 * @param pSma
C
Cary Xu 已提交
1884
 * @param type
C
Cary Xu 已提交
1885 1886
 * @return int32_t
 */
C
Cary Xu 已提交
1887

C
Cary Xu 已提交
1888
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
C
Cary Xu 已提交
1889
  SVnode    *pVnode = pSma->pVnode;
C
Cary Xu 已提交
1890 1891 1892 1893
  SSmaEnv   *pEnv = SMA_RSMA_ENV(pSma);
  SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
  SHashObj  *infoHash = NULL;
  SArray    *pSubmitArr = NULL;
C
Cary Xu 已提交
1894
  bool       isFetchAll = false;
C
Cary Xu 已提交
1895 1896 1897 1898 1899 1900

  if (!pRSmaStat || !(infoHash = RSMA_INFO_HASH(pRSmaStat))) {
    terrno = TSDB_CODE_RSMA_INVALID_STAT;
    goto _err;
  }

C
Cary Xu 已提交
1901 1902
  if (!(pSubmitArr =
            taosArrayInit(TMIN(RSMA_SUBMIT_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), POINTER_BYTES))) {
C
Cary Xu 已提交
1903 1904 1905 1906
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

C
Cary Xu 已提交
1907 1908
  while (true) {
    // step 1: rsma exec - consume data in buffer queue for all suids
C
Cary Xu 已提交
1909
    if (type == RSMA_EXEC_OVERFLOW) {
1910 1911
      void *pIter = NULL;
      while ((pIter = taosHashIterate(infoHash, pIter))) {
C
Cary Xu 已提交
1912
        SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
1913 1914 1915 1916 1917 1918 1919 1920
        if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) {
          if ((taosQueueItemSize(pInfo->queue) > 0) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel ||
              RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) {
            int32_t batchCnt = -1;
            int32_t batchMax = taosHashGetSize(infoHash) / tsNumOfVnodeRsmaThreads;
            bool    occupied = (batchMax <= 1);
            if (batchMax > 1) {
              batchMax = 100 / batchMax;
C
Cary Xu 已提交
1921
              batchMax = TMAX(batchMax, 4);
C
Cary Xu 已提交
1922
            }
C
Cary Xu 已提交
1923
            while (occupied || (++batchCnt < batchMax)) {    // greedy mode
1924 1925 1926 1927 1928 1929 1930
              taosReadAllQitems(pInfo->queue, pInfo->qall);  // queue has mutex lock
              int32_t qallItemSize = taosQallItemSize(pInfo->qall);
              if (qallItemSize > 0) {
                tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type);
                smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type);
              }

1931 1932 1933 1934 1935 1936 1937 1938 1939
              if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) {
                int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2);
                if (oldStat == 0 ||
                    ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) {
                  atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1);
                  tdRSmaFetchAllResult(pSma, pInfo);
                  if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) {
                    atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
                  }
C
Cary Xu 已提交
1940
                }
1941 1942 1943 1944 1945 1946
              }

              if (qallItemSize > 0) {
                atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize);
                continue;
              } else if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) {
1947 1948 1949 1950 1951 1952 1953 1954 1955 1956
                if (atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat)) == 0) {
                  continue;
                }
                for (int32_t j = 0; j < TSDB_RETENTION_L2; ++j) {
                  SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, j);
                  if (pItem->fetchLevel) {
                    pItem->fetchLevel = 0;
                    taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
                  }
                }
1957 1958 1959
              }

              break;
C
Cary Xu 已提交
1960 1961
            }
          }
1962
          atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0);
C
Cary Xu 已提交
1963
        }
C
Cary Xu 已提交
1964
      }
C
Cary Xu 已提交
1965
    } else {
C
Cary Xu 已提交
1966
      ASSERT(0);
C
Cary Xu 已提交
1967 1968
    }

C
Cary Xu 已提交
1969
    if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) {
C
Cary Xu 已提交
1970
      if (pEnv->flag & SMA_ENV_FLG_CLOSE) {
C
Cary Xu 已提交
1971 1972
        break;
      }
1973

1974
      tsem_wait(&pRSmaStat->notEmpty);
C
Cary Xu 已提交
1975

1976
      if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
1977
        smaDebug("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag,
1978
                 atomic_load_64(&pRSmaStat->nBufItems));
1979 1980
        break;
      }
C
Cary Xu 已提交
1981
    }
1982

C
Cary Xu 已提交
1983
  }  // end of while(true)
C
Cary Xu 已提交
1984 1985

_end:
C
Cary Xu 已提交
1986 1987 1988 1989 1990 1991
  taosArrayDestroy(pSubmitArr);
  return TSDB_CODE_SUCCESS;
_err:
  taosArrayDestroy(pSubmitArr);
  return TSDB_CODE_FAILED;
}