tsdbSma.c 68.6 KB
Newer Older
C
Cary Xu 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16 17
#include "tsdbSma.h"
#include "tsdb.h"
C
Cary Xu 已提交
18

C
Cary Xu 已提交
19 20 21 22 23
// Directory name component for each SMA type; tsdbGetSmaDir() indexes this table with the SMA type value.
static const char *TSDB_SMA_DNAME[] = {
    "",      // TSDB_SMA_TYPE_BLOCK
    "tsma",  // TSDB_SMA_TYPE_TIME_RANGE
    "rsma",  // TSDB_SMA_TYPE_ROLLUP
};
C
Cary Xu 已提交
24

C
Cary Xu 已提交
25
// Tunables and constants of the SMA module.
#undef _TEST_SMA_PRINT_DEBUG_LOG_
#define SMA_STORAGE_TSDB_DAYS   30
#define SMA_STORAGE_TSDB_TIMES  10
#define SMA_STORAGE_SPLIT_HOURS 24
#define SMA_KEY_LEN             16  // TSKEY+groupId 8+8
#define SMA_DROP_EXPIRED_TIME   10  // default is 10 seconds

// Initial slot counts handed to taosHashInit() for the stat hash and the per-item expired-window hash.
#define SMA_STATE_HASH_SLOT      4
#define SMA_STATE_ITEM_HASH_SLOT 32

#define SMA_TEST_INDEX_NAME "smaTestIndexName"  // TODO: just for test
#define SMA_TEST_INDEX_UID  2000000001          // TODO: just for test
C
Cary Xu 已提交
37 38

typedef struct SRSmaInfo SRSmaInfo;
C
Cary Xu 已提交
39
// How SMA data files are split over time.
typedef enum {
  SMA_STORAGE_LEVEL_TSDB = 0,     // use days of self-defined  e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f200.tsma
  SMA_STORAGE_LEVEL_DFILESET = 1  // use days of TS data       e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f1906.tsma
} ESmaStorageLevel;

C
Cary Xu 已提交
44 45 46 47 48 49 50 51
// Header placed in front of every pool allocation; also the node type of the pool's circular
// doubly-linked list. The pool handle itself is a node whose prev/next initially point to itself.
typedef struct SPoolMem {
  int64_t          size;  // allocation: header + payload bytes; pool handle: running total of all allocations
  struct SPoolMem *prev;
  struct SPoolMem *next;
} SPoolMem;

struct SSmaEnv {
  TdThreadRwlock lock;
C
Cary Xu 已提交
52
  int8_t         type;
C
Cary Xu 已提交
53 54 55 56 57 58 59 60 61
  TXN            txn;
  SPoolMem      *pPool;
  SDiskID        did;
  TENV          *dbEnv;  // TODO: If it's better to put it in smaIndex level?
  char          *path;   // relative path
  SSmaStat      *pStat;
};

// Field accessors for SSmaEnv; each expands to an lvalue.
#define SMA_ENV_LOCK(env)       ((env)->lock)
#define SMA_ENV_TYPE(env)       ((env)->type)
#define SMA_ENV_DID(env)        ((env)->did)
#define SMA_ENV_ENV(env)        ((env)->dbEnv)
#define SMA_ENV_PATH(env)       ((env)->path)
#define SMA_ENV_STAT(env)       ((env)->pStat)
#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)

C
Cary Xu 已提交
69
typedef struct {
C
Cary Xu 已提交
70 71 72 73
  STsdb        *pTsdb;
  SDBFile       dFile;
  const SArray *pDataBlocks;  // sma data
  int32_t       interval;     // interval with the precision of DB
C
Cary Xu 已提交
74 75 76 77
} STSmaWriteH;

// Iteration state embedded in STSmaReadH.
typedef struct {
  int32_t iter;
  int32_t fid;
} SmaFsIter;
C
Cary Xu 已提交
80

C
Cary Xu 已提交
81
typedef struct {
C
Cary Xu 已提交
82
  STsdb    *pTsdb;
C
Cary Xu 已提交
83
  SDBFile   dFile;
C
Cary Xu 已提交
84 85 86 87 88 89 90
  int32_t   interval;   // interval with the precision of DB
  int32_t   blockSize;  // size of SMA block item
  int8_t    storageLevel;
  int8_t    days;
  SmaFsIter smaFsIter;
} STSmaReadH;

91 92 93 94 95
typedef struct {
  /**
   * @brief The field 'state' is here to demonstrate if one smaIndex is ready to provide service.
   *    - TSDB_SMA_STAT_OK: 1) The sma calculation of history data is finished; 2) Or recevied information from
   * Streaming Module or TSDB local persistence.
C
Cary Xu 已提交
96 97 98
   *    - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
   * without information about its previous state.
   *    - TSDB_SMA_STAT_DROPPED: 1)sma dropped
C
Cary Xu 已提交
99
   * N.B. only applicable to tsma
100 101 102
   */
  int8_t    state;           // ETsdbSmaStat
  SHashObj *expiredWindows;  // key: skey of time window, value: N/A
C
Cary Xu 已提交
103
  STSma    *pSma;            // cache schema
104 105
} SSmaStatItem;

C
Cary Xu 已提交
106 107 108 109 110 111
#define RSMA_MAX_LEVEL           2  // number of task handle slots kept per SRSmaInfo
#define RSMA_TASK_INFO_HASH_SLOT 8  // initial slot count passed to taosHashInit() for the rsma info hash
// Rollup-SMA info stored in SSmaStat.rsmaInfoHash: one task handle slot per level.
struct SRSmaInfo {
  void *taskInfo[RSMA_MAX_LEVEL];  // qTaskInfo_t
};

112
struct SSmaStat {
C
Cary Xu 已提交
113 114 115 116
  union {
    SHashObj *smaStatItems;  // key: indexUid, value: SSmaStatItem for tsma
    SHashObj *rsmaInfoHash;  // key: stbUid, value: SRSmaInfo;
  };
117
  T_REF_DECLARE()
118
};
C
Cary Xu 已提交
119 120
// Accessors for the two members of the anonymous union in SSmaStat.
#define SMA_STAT_ITEMS(s)     ((s)->smaStatItems)
#define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash)
C
Cary Xu 已提交
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137

/**
 * @brief Detach the task stored in a handle slot and destroy it.
 *
 * @param taskHandle Address of the slot holding the task handle.
 */
static FORCE_INLINE void tsdbFreeTaskHandle(qTaskInfo_t *taskHandle) {
  // Note: free/kill may race with each other, hence the atomic load + compare-exchange.
  qTaskInfo_t current = atomic_load_ptr(taskHandle);
  if (!current) {
    return;
  }
  if (atomic_val_compare_exchange_ptr(taskHandle, current, NULL)) {
    qDestroyTask(current);
  }
}

/**
 * @brief Destroy every task handle held by an SRSmaInfo. The SRSmaInfo itself is not freed here.
 *
 * @param pInfo May be NULL, in which case nothing is done.
 * @return void* Always NULL.
 */
static FORCE_INLINE void *tsdbFreeRSmaInfo(SRSmaInfo *pInfo) {
  if (!pInfo) {
    return NULL;
  }
  for (int32_t i = 0; i < RSMA_MAX_LEVEL; ++i) {
    if (pInfo->taskInfo[i]) {
      // tsdbFreeTaskHandle() works on the slot (it atomically swaps it to NULL), so pass the slot's
      // address rather than the handle value stored in it.
      tsdbFreeTaskHandle(&pInfo->taskInfo[i]);
    }
  }
  return NULL;
}
138

C
Cary Xu 已提交
139
// declaration of static functions
C
Cary Xu 已提交
140

141
// expired window
142
static int32_t  tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version);
H
refact  
Hongze Cheng 已提交
143 144
static int32_t  tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey,
                                     int64_t version);
C
Cary Xu 已提交
145
static int32_t  tsdbInitSmaStat(SSmaStat **pSmaStat, int8_t smaType);
C
Cary Xu 已提交
146
static void    *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem);
C
Cary Xu 已提交
147
static int32_t  tsdbDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
C
Cary Xu 已提交
148 149
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, int8_t smaType, const char *path, SDiskID did);
static int32_t  tsdbInitSmaEnv(STsdb *pTsdb, int8_t smaType, const char *path, SDiskID did, SSmaEnv **pEnv);
150 151 152 153 154 155
static int32_t  tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey);
static int32_t  tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat);
static int32_t  tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat);

// read data
// TODO: This is the basic params, and should wrap the params to a queryHandle.
C
Cary Xu 已提交
156
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult);
C
Cary Xu 已提交
157

158
// insert data
C
Cary Xu 已提交
159
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, const SArray *pDataBlocks, int64_t interval,
C
Cary Xu 已提交
160
                                  int8_t intervalUnit);
161 162 163
static void    tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH);
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit);
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit);
C
Cary Xu 已提交
164
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, int32_t fid);
C
Cary Xu 已提交
165 166
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyLen, void *pData, int32_t dataLen,
                                    TXN *txn);
C
Cary Xu 已提交
167
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision, bool adjusted);
C
Cary Xu 已提交
168
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel);
C
Cary Xu 已提交
169
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t fid);
170
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey);
C
Cary Xu 已提交
171
static bool    tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey);
C
Cary Xu 已提交
172
static void    tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]);
C
Cary Xu 已提交
173 174
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char *msg);
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg);
C
Cary Xu 已提交
175

C
Cary Xu 已提交
176
static FORCE_INLINE int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
C
Cary Xu 已提交
177
static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids);
C
Cary Xu 已提交
178 179 180
// mgmt interface
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid);

C
Cary Xu 已提交
181 182 183 184 185 186 187 188 189 190
// Pool Memory
static SPoolMem *openPool();
static void      clearPool(SPoolMem *pPool);
static void      closePool(SPoolMem *pPool);
static void     *poolMalloc(void *arg, size_t size);
static void      poolFree(void *arg, void *ptr);

static int tsdbSmaBeginCommit(SSmaEnv *pEnv);
static int tsdbSmaEndCommit(SSmaEnv *pEnv);

191
// implementation
C
Cary Xu 已提交
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
// Atomically add n to the repo's tSma counter and return the updated count.
static FORCE_INLINE int16_t tsdbTSmaAdd(STsdb *pTsdb, int16_t n) {
  int16_t updated = atomic_add_fetch_16(&REPO_TSMA_NUM(pTsdb), n);
  return updated;
}
// Atomically subtract n from the repo's tSma counter and return the updated count.
static FORCE_INLINE int16_t tsdbTSmaSub(STsdb *pTsdb, int16_t n) {
  int16_t updated = atomic_sub_fetch_16(&REPO_TSMA_NUM(pTsdb), n);
  return updated;
}

/**
 * @brief Take the read lock of the SMA env.
 *
 * @return int32_t 0 on success; -1 on failure with terrno set from the lock call's error code.
 */
static FORCE_INLINE int32_t tsdbRLockSma(SSmaEnv *pEnv) {
  int rc = taosThreadRwlockRdlock(&(pEnv->lock));
  if (rc == 0) {
    return 0;
  }
  terrno = TAOS_SYSTEM_ERROR(rc);
  return -1;
}

/**
 * @brief Take the write lock of the SMA env.
 *
 * @return int32_t 0 on success; -1 on failure with terrno set from the lock call's error code.
 */
static FORCE_INLINE int32_t tsdbWLockSma(SSmaEnv *pEnv) {
  int rc = taosThreadRwlockWrlock(&(pEnv->lock));
  if (rc == 0) {
    return 0;
  }
  terrno = TAOS_SYSTEM_ERROR(rc);
  return -1;
}

/**
 * @brief Release the read or write lock of the SMA env.
 *
 * @return int32_t 0 on success; -1 on failure with terrno set from the unlock call's error code.
 */
static FORCE_INLINE int32_t tsdbUnLockSma(SSmaEnv *pEnv) {
  int rc = taosThreadRwlockUnlock(&(pEnv->lock));
  if (rc == 0) {
    return 0;
  }
  terrno = TAOS_SYSTEM_ERROR(rc);
  return -1;
}

static SPoolMem *openPool() {
  SPoolMem *pPool = (SPoolMem *)tdbOsMalloc(sizeof(*pPool));

  pPool->prev = pPool->next = pPool;
  pPool->size = 0;

  return pPool;
}

/**
 * @brief Free every allocation still linked into the pool; the pool handle itself stays valid.
 *
 * @param pPool May be NULL, in which case nothing is done.
 */
static void clearPool(SPoolMem *pPool) {
  if (!pPool) return;

  SPoolMem *pMem;

  do {
    pMem = pPool->next;

    // The list is circular: reaching the pool handle again means it is empty.
    if (pMem == pPool) break;

    pMem->next->prev = pMem->prev;
    pMem->prev->next = pMem->next;
    pPool->size -= pMem->size;

    tdbOsFree(pMem);
  } while (1);

  assert(pPool->size == 0);
}

// Release all allocations of the pool and then the pool handle itself. NULL is accepted and ignored.
static void closePool(SPoolMem *pPool) {
  if (!pPool) {
    return;
  }
  clearPool(pPool);
  tdbOsFree(pPool);
}

/**
 * @brief Allocate `size` bytes tracked by the pool.
 *
 * The allocation is prefixed with an SPoolMem header that is linked right after the pool handle,
 * and the pool's running total is increased by header + payload size.
 *
 * @param arg  The pool handle (SPoolMem *).
 * @param size Payload size in bytes.
 * @return void* Pointer to the payload (just past the header), or NULL if allocation fails.
 */
static void *poolMalloc(void *arg, size_t size) {
  void     *ptr = NULL;
  SPoolMem *pPool = (SPoolMem *)arg;
  SPoolMem *pMem;

  pMem = (SPoolMem *)tdbOsMalloc(sizeof(*pMem) + size);
  if (!pMem) {
    // Keep the loud failure in debug builds, but never fall through to a NULL dereference
    // when assertions are compiled out.
    assert(0);
    return NULL;
  }

  pMem->size = sizeof(*pMem) + size;
  pMem->next = pPool->next;
  pMem->prev = pPool;

  pPool->next->prev = pMem;
  pPool->next = pMem;
  pPool->size += pMem->size;

  ptr = (void *)(&pMem[1]);
  return ptr;
}

static void poolFree(void *arg, void *ptr) {
  SPoolMem *pPool = (SPoolMem *)arg;
  SPoolMem *pMem;

  pMem = &(((SPoolMem *)ptr)[-1]);

  pMem->next->prev = pMem->prev;
  pMem->prev->next = pMem->next;
  pPool->size -= pMem->size;

  tdbOsFree(pMem);
}
C
Cary Xu 已提交
296 297 298

/**
 * @brief Initialize the repo's tSma counter from the number of SMA table uids reported by meta.
 *
 * @return int32_t Always TSDB_CODE_SUCCESS.
 */
int32_t tsdbInitSma(STsdb *pTsdb) {
  // tSma
  // NOTE(review): the array returned by metaGetSmaTbUids() is never released here - confirm who owns it.
  int32_t numOfTSma = taosArrayGetSize(metaGetSmaTbUids(REPO_META(pTsdb), false));
  if (numOfTSma > 0) {
    atomic_store_16(&REPO_TSMA_NUM(pTsdb), (int16_t)numOfTSma);
  }
  // TODO: rSma
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
307 308 309 310 311 312 313 314
// Atomically read the state of a stat item; TSDB_SMA_STAT_UNKNOWN when the item is NULL.
static FORCE_INLINE int8_t tsdbSmaStat(SSmaStatItem *pStatItem) {
  if (!pStatItem) {
    return TSDB_SMA_STAT_UNKNOWN;
  }
  return atomic_load_8(&pStatItem->state);
}

/**
 * @brief Check whether a stat item is exactly in state TSDB_SMA_STAT_OK.
 *
 * @param pStatItem Item to inspect; NULL yields false.
 * @param state     Optional out-parameter receiving the state that was read (untouched if pStatItem is NULL).
 * @return bool true if the state equals TSDB_SMA_STAT_OK.
 */
static FORCE_INLINE bool tsdbSmaStatIsOK(SSmaStatItem *pStatItem, int8_t *state) {
  if (!pStatItem) {
    return false;
  }

  if (state) {
    *state = atomic_load_8(&pStatItem->state);
    return *state == TSDB_SMA_STAT_OK;
  }
  return atomic_load_8(&pStatItem->state) == TSDB_SMA_STAT_OK;
}

// True if the EXPIRED bit is set in the item's state; a NULL item counts as expired.
static FORCE_INLINE bool tsdbSmaStatIsExpired(SSmaStatItem *pStatItem) {
  if (!pStatItem) {
    return true;
  }
  return (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_EXPIRED);
}

// True if the DROPPED bit is set in the item's state; a NULL item counts as dropped.
static FORCE_INLINE bool tsdbSmaStatIsDropped(SSmaStatItem *pStatItem) {
  if (!pStatItem) {
    return true;
  }
  return (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_DROPPED);
}

// Atomically overwrite the item's state with TSDB_SMA_STAT_OK; no-op for a NULL item.
static FORCE_INLINE void tsdbSmaStatSetOK(SSmaStatItem *pStatItem) {
  if (!pStatItem) {
    return;
  }
  atomic_store_8(&pStatItem->state, TSDB_SMA_STAT_OK);
}

// Atomically OR the EXPIRED bit into the item's state; no-op for a NULL item.
static FORCE_INLINE void tsdbSmaStatSetExpired(SSmaStatItem *pStatItem) {
  if (!pStatItem) {
    return;
  }
  atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_EXPIRED);
}

// Atomically OR the DROPPED bit into the item's state; no-op for a NULL item.
static FORCE_INLINE void tsdbSmaStatSetDropped(SSmaStatItem *pStatItem) {
  if (!pStatItem) {
    return;
  }
  atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_DROPPED);
}

C
Cary Xu 已提交
352
/**
 * @brief Format the SMA directory name "vnode<SEP>vnode<vgId><SEP><type dir>" into dirName.
 *
 * @param vgId    Vgroup id embedded in the path.
 * @param smaType Index into TSDB_SMA_DNAME; not range-checked here.
 * @param dirName Output buffer; at most TSDB_FILENAME_LEN bytes are written.
 */
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) {
  snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TSDB_SMA_DNAME[smaType]);
}
C
Cary Xu 已提交
355

C
Cary Xu 已提交
356
/**
 * @brief Allocate and fully initialize an SMA environment.
 *
 * Steps: allocate the env, init its rwlock, duplicate `path`, init the stat object for `smaType`,
 * open the DB env at the absolute location of `path` on disk `did`, and create the memory pool.
 * If a step after the lock init fails, everything built so far is released via tsdbFreeSmaEnv().
 *
 * @param pTsdb   Repo whose TFS resolves the absolute path.
 * @param smaType SMA type stored in the env and used to initialize the stat object.
 * @param path    Non-empty relative path of the SMA directory.
 * @param did     Disk the environment lives on.
 * @return SSmaEnv* The new env, or NULL on failure.
 */
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, int8_t smaType, const char *path, SDiskID did) {
  SSmaEnv *pEnv = NULL;

  pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv));
  if (!pEnv) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMA_ENV_TYPE(pEnv) = smaType;

  int code = taosThreadRwlockInit(&(pEnv->lock), NULL);
  if (code) {
    terrno = TAOS_SYSTEM_ERROR(code);
    // The lock was never initialized, so free the bare struct instead of running the full destructor.
    taosMemoryFree(pEnv);
    return NULL;
  }

  ASSERT(path && (strlen(path) > 0));
  SMA_ENV_PATH(pEnv) = strdup(path);
  if (!SMA_ENV_PATH(pEnv)) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tsdbFreeSmaEnv(pEnv);
    return NULL;
  }

  SMA_ENV_DID(pEnv) = did;

  if (tsdbInitSmaStat(&SMA_ENV_STAT(pEnv), smaType) != TSDB_CODE_SUCCESS) {
    tsdbFreeSmaEnv(pEnv);
    return NULL;
  }

  char aname[TSDB_FILENAME_LEN] = {0};
  tfsAbsoluteName(REPO_TFS(pTsdb), did, path, aname);
  if (tsdbOpenDBEnv(&pEnv->dbEnv, aname) != TSDB_CODE_SUCCESS) {
    tsdbFreeSmaEnv(pEnv);
    return NULL;
  }

  if (!(pEnv->pPool = openPool())) {
    tsdbFreeSmaEnv(pEnv);
    return NULL;
  }

  return pEnv;
}

C
Cary Xu 已提交
403
/**
 * @brief Create the SMA env into *pEnv unless one is already there.
 *
 * @param pEnv In/out slot; must not be NULL. An already non-NULL *pEnv is left untouched.
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED if pEnv is NULL (terrno = TSDB_CODE_INVALID_PTR)
 *         or the env could not be created.
 */
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, int8_t smaType, const char *path, SDiskID did, SSmaEnv **pEnv) {
  if (!pEnv) {
    terrno = TSDB_CODE_INVALID_PTR;
    return TSDB_CODE_FAILED;
  }

  if (!(*pEnv)) {
    if (!(*pEnv = tsdbNewSmaEnv(pTsdb, smaType, path, did))) {
      return TSDB_CODE_FAILED;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Release resources allocated for its member fields, not including itself.
 *
 * @param pSmaEnv May be NULL, in which case nothing is done.
 */
void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv) {
  if (pSmaEnv) {
    tsdbDestroySmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv));
    taosMemoryFreeClear(pSmaEnv->pStat);
    taosMemoryFreeClear(pSmaEnv->path);
    taosThreadRwlockDestroy(&(pSmaEnv->lock));
    tsdbCloseDBEnv(pSmaEnv->dbEnv);
    closePool(pSmaEnv->pPool);
  }
}

/**
 * @brief Destroy the members of an SMA env and free the env itself.
 *
 * @return void* Always NULL, so callers can write `p = tsdbFreeSmaEnv(p)`.
 */
void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv) {
  tsdbDestroySmaEnv(pSmaEnv);
  taosMemoryFreeClear(pSmaEnv);
  return NULL;
}

441
// Increment the reference count of pStat and log the new value; a NULL pStat is ignored. Always returns 0.
static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) {
  if (!pStat) return 0;

  int ref = T_REF_INC(pStat);
  tsdbDebug("vgId:%d ref sma stat:%p, val:%d", REPO_ID(pTsdb), pStat, ref);
  return 0;
}

// Decrement the reference count of pStat and log the new value; a NULL pStat is ignored. Always returns 0.
static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) {
  if (!pStat) return 0;

  int ref = T_REF_DEC(pStat);
  tsdbDebug("vgId:%d unref sma stat:%p, val:%d", REPO_ID(pTsdb), pStat, ref);
  return 0;
}

C
Cary Xu 已提交
457
/**
 * @brief Allocate *pSmaStat and the hash table matching smaType, unless *pSmaStat already exists.
 *
 * @param pSmaStat In/out slot; must not be NULL.
 * @param smaType  TSDB_SMA_TYPE_ROLLUP or TSDB_SMA_TYPE_TIME_RANGE; anything else trips ASSERT.
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED when an allocation fails
 *         (*pSmaStat is then left NULL).
 */
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
  ASSERT(pSmaStat != NULL);

  if (*pSmaStat) {  // no lock
    return TSDB_CODE_SUCCESS;
  }

  /**
   *  1. Lazy mode utilized when init SSmaStat to update expired window(or hungry mode when tsdbNew).
   *  2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
   * tsdbInitSmaStat invoked in other multithread environment later.
   */
  *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat));
  if (!(*pSmaStat)) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }

  if (smaType == TSDB_SMA_TYPE_ROLLUP) {
    SMA_STAT_INFO_HASH(*pSmaStat) = taosHashInit(
        RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);

    if (!SMA_STAT_INFO_HASH(*pSmaStat)) {
      taosMemoryFreeClear(*pSmaStat);
      return TSDB_CODE_FAILED;
    }
  } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
    SMA_STAT_ITEMS(*pSmaStat) =
        taosHashInit(SMA_STATE_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);

    if (!SMA_STAT_ITEMS(*pSmaStat)) {
      taosMemoryFreeClear(*pSmaStat);
      return TSDB_CODE_FAILED;
    }
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}

static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) {
  SSmaStatItem *pItem = NULL;

wafwerar's avatar
wafwerar 已提交
502
  pItem = (SSmaStatItem *)taosMemoryCalloc(1, sizeof(SSmaStatItem));
503 504 505 506 507
  if (pItem) {
    pItem->state = state;
    pItem->expiredWindows = taosHashInit(SMA_STATE_ITEM_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP),
                                         true, HASH_ENTRY_LOCK);
    if (!pItem->expiredWindows) {
wafwerar's avatar
wafwerar 已提交
508
      taosMemoryFreeClear(pItem);
509 510 511 512 513
    }
  }
  return pItem;
}

C
Cary Xu 已提交
514
/**
 * @brief Free a stat item together with its cached schema and its expired-window hash.
 *
 * @param pSmaStatItem May be NULL, in which case nothing is done.
 * @return void* Always NULL.
 */
static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
  if (pSmaStatItem) {
    tdDestroyTSma(pSmaStatItem->pSma);
    taosMemoryFreeClear(pSmaStatItem->pSma);
    taosHashCleanup(pSmaStatItem->expiredWindows);
    taosMemoryFreeClear(pSmaStatItem);
  }
  return NULL;
}

C
Cary Xu 已提交
524 525
/**
 * @brief Release resources allocated for its member fields, not including itself.
526 527 528
 *
 * @param pSmaStat
 * @return int32_t
C
Cary Xu 已提交
529
 */
C
Cary Xu 已提交
530
int32_t tsdbDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
531 532
  if (pSmaStat) {
    // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
C
Cary Xu 已提交
533
    if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
C
Cary Xu 已提交
534
      void *item = taosHashIterate(SMA_STAT_ITEMS(pSmaStat), NULL);
C
Cary Xu 已提交
535
      while (item) {
C
Cary Xu 已提交
536 537
        SSmaStatItem *pItem = *(SSmaStatItem **)item;
        tsdbFreeSmaStatItem(pItem);
C
Cary Xu 已提交
538
        item = taosHashIterate(SMA_STAT_ITEMS(pSmaStat), item);
C
Cary Xu 已提交
539
      }
C
Cary Xu 已提交
540
      taosHashCleanup(SMA_STAT_ITEMS(pSmaStat));
C
Cary Xu 已提交
541
    } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
C
Cary Xu 已提交
542
      void *infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), NULL);
C
Cary Xu 已提交
543
      while (infoHash) {
C
Cary Xu 已提交
544 545
        SRSmaInfo *pInfoHash = *(SRSmaInfo **)infoHash;
        tsdbFreeRSmaInfo(pInfoHash);
C
Cary Xu 已提交
546
        infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), infoHash);
C
Cary Xu 已提交
547
      }
C
Cary Xu 已提交
548
      taosHashCleanup(SMA_STAT_INFO_HASH(pSmaStat));
C
Cary Xu 已提交
549 550
    } else {
      ASSERT(0);
551 552
    }
  }
553
  return TSDB_CODE_SUCCESS;
554 555
}

C
Cary Xu 已提交
556
/**
 * @brief Make sure the SMA env of the given type exists on the repo, creating it on first use.
 *
 * The env pointer is first read without the repo lock; if it is missing, the repo lock is taken,
 * the pointer is read again, and only then is the env created and published.
 *
 * @param smaType TSDB_SMA_TYPE_TIME_RANGE or TSDB_SMA_TYPE_ROLLUP.
 * @return int32_t TSDB_CODE_SUCCESS if the env exists afterwards; TSDB_CODE_FAILED for an unknown
 *         type (terrno = TSDB_CODE_INVALID_PARA) or when disk allocation, directory creation or
 *         env creation fails.
 */
static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
  SSmaEnv *pEnv = NULL;

  // return if already init
  switch (smaType) {
    case TSDB_SMA_TYPE_TIME_RANGE:
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&REPO_TSMA_ENV(pTsdb)))) {
        return TSDB_CODE_SUCCESS;
      }
      break;
    case TSDB_SMA_TYPE_ROLLUP:
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&REPO_RSMA_ENV(pTsdb)))) {
        return TSDB_CODE_SUCCESS;
      }
      break;
    default:
      terrno = TSDB_CODE_INVALID_PARA;
      return TSDB_CODE_FAILED;
  }

  // init sma env
  tsdbLockRepo(pTsdb);
  // Re-read under the lock: another thread may have created the env in the meantime.
  pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&REPO_TSMA_ENV(pTsdb))
                                               : atomic_load_ptr(&REPO_RSMA_ENV(pTsdb));
  if (!pEnv) {
    char rname[TSDB_FILENAME_LEN] = {0};

    SDiskID did = {0};
    tfsAllocDisk(REPO_TFS(pTsdb), TFS_PRIMARY_LEVEL, &did);
    if (did.level < 0 || did.id < 0) {
      tsdbUnlockRepo(pTsdb);
      return TSDB_CODE_FAILED;
    }
    tsdbGetSmaDir(REPO_ID(pTsdb), smaType, rname);

    if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rname, did) != TSDB_CODE_SUCCESS) {
      tsdbUnlockRepo(pTsdb);
      return TSDB_CODE_FAILED;
    }

    if (tsdbInitSmaEnv(pTsdb, smaType, rname, did, &pEnv) != TSDB_CODE_SUCCESS) {
      tsdbUnlockRepo(pTsdb);
      return TSDB_CODE_FAILED;
    }

    if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
      atomic_store_ptr(&REPO_TSMA_ENV(pTsdb), pEnv);
    } else {
      atomic_store_ptr(&REPO_RSMA_ENV(pTsdb), pEnv);
    }
  }
  tsdbUnlockRepo(pTsdb);

  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
609 610
/**
 * @brief Record the window starting at winSKey as expired for the sma index indexUid.
 *
 * If the index has no stat item yet, one is created, its schema is loaded from meta and cached,
 * and the item is inserted into pItemsHash. The window key is then stored in the item's
 * expired-window hash with `version` as its value.
 *
 * @param pTsdb      Repo; used for meta access and logging.
 * @param pItemsHash Hash of indexUid -> SSmaStatItem *.
 * @param indexUid   Uid of the sma index.
 * @param winSKey    Start key of the expired time window.
 * @param version    Value stored for the window key.
 * @return int32_t TSDB_CODE_SUCCESS or TSDB_CODE_FAILED.
 */
static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey,
                                    int64_t version) {
  SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
  if (!pItem) {
    // TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later
    pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_OK);  // TODO use the real state
    if (!pItem) {
      // Response to stream computing: OOM
      // For query, if the indexUid not found, the TSDB should tell query module to query raw TS data.
      return TSDB_CODE_FAILED;
    }

    // cache smaMeta
    STSma *pSma = metaGetSmaInfoByIndex(REPO_META(pTsdb), indexUid, true);
    if (!pSma) {
      terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
      taosHashCleanup(pItem->expiredWindows);
      taosMemoryFree(pItem);
      tsdbWarn("vgId:%d update expired window failed for smaIndex %" PRIi64 " since %s", REPO_ID(pTsdb), indexUid,
               tstrerror(terrno));
      return TSDB_CODE_FAILED;
    }
    pItem->pSma = pSma;

    if (taosHashPut(pItemsHash, &indexUid, sizeof(indexUid), &pItem, sizeof(pItem)) != 0) {
      // The item never made it into the hash, so this function still owns it. Free it completely,
      // including the schema just attached, which the earlier hand-rolled cleanup leaked.
      tsdbFreeSmaStatItem(pItem);
      return TSDB_CODE_FAILED;
    }
  } else if (!(pItem = *(SSmaStatItem **)pItem)) {
    // The hash stores pointers to items; a NULL stored pointer is invalid.
    terrno = TSDB_CODE_INVALID_PTR;
    return TSDB_CODE_FAILED;
  }

  if (taosHashPut(pItem->expiredWindows, &winSKey, sizeof(TSKEY), &version, sizeof(version)) != 0) {
    // If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would
    // tell query module to query raw TS data.
    // N.B.
    //  1) It is assumed to be an extremely low-probability event for taosHashPut to fail.
    //  2) This would solve the inconsistency to some extent, but not completely, unless we record all expired
    // windows failed to put into hash table.
    // Unlink the entry first, then release the whole item; freeing only some members would leak the
    // item and the schema's internals.
    taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid));
    tsdbFreeSmaStatItem(pItem);
    tsdbWarn("vgId:%d smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window fail", REPO_ID(pTsdb), indexUid,
             winSKey);
    return TSDB_CODE_FAILED;
  }

  tsdbDebug("vgId:%d smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window succeed", REPO_ID(pTsdb), indexUid,
            winSKey);
  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Update expired window according to msg from stream computing module.
 *
 * @param pTsdb
 * @param msg SSubmitReq
 * @return int32_t
 */
671 672
int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version) {
  // no time-range-sma, just return success
C
Cary Xu 已提交
673 674 675 676 677
  if (atomic_load_16(&REPO_TSMA_NUM(pTsdb)) <= 0) {
    tsdbTrace("vgId:%d not update expire window since no tSma", REPO_ID(pTsdb));
    return TSDB_CODE_SUCCESS;
  }

H
Hongze Cheng 已提交
678
  if (!REPO_META(pTsdb)) {
C
Cary Xu 已提交
679 680 681 682 683 684 685 686 687 688 689 690
    terrno = TSDB_CODE_INVALID_PTR;
    return TSDB_CODE_FAILED;
  }

  if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INIT_FAILED;
    return TSDB_CODE_FAILED;
  }

  // Firstly, assume that tSma can only be created on super table/normal table.
  // getActiveTimeWindow

C
Cary Xu 已提交
691
  SSmaEnv  *pEnv = REPO_TSMA_ENV(pTsdb);
C
Cary Xu 已提交
692 693 694
  SSmaStat *pStat = SMA_ENV_STAT(pEnv);
  SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv);

C
Cary Xu 已提交
695
  TASSERT(pEnv && pStat && pItemsHash);
C
Cary Xu 已提交
696

C
Cary Xu 已提交
697 698 699
  // basic procedure
  // TODO: optimization
  tsdbRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
700 701 702 703

  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock = NULL;
  SInterval      interval = {0};
C
Cary Xu 已提交
704
  TSKEY          lastWinSKey = INT64_MIN;
C
Cary Xu 已提交
705

C
Cary Xu 已提交
706
  if (tInitSubmitMsgIterEx(pMsg, &msgIter) != TSDB_CODE_SUCCESS) {
C
Cary Xu 已提交
707 708 709 710
    return TSDB_CODE_FAILED;
  }

  while (true) {
C
Cary Xu 已提交
711
    tGetSubmitMsgNextEx(&msgIter, &pBlock);
C
Cary Xu 已提交
712
    if (!pBlock) break;
C
Cary Xu 已提交
713 714 715 716

    STSmaWrapper *pSW = NULL;
    STSma        *pTSma = NULL;

C
Cary Xu 已提交
717
    SSubmitBlkIter blkIter = {0};
C
Cary Xu 已提交
718
    if (tInitSubmitBlkIterEx(&msgIter, pBlock, &blkIter) != TSDB_CODE_SUCCESS) {
C
Cary Xu 已提交
719
      pSW = tdFreeTSmaWrapper(pSW);
C
Cary Xu 已提交
720 721 722
      break;
    }

C
Cary Xu 已提交
723
    while (true) {
C
Cary Xu 已提交
724
      STSRow *row = tGetSubmitBlkNextEx(&blkIter);
C
Cary Xu 已提交
725
      if (!row) {
C
Cary Xu 已提交
726 727 728
        tdFreeTSmaWrapper(pSW);
        break;
      }
C
Cary Xu 已提交
729 730 731 732
      if (!pSW || (pTSma->tableUid != pBlock->suid)) {
        if (pSW) {
          pSW = tdFreeTSmaWrapper(pSW);
        }
C
Cary Xu 已提交
733
        if (!(pSW = metaGetSmaInfoByTable(REPO_META(pTsdb), pBlock->suid))) {
C
Cary Xu 已提交
734 735
          break;
        }
C
Cary Xu 已提交
736
        if ((pSW->number) <= 0 || !pSW->tSma) {
C
Cary Xu 已提交
737
          pSW = tdFreeTSmaWrapper(pSW);
C
Cary Xu 已提交
738 739
          break;
        }
C
Cary Xu 已提交
740

C
Cary Xu 已提交
741 742
        pTSma = pSW->tSma;

C
Cary Xu 已提交
743 744 745 746 747 748 749
        interval.interval = pTSma->interval;
        interval.intervalUnit = pTSma->intervalUnit;
        interval.offset = pTSma->offset;
        interval.precision = REPO_CFG(pTsdb)->precision;
        interval.sliding = pTSma->sliding;
        interval.slidingUnit = pTSma->slidingUnit;
      }
C
Cary Xu 已提交
750

C
update  
Cary Xu 已提交
751 752
      TSKEY winSKey = taosTimeTruncate(TD_ROW_KEY(row), &interval, interval.precision);

C
Cary Xu 已提交
753 754 755 756 757 758 759
      if (lastWinSKey != winSKey) {
        lastWinSKey = winSKey;
        tsdbSetExpiredWindow(pTsdb, pItemsHash, pTSma->indexUid, winSKey, version);
      } else {
        tsdbDebug("vgId:%d smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window ignore as duplicated",
                  REPO_ID(pTsdb), pTSma->indexUid, winSKey);
      }
760 761 762
    }
  }

763
  tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
764

765 766 767
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
768 769 770 771 772 773 774 775 776
/**
 * @brief When sma data received from stream computing, make the relative expired window valid.
 *
 * @param pTsdb
 * @param pStat
 * @param indexUid
 * @param skey
 * @return int32_t
 */
777
static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey) {
C
Cary Xu 已提交
778 779
  SSmaStatItem *pItem = NULL;

780 781
  tsdbRefSmaStat(pTsdb, pStat);

C
Cary Xu 已提交
782 783
  if (pStat && SMA_STAT_ITEMS(pStat)) {
    pItem = taosHashGet(SMA_STAT_ITEMS(pStat), &indexUid, sizeof(indexUid));
C
Cary Xu 已提交
784
  }
C
Cary Xu 已提交
785
  if ((pItem) && ((pItem = *(SSmaStatItem **)pItem))) {
786 787
    // pItem resides in hash buffer all the time unless drop sma index
    // TODO: multithread protect
C
Cary Xu 已提交
788 789
    if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) {
      // error handling
790
      tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
791
      tsdbWarn("vgId:%d remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " fail", REPO_ID(pTsdb),
792 793
               skey, indexUid);
      return TSDB_CODE_FAILED;
C
Cary Xu 已提交
794
    }
C
Cary Xu 已提交
795 796
    tsdbDebug("vgId:%d remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " succeed", REPO_ID(pTsdb),
              skey, indexUid);
C
Cary Xu 已提交
797 798 799 800 801 802 803 804 805
    // TODO: use a standalone interface to received state upate notification from stream computing module.
    /**
     * @brief state
     *  - When SMA env init in TSDB, its status is TSDB_SMA_STAT_OK.
     *  - In startup phase of stream computing module, it should notify the SMA env in TSDB to expired if needed(e.g.
     * when batch data caculation not finised)
     *  - When TSDB_SMA_STAT_OK, the stream computing module should also notify that to the SMA env in TSDB.
     */
    pItem->state = TSDB_SMA_STAT_OK;
C
Cary Xu 已提交
806 807
  } else {
    // error handling
808 809 810
    tsdbUnRefSmaStat(pTsdb, pStat);
    tsdbWarn("vgId:%d expired window %" PRIi64 " not exists for sma index %" PRIi64, REPO_ID(pTsdb), skey, indexUid);
    return TSDB_CODE_FAILED;
C
Cary Xu 已提交
811
  }
812 813

  tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
814 815 816
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
817 818 819 820 821 822 823
/**
 * @brief Judge the tSma storage level
 *
 * @param interval
 * @param intervalUnit
 * @return int32_t
 */
C
Cary Xu 已提交
824
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
C
Cary Xu 已提交
825 826
  // TODO: configurable for SMA_STORAGE_SPLIT_HOURS?
  switch (intervalUnit) {
C
Cary Xu 已提交
827
    case TIME_UNIT_HOUR:
C
Cary Xu 已提交
828 829 830 831
      if (interval < SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
832
    case TIME_UNIT_MINUTE:
C
Cary Xu 已提交
833 834 835 836
      if (interval < 60 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
837
    case TIME_UNIT_SECOND:
C
Cary Xu 已提交
838 839 840 841
      if (interval < 3600 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
842
    case TIME_UNIT_MILLISECOND:
C
Cary Xu 已提交
843 844 845 846
      if (interval < 3600 * 1e3 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
847
    case TIME_UNIT_MICROSECOND:
C
Cary Xu 已提交
848 849 850 851
      if (interval < 3600 * 1e6 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
852
    case TIME_UNIT_NANOSECOND:
C
Cary Xu 已提交
853 854 855 856 857 858 859 860 861 862 863
      if (interval < 3600 * 1e9 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
    default:
      break;
  }
  return SMA_STORAGE_LEVEL_TSDB;
}

/**
C
Cary Xu 已提交
864
 * @brief Insert TSma data blocks to DB File build by B+Tree
C
Cary Xu 已提交
865
 *
C
Cary Xu 已提交
866
 * @param pSmaH
867
 * @param smaKey  tableUid-colId-skeyOfWindow(8-2-8)
C
Cary Xu 已提交
868
 * @param keyLen
C
Cary Xu 已提交
869 870 871 872
 * @param pData
 * @param dataLen
 * @return int32_t
 */
C
Cary Xu 已提交
873 874
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyLen, void *pData, int32_t dataLen,
                                    TXN *txn) {
C
Cary Xu 已提交
875
  SDBFile *pDBFile = &pSmaH->dFile;
C
Cary Xu 已提交
876

877
  // TODO: insert sma data blocks into B+Tree(TDB)
C
Cary Xu 已提交
878
  if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) {
C
Cary Xu 已提交
879 880
    tsdbWarn("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail",
             REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
881 882
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
883 884
  tsdbDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed",
            REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
885

C
Cary Xu 已提交
886
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
C
Cary Xu 已提交
887
  uint32_t valueSize = 0;
C
Cary Xu 已提交
888
  void    *data = tsdbGetSmaDataByKey(pDBFile, smaKey, keyLen, &valueSize);
C
Cary Xu 已提交
889 890
  ASSERT(data != NULL);
  for (uint32_t v = 0; v < valueSize; v += 8) {
C
Cary Xu 已提交
891
    tsdbWarn("vgId:%d insert sma data val[%d] %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v));
C
Cary Xu 已提交
892 893
  }
#endif
C
Cary Xu 已提交
894 895 896
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
897 898 899 900 901 902
/**
 * @brief Approximate value for week/month/year.
 *
 * @param interval
 * @param intervalUnit
 * @param precision
C
Cary Xu 已提交
903
 * @param adjusted Interval already adjusted according to DB precision
C
Cary Xu 已提交
904 905
 * @return int64_t
 */
C
Cary Xu 已提交
906 907 908 909 910
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision, bool adjusted) {
  if (adjusted) {
    return interval;
  }

C
Cary Xu 已提交
911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934
  switch (intervalUnit) {
    case TIME_UNIT_YEAR:  // approximate value
      interval *= 365 * 86400 * 1e3;
      break;
    case TIME_UNIT_MONTH:  // approximate value
      interval *= 30 * 86400 * 1e3;
      break;
    case TIME_UNIT_WEEK:  // approximate value
      interval *= 7 * 86400 * 1e3;
      break;
    case TIME_UNIT_DAY:  // the interval for tSma calculation must <= day
      interval *= 86400 * 1e3;
      break;
    case TIME_UNIT_HOUR:
      interval *= 3600 * 1e3;
      break;
    case TIME_UNIT_MINUTE:
      interval *= 60 * 1e3;
      break;
    case TIME_UNIT_SECOND:
      interval *= 1e3;
      break;
    default:
      break;
C
Cary Xu 已提交
935 936
  }

C
Cary Xu 已提交
937 938
  switch (precision) {
    case TSDB_TIME_PRECISION_MILLI:
C
Cary Xu 已提交
939
      if (TIME_UNIT_MICROSECOND == intervalUnit) {  // us
C
Cary Xu 已提交
940
        return interval / 1e3;
C
Cary Xu 已提交
941
      } else if (TIME_UNIT_NANOSECOND == intervalUnit) {  //  nano second
C
Cary Xu 已提交
942
        return interval / 1e6;
943
      } else {  // ms
C
Cary Xu 已提交
944 945 946
        return interval;
      }
      break;
C
Cary Xu 已提交
947
    case TSDB_TIME_PRECISION_MICRO:
C
Cary Xu 已提交
948
      if (TIME_UNIT_MICROSECOND == intervalUnit) {  // us
C
Cary Xu 已提交
949
        return interval;
950
      } else if (TIME_UNIT_NANOSECOND == intervalUnit) {  //  ns
C
Cary Xu 已提交
951
        return interval / 1e3;
952
      } else {  // ms
C
Cary Xu 已提交
953 954 955
        return interval * 1e3;
      }
      break;
C
Cary Xu 已提交
956
    case TSDB_TIME_PRECISION_NANO:
957
      if (TIME_UNIT_MICROSECOND == intervalUnit) {  // us
C
Cary Xu 已提交
958
        return interval * 1e3;
959
      } else if (TIME_UNIT_NANOSECOND == intervalUnit) {  // ns
C
Cary Xu 已提交
960
        return interval;
961
      } else {  // ms
C
Cary Xu 已提交
962
        return interval * 1e6;
C
Cary Xu 已提交
963 964
      }
      break;
C
Cary Xu 已提交
965
    default:                                        // ms
C
Cary Xu 已提交
966
      if (TIME_UNIT_MICROSECOND == intervalUnit) {  // us
C
Cary Xu 已提交
967
        return interval / 1e3;
968
      } else if (TIME_UNIT_NANOSECOND == intervalUnit) {  //  ns
C
Cary Xu 已提交
969
        return interval / 1e6;
970
      } else {  // ms
C
Cary Xu 已提交
971
        return interval;
C
Cary Xu 已提交
972 973 974 975 976 977
      }
      break;
  }
  return interval;
}

C
Cary Xu 已提交
978 979
/**
 * @brief Initialize a TSma write handle.
 *
 * @param pSmaH        handle to fill
 * @param pTsdb
 * @param pDataBlocks  data blocks to be written through this handle (stored by pointer)
 * @param interval     interval of the SMA index
 * @param intervalUnit unit of interval
 * @return int32_t always TSDB_CODE_SUCCESS
 */
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, const SArray *pDataBlocks, int64_t interval,
                                  int8_t intervalUnit) {
  pSmaH->dFile.fid = TSDB_IVLD_FID;  // no data file selected yet
  pSmaH->pDataBlocks = pDataBlocks;
  pSmaH->pTsdb = pTsdb;
  // `true`: the interval is treated as already adjusted to the DB precision
  pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision, true);

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Release a TSma write handle by closing its DB file. A NULL handle is ignored.
 *
 * @param pSmaH
 */
static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) {
  if (!pSmaH) {
    return;
  }

  tsdbCloseDBF(&pSmaH->dFile);
}

C
Cary Xu 已提交
993
/**
 * @brief Bind the write handle to the tSma data file identified by indexUid and fid.
 *
 * The path is "<indexUid><TD_DIRSEP>v<vgId>f<fid>.tsma". The handle must not have a path or an opened DB yet
 * (asserted).
 * NOTE(review): the strdup() result is stored unchecked, so dFile.path is NULL if the allocation fails.
 *
 * @param pSmaH
 * @param indexUid
 * @param fid
 * @return int32_t always TSDB_CODE_SUCCESS
 */
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t fid) {
  ASSERT(!pSmaH->dFile.path && !pSmaH->dFile.pDB);

  char fileName[TSDB_FILENAME_LEN] = {0};
  snprintf(fileName, sizeof(fileName), "%" PRIi64 "%sv%df%d.tsma", indexUid, TD_DIRSEP, REPO_ID(pSmaH->pTsdb), fid);

  pSmaH->dFile.fid = fid;
  pSmaH->dFile.path = strdup(fileName);

  return TSDB_CODE_SUCCESS;
}
C
Cary Xu 已提交
1004

C
Cary Xu 已提交
1005 1006 1007 1008 1009 1010 1011 1012 1013 1014
/**
 * @brief Get the number of days covered by one tSma file.
 *
 * For SMA_STORAGE_LEVEL_TSDB the result is SMA_STORAGE_TSDB_TIMES times the interval in days, but at least
 * SMA_STORAGE_TSDB_DAYS. For any other level the `days` value of the TSDB config is used.
 *
 * @param pTsdb
 * @param interval Interval calculated by DB's precision
 * @param storageLevel
 * @return int32_t days per file
 */
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel) {
  STsdbCfg *pCfg = REPO_CFG(pTsdb);

  if (storageLevel != SMA_STORAGE_LEVEL_TSDB) {
    int32_t cfgDays = pCfg->days;
    return cfgDays;
  }

  int32_t scaledDays = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerDay[pCfg->precision]);
  if (scaledDays > SMA_STORAGE_TSDB_DAYS) {
    return scaledDays;
  }
  return SMA_STORAGE_TSDB_DAYS;
}
C
Cary Xu 已提交
1024

C
Cary Xu 已提交
1025 1026 1027 1028 1029
/**
 * @brief Open the transaction of the SMA env and begin it on the env's DB.
 *
 * @param pEnv
 * @return int 0 on success, -1 if tdbBegin fails
 */
static int tsdbSmaBeginCommit(SSmaEnv *pEnv) {
  // start a new txn
  tdbTxnOpen(&pEnv->txn, 0, poolMalloc, poolFree, pEnv->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);

  if (tdbBegin(pEnv->dbEnv, &pEnv->txn) == 0) {
    return 0;
  }

  tsdbWarn("tsdbSma tdb begin commit fail");
  return -1;
}

/**
 * @brief Commit the transaction of the SMA env, then close it and clear the env's pool.
 *
 * If the commit fails the transaction is not closed and the pool is not cleared.
 *
 * @param pEnv
 * @return int 0 on success, -1 if tdbCommit fails
 */
static int tsdbSmaEndCommit(SSmaEnv *pEnv) {
  // Commit current txn
  const bool committed = (tdbCommit(pEnv->dbEnv, &pEnv->txn) == 0);
  if (!committed) {
    tsdbWarn("tsdbSma tdb end commit fail");
    return -1;
  }

  tdbTxnClose(&pEnv->txn);
  clearPool(pEnv->pPool);
  return 0;
}

C
Cary Xu 已提交
1049 1050 1051 1052 1053 1054 1055 1056 1057
/**
 * @brief Insert/Update Time-range-wise SMA data.
 *  - If interval < SMA_STORAGE_SPLIT_HOURS(e.g. 24), save the SMA data as a part of DFileSet to e.g.
 * v3f1900.tsma.${sma_index_name}. The days is the same with that for TS data files.
 *  - If interval >= SMA_STORAGE_SPLIT_HOURS, save the SMA data to e.g. vnode3/tsma/v3f632.tsma.${sma_index_name}. The
 * days is 30 times of the interval, and the minimum days is SMA_STORAGE_TSDB_DAYS(30d).
 *  - The destination file of one data block for some interval is determined by its start TS key.
 *
 * @param pTsdb
C
Cary Xu 已提交
1058
 * @param msg
C
Cary Xu 已提交
1059 1060
 * @return int32_t
 */
C
Cary Xu 已提交
1061 1062 1063
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char *msg) {
  STsdbCfg     *pCfg = REPO_CFG(pTsdb);
  const SArray *pDataBlocks = (const SArray *)msg;
C
Cary Xu 已提交
1064

C
Cary Xu 已提交
1065 1066
  // TODO: destroy SSDataBlocks(msg)

1067 1068 1069 1070 1071
  // For super table aggregation, the sma data is stored in vgroup calculated from the hash value of stable name. Thus
  // the sma data would arrive ahead of the update-expired-window msg.
  if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INIT_FAILED;
    return TSDB_CODE_FAILED;
1072
  }
C
Cary Xu 已提交
1073

C
Cary Xu 已提交
1074
  if (!pDataBlocks) {
C
Cary Xu 已提交
1075
    terrno = TSDB_CODE_INVALID_PTR;
C
Cary Xu 已提交
1076
    tsdbWarn("vgId:%d insert tSma data failed since pDataBlocks is NULL", REPO_ID(pTsdb));
C
Cary Xu 已提交
1077
    return terrno;
C
Cary Xu 已提交
1078 1079
  }

C
Cary Xu 已提交
1080
  if (taosArrayGetSize(pDataBlocks) <= 0) {
C
Cary Xu 已提交
1081
    terrno = TSDB_CODE_INVALID_PARA;
C
Cary Xu 已提交
1082
    tsdbWarn("vgId:%d insert tSma data failed since pDataBlocks is empty", REPO_ID(pTsdb));
1083
    return TSDB_CODE_FAILED;
C
Cary Xu 已提交
1084
  }
C
Cary Xu 已提交
1085

1086
  SSmaEnv      *pEnv = REPO_TSMA_ENV(pTsdb);
C
Cary Xu 已提交
1087
  SSmaStat     *pStat = SMA_ENV_STAT(pEnv);
C
Cary Xu 已提交
1088 1089 1090 1091
  SSmaStatItem *pItem = NULL;

  tsdbRefSmaStat(pTsdb, pStat);

C
Cary Xu 已提交
1092 1093
  if (pStat && SMA_STAT_ITEMS(pStat)) {
    pItem = taosHashGet(SMA_STAT_ITEMS(pStat), &indexUid, sizeof(indexUid));
C
Cary Xu 已提交
1094
  }
C
Cary Xu 已提交
1095

C
Cary Xu 已提交
1096
  if (!pItem || !(pItem = *(SSmaStatItem **)pItem) || tsdbSmaStatIsDropped(pItem)) {
C
Cary Xu 已提交
1097 1098 1099 1100 1101
    terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
    tsdbUnRefSmaStat(pTsdb, pStat);
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
1102
  STSma      *pSma = pItem->pSma;
C
Cary Xu 已提交
1103 1104
  STSmaWriteH tSmaH = {0};

C
Cary Xu 已提交
1105
  if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pDataBlocks, pSma->interval, pSma->intervalUnit) != 0) {
C
Cary Xu 已提交
1106 1107 1108
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
1109 1110
  char rPath[TSDB_FILENAME_LEN] = {0};
  char aPath[TSDB_FILENAME_LEN] = {0};
1111 1112 1113 1114
  snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid);
  tfsAbsoluteName(REPO_TFS(pTsdb), SMA_ENV_DID(pEnv), rPath, aPath);
  if (!taosCheckExistFile(aPath)) {
    if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rPath, SMA_ENV_DID(pEnv)) != TSDB_CODE_SUCCESS) {
C
Cary Xu 已提交
1115
      tsdbUnRefSmaStat(pTsdb, pStat);
1116 1117 1118 1119
      return TSDB_CODE_FAILED;
    }
  }

C
Cary Xu 已提交
1120
  // Step 1: Judge the storage level and days
C
Cary Xu 已提交
1121
  int32_t storageLevel = tsdbGetSmaStorageLevel(pSma->interval, pSma->intervalUnit);
C
Cary Xu 已提交
1122
  int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel);
C
Cary Xu 已提交
1123

C
Cary Xu 已提交
1124 1125
  char    smaKey[SMA_KEY_LEN] = {0};  // key: skey + groupId
  char    dataBuf[512] = {0};         // val: aggr data // TODO: handle 512 buffer?
C
Cary Xu 已提交
1126
  void   *pDataBuf = NULL;
C
Cary Xu 已提交
1127 1128
  int32_t sz = taosArrayGetSize(pDataBlocks);
  for (int32_t i = 0; i < sz; ++i) {
C
Cary Xu 已提交
1129
    SSDataBlock *pDataBlock = taosArrayGet(pDataBlocks, i);
C
Cary Xu 已提交
1130 1131 1132 1133 1134 1135
    int32_t      colNum = pDataBlock->info.numOfCols;
    int32_t      rows = pDataBlock->info.rows;
    int32_t      rowSize = pDataBlock->info.rowSize;
    int64_t      groupId = pDataBlock->info.groupId;
    for (int32_t j = 0; j < rows; ++j) {
      printf("|");
C
Cary Xu 已提交
1136
      TSKEY skey = TSKEY_INITIAL_VAL;  //  the start key of TS window by interval
C
Cary Xu 已提交
1137 1138
      void *pSmaKey = &smaKey;
      bool  isStartKey = false;
C
Cary Xu 已提交
1139

C
Cary Xu 已提交
1140 1141
      int32_t tlen = 0;     // reset the len
      pDataBuf = &dataBuf;  // reset the buf
C
Cary Xu 已提交
1142
      for (int32_t k = 0; k < colNum; ++k) {
C
Cary Xu 已提交
1143
        SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
C
Cary Xu 已提交
1144 1145 1146
        void            *var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
        switch (pColInfoData->info.type) {
          case TSDB_DATA_TYPE_TIMESTAMP:
C
Cary Xu 已提交
1147 1148 1149
            if (!isStartKey) {
              isStartKey = true;
              skey = *(TSKEY *)var;
C
Cary Xu 已提交
1150
              printf("= skey %" PRIi64 " groupId = %" PRIi64 "|", skey, groupId);
C
Cary Xu 已提交
1151 1152 1153 1154 1155 1156
              tsdbEncodeTSmaKey(groupId, skey, &pSmaKey);
            } else {
              printf(" %" PRIi64 " |", *(int64_t *)var);
              tlen += taosEncodeFixedI64(&pDataBuf, *(int64_t *)var);
              break;
            }
C
Cary Xu 已提交
1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179
            break;
          case TSDB_DATA_TYPE_BOOL:
          case TSDB_DATA_TYPE_UTINYINT:
            printf(" %15d |", *(uint8_t *)var);
            tlen += taosEncodeFixedU8(&pDataBuf, *(uint8_t *)var);
            break;
          case TSDB_DATA_TYPE_TINYINT:
            printf(" %15d |", *(int8_t *)var);
            tlen += taosEncodeFixedI8(&pDataBuf, *(int8_t *)var);
            break;
          case TSDB_DATA_TYPE_SMALLINT:
            printf(" %15d |", *(int16_t *)var);
            tlen += taosEncodeFixedI16(&pDataBuf, *(int16_t *)var);
            break;
          case TSDB_DATA_TYPE_USMALLINT:
            printf(" %15d |", *(uint16_t *)var);
            tlen += taosEncodeFixedU16(&pDataBuf, *(uint16_t *)var);
            break;
          case TSDB_DATA_TYPE_INT:
            printf(" %15d |", *(int32_t *)var);
            tlen += taosEncodeFixedI32(&pDataBuf, *(int32_t *)var);
            break;
          case TSDB_DATA_TYPE_FLOAT:
C
Cary Xu 已提交
1180 1181 1182
            printf(" %15f |", *(float *)var);
            tlen += taosEncodeBinary(&pDataBuf, var, sizeof(float));
            break;
C
Cary Xu 已提交
1183 1184 1185 1186 1187 1188 1189 1190 1191
          case TSDB_DATA_TYPE_UINT:
            printf(" %15u |", *(uint32_t *)var);
            tlen += taosEncodeFixedU32(&pDataBuf, *(uint32_t *)var);
            break;
          case TSDB_DATA_TYPE_BIGINT:
            printf(" %15ld |", *(int64_t *)var);
            tlen += taosEncodeFixedI64(&pDataBuf, *(int64_t *)var);
            break;
          case TSDB_DATA_TYPE_DOUBLE:
C
Cary Xu 已提交
1192 1193
            printf(" %15lf |", *(double *)var);
            tlen += taosEncodeBinary(&pDataBuf, var, sizeof(double));
C
Cary Xu 已提交
1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204
          case TSDB_DATA_TYPE_UBIGINT:
            printf(" %15lu |", *(uint64_t *)var);
            tlen += taosEncodeFixedU64(&pDataBuf, *(uint64_t *)var);
            break;
          case TSDB_DATA_TYPE_NCHAR: {
            char tmpChar[100] = {0};
            strncpy(tmpChar, varDataVal(var), varDataLen(var));
            printf(" %s |", tmpChar);
            tlen += taosEncodeBinary(&pDataBuf, varDataVal(var), varDataLen(var));
            break;
          }
C
Cary Xu 已提交
1205 1206
          case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
            char tmpChar[100] = {0};
C
Cary Xu 已提交
1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220
            strncpy(tmpChar, varDataVal(var), varDataLen(var));
            printf(" %s |", tmpChar);
            tlen += taosEncodeBinary(&pDataBuf, varDataVal(var), varDataLen(var));
            break;
          }
          case TSDB_DATA_TYPE_VARBINARY:
            // TODO: add binary/varbinary
            TASSERT(0);
          default:
            printf("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
            TASSERT(0);
            break;
        }
      }
C
Cary Xu 已提交
1221 1222
      // if ((tlen > 0) && (skey != TSKEY_INITIAL_VAL)) {
      if (tlen > 0) {
C
Cary Xu 已提交
1223 1224 1225 1226 1227 1228 1229 1230
        int32_t fid = (int32_t)(TSDB_KEY_FID(skey, daysPerFile, pCfg->precision));

        // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index
        // file
        //         - Set and open the DFile or the B+Tree file
        // TODO: tsdbStartTSmaCommit();
        if (fid != tSmaH.dFile.fid) {
          if (tSmaH.dFile.fid != TSDB_IVLD_FID) {
C
Cary Xu 已提交
1231
            tsdbSmaEndCommit(pEnv);
C
Cary Xu 已提交
1232 1233 1234
            tsdbCloseDBF(&tSmaH.dFile);
          }
          tsdbSetTSmaDataFile(&tSmaH, indexUid, fid);
C
Cary Xu 已提交
1235
          if (tsdbOpenDBF(pEnv->dbEnv, &tSmaH.dFile) != 0) {
C
Cary Xu 已提交
1236 1237 1238 1239 1240 1241
            tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
                     tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
            tsdbDestroyTSmaWriteH(&tSmaH);
            tsdbUnRefSmaStat(pTsdb, pStat);
            return TSDB_CODE_FAILED;
          }
C
Cary Xu 已提交
1242
          tsdbSmaBeginCommit(pEnv);
C
Cary Xu 已提交
1243
        }
C
Cary Xu 已提交
1244

C
Cary Xu 已提交
1245
        if (tsdbInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen, &pEnv->txn) != 0) {
C
Cary Xu 已提交
1246
          tsdbWarn("vgId:%d insert tSma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64
C
Cary Xu 已提交
1247 1248
                   " since %s",
                   REPO_ID(pTsdb), indexUid, skey, groupId, tstrerror(terrno));
C
Cary Xu 已提交
1249
          tsdbSmaEndCommit(pEnv);
C
Cary Xu 已提交
1250 1251 1252 1253
          tsdbDestroyTSmaWriteH(&tSmaH);
          tsdbUnRefSmaStat(pTsdb, pStat);
          return TSDB_CODE_FAILED;
        }
C
Cary Xu 已提交
1254 1255
        tsdbDebug("vgId:%d insert tSma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64,
                  REPO_ID(pTsdb), indexUid, skey, groupId);
C
Cary Xu 已提交
1256
        // TODO:tsdbEndTSmaCommit();
C
Cary Xu 已提交
1257

C
Cary Xu 已提交
1258
        // Step 3: reset the SSmaStat
C
Cary Xu 已提交
1259
        tsdbResetExpiredWindow(pTsdb, pStat, indexUid, skey);
C
Cary Xu 已提交
1260 1261 1262 1263
      } else {
        tsdbWarn("vgId:%d invalid data skey:%" PRIi64 ", tlen %" PRIi32 " during insert tSma data for %" PRIi64,
                 REPO_ID(pTsdb), skey, tlen, indexUid);
      }
C
Cary Xu 已提交
1264

C
Cary Xu 已提交
1265 1266
      printf("\n");
    }
1267
  }
C
Cary Xu 已提交
1268
  tsdbSmaEndCommit(pEnv);  // TODO: not commit for every insert
1269
  tsdbDestroyTSmaWriteH(&tSmaH);
C
Cary Xu 已提交
1270
  tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
1271

C
Cary Xu 已提交
1272 1273 1274
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1275 1276 1277
/**
 * @brief Drop tSma data and local cache
 *        - The stat item of the index is marked as dropped under the SMA write lock.
 *        - Then the reference count of the SMA stat (insert/query reference) is polled once per second until it
 *          drops to 0 or SMA_DROP_EXPIRED_TIME seconds have passed, after which the item is freed.
 *
 * @param pTsdb
 * @param indexUid uid of the SMA index to drop
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_INVALID_ACTION if the index is already marked as dropped
 */
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) {
  SSmaEnv *pEnv = atomic_load_ptr(&REPO_TSMA_ENV(pTsdb));

  // clear local cache
  if (pEnv) {
    tsdbDebug("vgId:%d drop tSma local cache for %" PRIi64, REPO_ID(pTsdb), indexUid);

    SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid));
    // The hash slot holds a pointer to the item. Both the slot and the item must be non-NULL; `&&` keeps a lookup
    // miss from being dereferenced and makes sure pItem refers to the item rather than to the slot.
    if ((pItem) && ((pItem = *(SSmaStatItem **)pItem))) {
      if (tsdbSmaStatIsDropped(pItem)) {
        tsdbDebug("vgId:%d tSma stat is already dropped for %" PRIi64, REPO_ID(pTsdb), indexUid);
        return TSDB_CODE_TDB_INVALID_ACTION;  // TODO: duplicate drop msg would be intercepted by mnode
      }

      // Check again under the write lock before marking the item as dropped.
      tsdbWLockSma(pEnv);
      if (tsdbSmaStatIsDropped(pItem)) {
        tsdbUnLockSma(pEnv);
        tsdbDebug("vgId:%d tSma stat is already dropped for %" PRIi64, REPO_ID(pTsdb), indexUid);
        return TSDB_CODE_TDB_INVALID_ACTION;  // TODO: duplicate drop msg would be intercepted by mnode
      }
      tsdbSmaStatSetDropped(pItem);
      tsdbUnLockSma(pEnv);

      // Wait for the references on the SMA stat to be released, but not longer than SMA_DROP_EXPIRED_TIME seconds.
      int32_t nSleep = 0;
      int32_t refVal = INT32_MAX;
      while (true) {
        if ((refVal = T_REF_VAL_GET(SMA_ENV_STAT(pEnv))) <= 0) {
          tsdbDebug("vgId:%d drop index %" PRIi64 " since refVal=%d", REPO_ID(pTsdb), indexUid, refVal);
          break;
        }
        tsdbDebug("vgId:%d wait 1s to drop index %" PRIi64 " since refVal=%d", REPO_ID(pTsdb), indexUid, refVal);
        taosSsleep(1);
        if (++nSleep > SMA_DROP_EXPIRED_TIME) {
          tsdbDebug("vgId:%d drop index %" PRIi64 " after wait %d (refVal=%d)", REPO_ID(pTsdb), indexUid, nSleep,
                    refVal);
          break;
        }
      }

      // NOTE(review): the hash entry for indexUid is not removed here; confirm that no later lookup can reach the
      // freed item.
      tsdbFreeSmaStatItem(pItem);
    } else {
      tsdbDebug("vgId:%d dropTSmaDataImpl skipped since no index %" PRIi64 " in local cache", REPO_ID(pTsdb),
                indexUid);
    }
  }
  // clear sma data files
  // TODO:
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1330
/**
 * @brief Set the path of the write handle's data file to the rSma file name "v<vgId>f<fid>.rsma".
 *
 * NOTE(review): the strdup() result is stored unchecked, and a path already held by the handle is overwritten.
 *
 * @param pSmaH
 * @param fid
 * @return int32_t always TSDB_CODE_SUCCESS
 */
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, int32_t fid) {
  char fileName[TSDB_FILENAME_LEN] = {0};

  snprintf(fileName, sizeof(fileName), "v%df%d.rsma", REPO_ID(pSmaH->pTsdb), fid);
  pSmaH->dFile.path = strdup(fileName);

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1340 1341 1342
/**
 * @brief Insert rollup SMA data. The write path itself is not implemented yet (see the disabled section below).
 *
 * At present the function validates its input, looks up the stat item of the test index SMA_TEST_INDEX_UID, makes
 * sure the index directory exists and computes the storage level and the days per file. A reference on the SMA stat
 * is held in between and released on every exit path.
 *
 * @param pTsdb
 * @param msg actually a `const SArray *` of data blocks
 * @return int32_t TSDB_CODE_SUCCESS on success. TSDB_CODE_FAILED on failure, except for a NULL env or NULL msg where
 *                 terrno (TSDB_CODE_INVALID_PTR) is returned.
 */
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg) {
  STsdbCfg     *pCfg = REPO_CFG(pTsdb);
  const SArray *pDataBlocks = (const SArray *)msg;
  SSmaEnv      *pEnv = atomic_load_ptr(&REPO_RSMA_ENV(pTsdb));
  int64_t       indexUid = SMA_TEST_INDEX_UID;

  if (!pEnv) {
    terrno = TSDB_CODE_INVALID_PTR;
    tsdbWarn("vgId:%d insert rSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
    return terrno;
  }

  if (!pDataBlocks) {
    terrno = TSDB_CODE_INVALID_PTR;
    tsdbWarn("vgId:%d insert rSma data failed since pDataBlocks is NULL", REPO_ID(pTsdb));
    return terrno;
  }

  if (taosArrayGetSize(pDataBlocks) <= 0) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbWarn("vgId:%d insert rSma data failed since pDataBlocks is empty", REPO_ID(pTsdb));
    return TSDB_CODE_FAILED;
  }

  SSmaStat     *pStat = SMA_ENV_STAT(pEnv);
  SSmaStatItem *pItem = NULL;

  tsdbRefSmaStat(pTsdb, pStat);

  if (pStat && SMA_STAT_ITEMS(pStat)) {
    pItem = taosHashGet(SMA_STAT_ITEMS(pStat), &indexUid, sizeof(indexUid));
  }

  // The hash slot holds a pointer to the item; a missing or dropped item means the index cannot be written.
  if (!pItem || !(pItem = *(SSmaStatItem **)pItem) || tsdbSmaStatIsDropped(pItem)) {
    terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
    tsdbUnRefSmaStat(pTsdb, pStat);
    return TSDB_CODE_FAILED;
  }

  STSma *pSma = pItem->pSma;

  STSmaWriteH tSmaH = {0};

  if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pDataBlocks, pSma->interval, pSma->intervalUnit) != 0) {
    tsdbUnRefSmaStat(pTsdb, pStat);  // do not leak the reference taken above
    return TSDB_CODE_FAILED;
  }

  // Make sure the directory of this index exists
  char rPath[TSDB_FILENAME_LEN] = {0};
  char aPath[TSDB_FILENAME_LEN] = {0};
  snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid);
  tfsAbsoluteName(REPO_TFS(pTsdb), SMA_ENV_DID(pEnv), rPath, aPath);
  if (!taosCheckExistFile(aPath)) {
    if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rPath, SMA_ENV_DID(pEnv)) != TSDB_CODE_SUCCESS) {
      tsdbUnRefSmaStat(pTsdb, pStat);  // do not leak the reference taken above
      return TSDB_CODE_FAILED;
    }
  }

  // Step 1: Judge the storage level and days
  int32_t storageLevel = tsdbGetSmaStorageLevel(pSma->interval, pSma->intervalUnit);
  int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel);
#if 0
  int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));

  // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
  //         - Set and open the DFile or the B+Tree file
  // TODO: tsdbStartTSmaCommit();
  tsdbSetTSmaDataFile(&tSmaH, pData, indexUid, fid);
  if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) {
    tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
             tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
    tsdbDestroyTSmaWriteH(&tSmaH);
    return TSDB_CODE_FAILED;
  }

  if (tsdbInsertTSmaDataSection(&tSmaH, pData) != 0) {
    tsdbWarn("vgId:%d insert tSma data section failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
    tsdbDestroyTSmaWriteH(&tSmaH);
    return TSDB_CODE_FAILED;
  }
  // TODO:tsdbEndTSmaCommit();

  // Step 3: reset the SSmaStat
  tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey);
#endif

  tsdbDestroyTSmaWriteH(&tSmaH);
  tsdbUnRefSmaStat(pTsdb, pStat);
  return TSDB_CODE_SUCCESS;
}

/**
C
Cary Xu 已提交
1431
 * @brief
C
Cary Xu 已提交
1432 1433 1434
 *
 * @param pSmaH
 * @param pTsdb
C
Cary Xu 已提交
1435 1436
 * @param interval
 * @param intervalUnit
C
Cary Xu 已提交
1437 1438
 * @return int32_t
 */
C
Cary Xu 已提交
1439
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit) {
C
Cary Xu 已提交
1440
  pSmaH->pTsdb = pTsdb;
C
Cary Xu 已提交
1441
  pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision, true);
C
Cary Xu 已提交
1442 1443
  pSmaH->storageLevel = tsdbGetSmaStorageLevel(interval, intervalUnit);
  pSmaH->days = tsdbGetTSmaDays(pTsdb, pSmaH->interval, pSmaH->storageLevel);
1444
  return TSDB_CODE_SUCCESS;
C
Cary Xu 已提交
1445 1446 1447 1448 1449 1450
}

/**
 * @brief Init of tSma FS
 *
 * @param pReadH
1451
 * @param indexUid
C
Cary Xu 已提交
1452
 * @param skey
C
Cary Xu 已提交
1453 1454
 * @return int32_t
 */
1455 1456 1457 1458
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey) {
  STsdb *pTsdb = pSmaH->pTsdb;

  int32_t fid = (int32_t)(TSDB_KEY_FID(skey, pSmaH->days, REPO_CFG(pTsdb)->precision));
C
Cary Xu 已提交
1459
  char    tSmaFile[TSDB_FILENAME_LEN] = {0};
1460
  snprintf(tSmaFile, TSDB_FILENAME_LEN, "%" PRIi64 "%sv%df%d.tsma", indexUid, TD_DIRSEP, REPO_ID(pTsdb), fid);
C
Cary Xu 已提交
1461 1462 1463
  pSmaH->dFile.path = strdup(tSmaFile);
  pSmaH->smaFsIter.iter = 0;
  pSmaH->smaFsIter.fid = fid;
1464
  return TSDB_CODE_SUCCESS;
C
Cary Xu 已提交
1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475
}

/**
 * @brief Set and open tSma file if it has key locates in queryWin.
 *
 * @param pReadH
 * @param param
 * @param queryWin
 * @return true
 * @return false
 */
C
Cary Xu 已提交
1476
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
C
Cary Xu 已提交
1477
  SArray *smaFs = pReadH->pTsdb->fs->cstatus->sf;
C
Cary Xu 已提交
1478 1479
  int32_t nSmaFs = taosArrayGetSize(smaFs);

C
Cary Xu 已提交
1480
  tsdbCloseDBF(&pReadH->dFile);
C
Cary Xu 已提交
1481

C
Cary Xu 已提交
1482
#if 0
C
Cary Xu 已提交
1483 1484 1485 1486
  while (pReadH->smaFsIter.iter < nSmaFs) {
    void *pSmaFile = taosArrayGet(smaFs, pReadH->smaFsIter.iter);
    if (pSmaFile) {  // match(indexName, queryWindow)
      // TODO: select the file by index_name ...
C
Cary Xu 已提交
1487
      pReadH->dFile = pSmaFile;
C
Cary Xu 已提交
1488 1489 1490 1491 1492 1493
      ++pReadH->smaFsIter.iter;
      break;
    }
    ++pReadH->smaFsIter.iter;
  }

C
Cary Xu 已提交
1494
  if (pReadH->pDFile) {
C
Cary Xu 已提交
1495 1496 1497
    tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]");
    return true;
  }
C
Cary Xu 已提交
1498
#endif
C
Cary Xu 已提交
1499 1500 1501 1502 1503

  return false;
}

/**
C
Cary Xu 已提交
1504
 * @brief
C
Cary Xu 已提交
1505
 *
C
Cary Xu 已提交
1506
 * @param pTsdb Return the data between queryWin and fill the pData.
C
Cary Xu 已提交
1507
 * @param pData
C
Cary Xu 已提交
1508 1509
 * @param indexUid
 * @param pQuerySKey
C
Cary Xu 已提交
1510 1511 1512
 * @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM.
 * @return int32_t
 */
C
Cary Xu 已提交
1513
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) {
C
Cary Xu 已提交
1514 1515
  SSmaEnv  *pEnv = atomic_load_ptr(&REPO_TSMA_ENV(pTsdb));
  SSmaStat *pStat = NULL;
1516 1517

  if (!pEnv) {
C
Cary Xu 已提交
1518 1519 1520 1521 1522
    terrno = TSDB_CODE_INVALID_PTR;
    tsdbWarn("vgId:%d getTSmaDataImpl failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
1523 1524 1525
  pStat = SMA_ENV_STAT(pEnv);

  tsdbRefSmaStat(pTsdb, pStat);
1526
  SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid));
C
Cary Xu 已提交
1527
  if (!pItem || !(pItem = *(SSmaStatItem **)pItem)) {
C
Cary Xu 已提交
1528 1529
    // Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if
    // it's NULL.
C
Cary Xu 已提交
1530
    tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
1531
    terrno = TSDB_CODE_TDB_INVALID_ACTION;
1532
    tsdbDebug("vgId:%d getTSmaDataImpl failed since no index %" PRIi64, REPO_ID(pTsdb), indexUid);
C
Cary Xu 已提交
1533
    return TSDB_CODE_FAILED;
C
Cary Xu 已提交
1534 1535
  }

C
Cary Xu 已提交
1536 1537
#if 0
  int32_t nQueryWin = taosArrayGetSize(pQuerySKey);
C
Cary Xu 已提交
1538
  for (int32_t n = 0; n < nQueryWin; ++n) {
C
Cary Xu 已提交
1539
    TSKEY skey = taosArrayGet(pQuerySKey, n);
C
Cary Xu 已提交
1540
    if (taosHashGet(pItem->expiredWindows, &skey, sizeof(TSKEY))) {
C
Cary Xu 已提交
1541 1542 1543
      // TODO: mark this window as expired.
    }
  }
C
Cary Xu 已提交
1544
#endif
C
Cary Xu 已提交
1545

1546
#if 1
C
Cary Xu 已提交
1547 1548
  int8_t smaStat = 0;
  if (!tsdbSmaStatIsOK(pItem, &smaStat)) {  // TODO: multiple check for large scale sma query
C
Cary Xu 已提交
1549
    tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
1550
    terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
C
Cary Xu 已提交
1551 1552
    tsdbWarn("vgId:%d getTSmaDataImpl failed from index %" PRIi64 " since %s %" PRIi8, REPO_ID(pTsdb), indexUid,
             tstrerror(terrno), smaStat);
C
Cary Xu 已提交
1553 1554 1555
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
1556
  if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY))) {
C
Cary Xu 已提交
1557
    // TODO: mark this window as expired.
1558 1559 1560 1561 1562
    tsdbDebug("vgId:%d skey %" PRIi64 " of window exists in expired window for index %" PRIi64, REPO_ID(pTsdb),
              querySKey, indexUid);
  } else {
    tsdbDebug("vgId:%d skey %" PRIi64 " of window not in expired window for index %" PRIi64, REPO_ID(pTsdb), querySKey,
              indexUid);
C
Cary Xu 已提交
1563
  }
C
Cary Xu 已提交
1564 1565

  STSma *pTSma = pItem->pSma;
C
Cary Xu 已提交
1566
#endif
1567

C
Cary Xu 已提交
1568
  STSmaReadH tReadH = {0};
C
Cary Xu 已提交
1569
  tsdbInitTSmaReadH(&tReadH, pTsdb, pTSma->interval, pTSma->intervalUnit);
C
Cary Xu 已提交
1570
  tsdbCloseDBF(&tReadH.dFile);
C
Cary Xu 已提交
1571 1572

  tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
1573

1574
  tsdbInitTSmaFile(&tReadH, indexUid, querySKey);
C
Cary Xu 已提交
1575
  if (tsdbOpenDBF(pEnv->dbEnv, &tReadH.dFile) != 0) {
C
Cary Xu 已提交
1576 1577 1578 1579
    tsdbWarn("vgId:%d open DBF %s failed since %s", REPO_ID(pTsdb), tReadH.dFile.path, tstrerror(terrno));
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
1580 1581
  char    smaKey[SMA_KEY_LEN] = {0};
  void   *pSmaKey = &smaKey;
C
Cary Xu 已提交
1582 1583
  int64_t queryGroupId = 1;
  tsdbEncodeTSmaKey(queryGroupId, querySKey, (void **)&pSmaKey);
C
Cary Xu 已提交
1584

C
Cary Xu 已提交
1585 1586
  tsdbDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIx64 ", keyLen %d", REPO_ID(pTsdb),
            tReadH.dFile.path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), SMA_KEY_LEN);
C
Cary Xu 已提交
1587

C
Cary Xu 已提交
1588 1589
  void   *result = NULL;
  int32_t valueSize = 0;
C
Cary Xu 已提交
1590
  if (!(result = tsdbGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize))) {
C
Cary Xu 已提交
1591 1592
    tsdbWarn("vgId:%d get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIx64 " since %s",
             REPO_ID(pTsdb), indexUid, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), tstrerror(terrno));
C
Cary Xu 已提交
1593 1594 1595
    tsdbCloseDBF(&tReadH.dFile);
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
1596 1597

#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
C
Cary Xu 已提交
1598
  for (uint32_t v = 0; v < valueSize; v += 8) {
C
Cary Xu 已提交
1599
    tsdbWarn("vgId:%d get sma data v[%d]=%" PRIi64, REPO_ID(pTsdb), v, *(int64_t *)POINTER_SHIFT(result, v));
C
Cary Xu 已提交
1600 1601
  }
#endif
wafwerar's avatar
wafwerar 已提交
1602
  taosMemoryFreeClear(result);  // TODO: fill the result to output
C
Cary Xu 已提交
1603

C
Cary Xu 已提交
1604
#if 0
C
Cary Xu 已提交
1605 1606 1607 1608 1609 1610 1611 1612 1613
  int32_t nResult = 0;
  int64_t lastKey = 0;

  while (true) {
    if (nResult >= nMaxResult) {
      break;
    }

    // set and open the file according to the STSma param
C
Cary Xu 已提交
1614
    if (tsdbSetAndOpenTSmaFile(&tReadH, queryWin)) {
C
Cary Xu 已提交
1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625
      char bTree[100] = "\0";
      while (strncmp(bTree, "has more nodes", 100) == 0) {
        if (nResult >= nMaxResult) {
          break;
        }
        // tsdbGetDataFromBTree(bTree, queryWin, lastKey)
        // fill the pData
        ++nResult;
      }
    }
  }
C
Cary Xu 已提交
1626
#endif
C
Cary Xu 已提交
1627
  // read data from file and fill the result
C
Cary Xu 已提交
1628
  tsdbCloseDBF(&tReadH.dFile);
C
Cary Xu 已提交
1629 1630 1631
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1632 1633
/**
 * @brief Handle a create-tSma request: deserialize it, stamp the server timezone and register it in meta.
 *
 * @param pTsdb    Tsdb instance receiving the request.
 * @param pMsg     Serialized create request.
 * @return int32_t TSDB_CODE_SUCCESS on success, -1 if deserialization or the meta update fails.
 */
int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) {
  int32_t code = -1;
  SSmaCfg vCreateSmaReq = {0};

  if (!tDeserializeSVCreateTSmaReq(pMsg, &vCreateSmaReq)) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tsdbWarn("vgId:%d TDMT_VND_CREATE_SMA received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno));
    return code;
  }

  tsdbDebug("vgId:%d TDMT_VND_CREATE_SMA msg received for %s:%" PRIi64, REPO_ID(pTsdb), vCreateSmaReq.tSma.indexName,
            vCreateSmaReq.tSma.indexUid);

  // record current timezone of server side
  vCreateSmaReq.tSma.timezoneInt = tsTimezone;

  // TODO: handle error of metaCreateTSma
  if (metaCreateTSma(REPO_META(pTsdb), &vCreateSmaReq) >= 0) {
    tsdbTSmaAdd(pTsdb, 1);
    code = TSDB_CODE_SUCCESS;
  }

  // The deserialized request is released on both outcomes.
  tdDestroyTSma(&vCreateSmaReq.tSma);

  // TODO: return directly or go on follow steps?
  return code;
}

/**
 * @brief Handle a drop-tSma request: remove the index from meta, then drop its stored data.
 *
 * @param pTsdb    Tsdb instance receiving the request.
 * @param pMsg     Serialized drop request.
 * @return int32_t TSDB_CODE_SUCCESS on success, -1 if deserialization or either drop step fails.
 */
int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
  SVDropTSmaReq vDropSmaReq = {0};

  if (!tDeserializeSVDropTSmaReq(pMsg, &vDropSmaReq)) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // TODO: send msg to stream computing to drop tSma
  // if ((send msg to stream computing) < 0) {
  //   tdDestroyTSma(&vCreateSmaReq);
  //   return -1;
  // }
  //

  // Meta entry first, stored data second; the data is not touched when the meta step fails.
  // TODO: handle error
  if ((metaDropTSma(REPO_META(pTsdb), vDropSmaReq.indexUid) < 0) ||
      (tsdbDropTSmaData(pTsdb, vDropSmaReq.indexUid) < 0)) {
    return -1;
  }

  tsdbTSmaSub(pTsdb, 1);

  // TODO: return directly or go on follow steps?
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1688
/**
C
Cary Xu 已提交
1689
 * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam.
C
Cary Xu 已提交
1690 1691 1692 1693 1694 1695
 *
 * @param pTsdb
 * @param pMeta
 * @param pReq
 * @return int32_t
 */
C
Cary Xu 已提交
1696 1697 1698
int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateStbReq *pReq) {
  if (!pReq->rollup) {
    tsdbDebug("vgId:%d return directly since no rollup for stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid);
C
Cary Xu 已提交
1699 1700 1701
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
1702 1703
  SRSmaParam *param = &pReq->pRSmaParam;

C
Cary Xu 已提交
1704
  if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) {
C
Cary Xu 已提交
1705
    tsdbWarn("vgId:%d no qmsg1/qmsg2 for rollup stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid);
C
Cary Xu 已提交
1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717
    return TSDB_CODE_SUCCESS;
  }

  if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INIT_FAILED;
    return TSDB_CODE_FAILED;
  }

  SSmaEnv   *pEnv = REPO_RSMA_ENV(pTsdb);
  SSmaStat  *pStat = SMA_ENV_STAT(pEnv);
  SRSmaInfo *pRSmaInfo = NULL;

C
Cary Xu 已提交
1718
  pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t));
C
Cary Xu 已提交
1719
  if (pRSmaInfo) {
C
Cary Xu 已提交
1720
    tsdbWarn("vgId:%d rsma info already exists for stb: %s, %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid);
C
Cary Xu 已提交
1721
    return TSDB_CODE_SUCCESS;
C
Cary Xu 已提交
1722 1723
  }

C
Cary Xu 已提交
1724
  pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo));
C
Cary Xu 已提交
1725
  if (!pRSmaInfo) {
C
Cary Xu 已提交
1726 1727 1728 1729
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
1730
  STqReadHandle *pReadHandle = tqInitSubmitMsgScanner(pMeta);
C
Cary Xu 已提交
1731
  if (!pReadHandle) {
C
Cary Xu 已提交
1732
    taosMemoryFree(pRSmaInfo);
C
Cary Xu 已提交
1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }

  SReadHandle handle = {
      .reader = pReadHandle,
      .meta = pMeta,
  };

  if (param->qmsg1) {
    pRSmaInfo->taskInfo[0] = qCreateStreamExecTaskInfo(param->qmsg1, &handle);
C
Cary Xu 已提交
1744
    if (!pRSmaInfo->taskInfo[0]) {
C
Cary Xu 已提交
1745 1746 1747 1748 1749 1750 1751 1752
      taosMemoryFree(pRSmaInfo);
      taosMemoryFree(pReadHandle);
      return TSDB_CODE_FAILED;
    }
  }

  if (param->qmsg2) {
    pRSmaInfo->taskInfo[1] = qCreateStreamExecTaskInfo(param->qmsg2, &handle);
C
Cary Xu 已提交
1753
    if (!pRSmaInfo->taskInfo[1]) {
C
Cary Xu 已提交
1754 1755 1756 1757 1758 1759
      taosMemoryFree(pRSmaInfo);
      taosMemoryFree(pReadHandle);
      return TSDB_CODE_FAILED;
    }
  }

C
Cary Xu 已提交
1760
  if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) !=
C
Cary Xu 已提交
1761
      TSDB_CODE_SUCCESS) {
C
Cary Xu 已提交
1762
    return TSDB_CODE_FAILED;
C
Cary Xu 已提交
1763
  } else {
C
Cary Xu 已提交
1764
    tsdbDebug("vgId:%d register rsma info succeed for suid:%" PRIi64, REPO_ID(pTsdb), pReq->suid);
C
Cary Xu 已提交
1765 1766 1767 1768 1769
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1770 1771 1772 1773 1774 1775 1776 1777
/**
 * @brief store suid/[uids], prefer to use array and then hash
 *
 * @param pStore
 * @param suid
 * @param uid
 * @return int32_t
 */
C
Cary Xu 已提交
1778
static int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) {
C
Cary Xu 已提交
1779
  // prefer to store suid/uids in array
C
Cary Xu 已提交
1780
  if ((suid == pStore->suid) || (pStore->suid == 0)) {
C
Cary Xu 已提交
1781
    if (pStore->suid == 0) {
C
Cary Xu 已提交
1782 1783
      pStore->suid = suid;
    }
C
Cary Xu 已提交
1784 1785 1786 1787 1788 1789 1790
    if (uid) {
      if (!pStore->tbUids) {
        if (!(pStore->tbUids = taosArrayInit(1, sizeof(tb_uid_t)))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return TSDB_CODE_FAILED;
        }
      }
C
Cary Xu 已提交
1791
      if (!taosArrayPush(pStore->tbUids, uid)) {
C
Cary Xu 已提交
1792 1793
        return TSDB_CODE_FAILED;
      }
C
Cary Xu 已提交
1794 1795
    }
  } else {
C
Cary Xu 已提交
1796 1797
    // store other suid/uids in hash when multiple stable/table included in 1 batch of request
    if (!pStore->uidHash) {
C
Cary Xu 已提交
1798
      pStore->uidHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
C
Cary Xu 已提交
1799
      if (!pStore->uidHash) {
C
Cary Xu 已提交
1800 1801 1802
        return TSDB_CODE_FAILED;
      }
    }
C
Cary Xu 已提交
1803 1804 1805
    if (uid) {
      SArray *uidArray = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t));
      if (uidArray && ((uidArray = *(SArray **)uidArray))) {
C
Cary Xu 已提交
1806
        taosArrayPush(uidArray, uid);
C
Cary Xu 已提交
1807 1808 1809 1810 1811 1812
      } else {
        SArray *pUidArray = taosArrayInit(1, sizeof(tb_uid_t));
        if (!pUidArray) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return TSDB_CODE_FAILED;
        }
C
Cary Xu 已提交
1813
        if (!taosArrayPush(pUidArray, uid)) {
C
Cary Xu 已提交
1814 1815 1816 1817 1818 1819
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return TSDB_CODE_FAILED;
        }
        if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)) != 0) {
          return TSDB_CODE_FAILED;
        }
C
Cary Xu 已提交
1820
      }
C
Cary Xu 已提交
1821 1822
    } else {
      if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0) != 0) {
C
Cary Xu 已提交
1823 1824 1825 1826 1827 1828 1829
        return TSDB_CODE_FAILED;
      }
    }
  }
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1830 1831 1832 1833
/**
 * @brief Release everything a uid store points to; the store object itself is not freed.
 *
 * @param pStore Store to clean up, may be NULL.
 */
void tsdbUidStoreDestory(STbUidStore *pStore) {
  if (!pStore) {
    return;
  }

  if (pStore->uidHash) {
    // When pStore->tbUids not NULL, the pStore->uidHash has k/v; otherwise pStore->uidHash only has keys.
    if (pStore->tbUids) {
      for (void *pIter = taosHashIterate(pStore->uidHash, NULL); pIter != NULL;
           pIter = taosHashIterate(pStore->uidHash, pIter)) {
        taosArrayDestroy(*(SArray **)pIter);
      }
    }
    taosHashCleanup(pStore->uidHash);
  }

  taosArrayDestroy(pStore->tbUids);
}

/**
 * @brief Destroy a uid store and free the store object itself.
 *
 * @param pStore Store to free, may be NULL.
 * @return void* Always NULL, so callers can reset their pointer with the result.
 */
void *tsdbUidStoreFree(STbUidStore *pStore) {
  if (!pStore) {
    return NULL;
  }

  tsdbUidStoreDestory(pStore);
  taosMemoryFree(pStore);
  return NULL;
}

/**
 * @brief fetch suid/uids when create child tables of rollup SMA
 *
 * @param pTsdb
 * @param ppStore
 * @param suid
 * @param uid
 * @return int32_t
 */
C
Cary Xu 已提交
1865
int32_t tsdbFetchTbUidList(STsdb *pTsdb, STbUidStore **ppStore, tb_uid_t suid, tb_uid_t uid) {
C
Cary Xu 已提交
1866
  SSmaEnv *pEnv = REPO_RSMA_ENV((STsdb *)pTsdb);
C
Cary Xu 已提交
1867 1868

  // only applicable to rollup SMA ctables
C
Cary Xu 已提交
1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879
  if (!pEnv) {
    return TSDB_CODE_SUCCESS;
  }

  SSmaStat *pStat = SMA_ENV_STAT(pEnv);
  SHashObj *infoHash = NULL;
  if (!pStat || !(infoHash = SMA_STAT_INFO_HASH(pStat))) {
    terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
1880
  // info cached when create rsma stable and return directly for non-rsma ctables
C
Cary Xu 已提交
1881
  if (!taosHashGet(infoHash, &suid, sizeof(tb_uid_t))) {
C
Cary Xu 已提交
1882 1883 1884
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
1885
  if (!(*ppStore)) {
C
Cary Xu 已提交
1886 1887 1888 1889
    if (tsdbUidStoreInit((STbUidStore **)ppStore) != 0) {
      return TSDB_CODE_FAILED;
    }
  }
C
Cary Xu 已提交
1890

C
Cary Xu 已提交
1891
  if (tsdbUidStorePut(*ppStore, suid, &uid) != 0) {
C
Cary Xu 已提交
1892 1893 1894
    *ppStore = tsdbUidStoreFree(*ppStore);
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
1895

C
Cary Xu 已提交
1896 1897 1898
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1899 1900 1901 1902 1903 1904 1905 1906 1907
/**
 * @brief Add a list of child table uids to every stream task registered for one rollup stable.
 *
 * @param pTsdb    Tsdb instance owning the rsma environment.
 * @param suid     Super table uid whose rsma info is looked up.
 * @param tbUids   Child table uids to add.
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED with terrno set when an argument is NULL, no rsma info
 *                 exists for suid, or a task rejects the update.
 */
static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids) {
  if (!suid || !tbUids) {
    terrno = TSDB_CODE_INVALID_PTR;
    // suid may be the NULL argument here, so it must not be dereferenced unconditionally.
    tsdbError("vgId:%d failed to get rsma info for uid:%" PRIi64 " since %s", REPO_ID(pTsdb), suid ? *suid : 0,
              terrstr(terrno));
    return TSDB_CODE_FAILED;
  }

  SSmaEnv   *pEnv = REPO_RSMA_ENV(pTsdb);
  SSmaStat  *pStat = SMA_ENV_STAT(pEnv);
  SRSmaInfo *pRSmaInfo = NULL;

  pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), suid, sizeof(tb_uid_t));
  if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
    tsdbError("vgId:%d failed to get rsma info for uid:%" PRIi64, REPO_ID(pTsdb), *suid);
    terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
    return TSDB_CODE_FAILED;
  }

  // First uid is only used for the debug log; fall back to 0 for an empty list.
  int64_t firstUid = (taosArrayGetSize(tbUids) > 0) ? *(int64_t *)taosArrayGet(tbUids, 0) : 0;

  // Slots 0 and 1 are the two optional rollup tasks; an empty slot is skipped silently.
  for (int32_t i = 0; i < 2; ++i) {
    if (!pRSmaInfo->taskInfo[i]) {
      continue;
    }
    if (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, true) != 0) {
      tsdbError("vgId:%d update tbUidList failed for uid:%" PRIi64 " since %s", REPO_ID(pTsdb), *suid, terrstr(terrno));
      return TSDB_CODE_FAILED;
    }
    tsdbDebug("vgId:%d update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, REPO_ID(pTsdb),
              pRSmaInfo->taskInfo[i], *suid, firstUid);
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1936 1937 1938
/**
 * @brief Push every suid/uids pair collected in a uid store to the matching rollup tasks.
 *
 * The pair held directly in the store (suid + tbUids) is applied first, then each entry of uidHash.
 *
 * @param pTsdb    Tsdb instance.
 * @param pStore   Collected uids; NULL or an empty tbUids array means there is nothing to do.
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED as soon as one update fails.
 */
int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) {
  if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) {
    tsdbDebug("vgId:%d no need to update tbUids since empty uidStore", REPO_ID(pTsdb));
    return TSDB_CODE_SUCCESS;
  }

  if (tsdbUpdateTbUidListImpl(pTsdb, &pStore->suid, pStore->tbUids) != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_FAILED;
  }

  for (void *pIter = taosHashIterate(pStore->uidHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pStore->uidHash, pIter)) {
    tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
    SArray   *pTbUids = *(SArray **)pIter;

    if (tsdbUpdateTbUidListImpl(pTsdb, pTbSuid, pTbUids) != TSDB_CODE_SUCCESS) {
      // Leaving the loop early: the iteration has to be cancelled explicitly.
      taosHashCancelIterate(pStore->uidHash, pIter);
      return TSDB_CODE_FAILED;
    }
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1961
/**
 * @brief Collect the suid of every block in a submit request into a uid store (suids only, no uids).
 *
 * @param pMsg     Submit request to walk; must not be NULL.
 * @param pStore   Store receiving the suids.
 * @return int32_t 0 on success, -1 if the message cannot be iterated or a suid cannot be stored.
 */
static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
  ASSERT(pMsg != NULL);
  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock = NULL;

  terrno = TSDB_CODE_SUCCESS;

  if (tInitSubmitMsgIterEx(pMsg, &msgIter) < 0) return -1;
  while (true) {
    if (tGetSubmitMsgNextEx(&msgIter, &pBlock) < 0) return -1;

    if (!pBlock) break;

    // Not every failure path of tsdbUidStorePut sets terrno, so its return value is checked directly.
    if (tsdbUidStorePut(pStore, msgIter.suid, NULL) != TSDB_CODE_SUCCESS) return -1;
  }

  if (terrno != TSDB_CODE_SUCCESS) return -1;
  return 0;
}

/**
 * @brief Run the first rollup task of one stable against a submit message and show the generated blocks.
 *
 * Only STREAM_DATA_TYPE_SUBMIT_BLOCK input is processed, and only taskInfo[0] is executed for now. The result
 * blocks are printed with blockDebugShowData() and not persisted.
 *
 * @param pTsdb     Tsdb instance owning the rsma environment.
 * @param pMeta     Meta handle (currently unused).
 * @param pMsg      Input message handed to the stream task.
 * @param inputType Kind of input carried by pMsg.
 * @param suid      Super table uid whose rsma info is looked up.
 * @return int32_t  TSDB_CODE_SUCCESS when there is nothing to do or the task ran; -1 or TSDB_CODE_FAILED on
 *                  allocation or execution failure.
 */
int32_t tsdbExecuteRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t inputType, tb_uid_t *suid) {
  SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb);
  if (!pEnv) {
    // only applicable when rsma env exists
    return TSDB_CODE_SUCCESS;
  }

  SSmaStat  *pStat = SMA_ENV_STAT(pEnv);
  SRSmaInfo *pRSmaInfo = NULL;

  pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), suid, sizeof(tb_uid_t));

  if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
    tsdbDebug("vgId:%d no rsma info for suid:%" PRIi64, REPO_ID(pTsdb), *suid);
    return TSDB_CODE_SUCCESS;
  }

  // Nothing to execute: skip the result array altogether.
  if ((inputType != STREAM_DATA_TYPE_SUBMIT_BLOCK) || !pRSmaInfo->taskInfo[0]) {
    return TSDB_CODE_SUCCESS;
  }

  SArray *pResult = taosArrayInit(0, sizeof(SSDataBlock));
  if (!pResult) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  tsdbDebug("vgId:%d execute rsma task for qTaskInfo:%p suid:%" PRIi64, REPO_ID(pTsdb), pRSmaInfo->taskInfo[0],
            *suid);
  qSetStreamInput(pRSmaInfo->taskInfo[0], pMsg, inputType);
  while (1) {
    SSDataBlock *output = NULL;
    uint64_t     ts;
    if (qExecTask(pRSmaInfo->taskInfo[0], &output, &ts) < 0) {
      ASSERT(false);
      // Reached only when ASSERT is compiled out: stop instead of looping on an undefined output.
      tsdbError("vgId:%d execute rsma task failed for suid:%" PRIi64 " since %s", REPO_ID(pTsdb), *suid,
                tstrerror(terrno));
      code = TSDB_CODE_FAILED;
      break;
    }
    if (!output) {
      break;
    }
    if (!taosArrayPush(pResult, output)) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = TSDB_CODE_FAILED;
      break;
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    if (taosArrayGetSize(pResult) > 0) {
      blockDebugShowData(pResult);
    } else {
      tsdbWarn("vgId:%d no sma data generated since %s", REPO_ID(pTsdb), tstrerror(terrno));
    }
  }

  // TODO: execute pRSmaInfo->taskInfo[1] the same way once the second rollup level is enabled.

  // pResult is local to this call and must be released on every path.
  taosArrayDestroy(pResult);
  return code;
}
C
Cary Xu 已提交
2049

C
Cary Xu 已提交
2050
/**
 * @brief Run the rollup tasks of every stable referenced by a submit message.
 *
 * Best effort: failures while collecting suids or executing a task are not propagated.
 *
 * @param pTsdb     Tsdb instance.
 * @param pMeta     Meta handle forwarded to tsdbExecuteRSma().
 * @param pMsg      Submit message.
 * @param inputType Kind of input carried by pMsg; only STREAM_DATA_TYPE_SUBMIT_BLOCK is handled.
 * @return int32_t  Always TSDB_CODE_SUCCESS.
 */
int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, void *pMsg, int32_t inputType) {
  // only applicable when rsma env exists
  if (!REPO_RSMA_ENV(pTsdb)) {
    return TSDB_CODE_SUCCESS;
  }

  if (inputType != STREAM_DATA_TYPE_SUBMIT_BLOCK) {
    return TSDB_CODE_SUCCESS;
  }

  STbUidStore uidStore = {0};
  tsdbFetchSubmitReqSuids(pMsg, &uidStore);

  // No suid was collected from the message.
  if (uidStore.suid == 0) {
    return TSDB_CODE_SUCCESS;
  }

  // First suid lives in the store itself, the remaining ones are the keys of uidHash.
  tsdbExecuteRSma(pTsdb, pMeta, pMsg, inputType, &uidStore.suid);

  for (void *pIter = taosHashIterate(uidStore.uidHash, NULL); pIter != NULL;
       pIter = taosHashIterate(uidStore.uidHash, pIter)) {
    tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
    tsdbExecuteRSma(pTsdb, pMeta, pMsg, inputType, pTbSuid);
  }

  tsdbUidStoreDestory(&uidStore);
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
2077
#if 0
/**
 * @brief Get the start TS key of the last data block of one interval/sliding.
 *
 * Placeholder, compiled out.
 *
 * @param pTsdb
 * @param smaIndex
 * @param result
 * @return int32_t
 *         1) Return 0 and fill the result if the check procedure is normal;
 *         2) Return -1 if error occurs during the check procedure.
 */
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, void *smaIndex, void *result) {
  const char *procedure = "";
  if (strncmp(procedure, "get the start TS key of the last data block", 100) != 0) {
    return -1;
  }
  // fill the result
  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Remove the tSma data files of an index that fall inside pWin.
 *
 * Placeholder, compiled out.
 *
 * @param pTsdb
 * @param smaIndex
 * @param pWin
 * @return int32_t
 */
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, void *smaIndex, STimeWindow *pWin) {
  // for ("tSmaFiles of param-interval-sliding between pWin") {
  //   // remove the tSmaFile
  // }
  return TSDB_CODE_SUCCESS;
}
#endif

C
Cary Xu 已提交
2113
// TODO: Who is responsible for resource allocate and release?
C
Cary Xu 已提交
2114
/**
 * @brief Public entry for inserting tSma data; logs a warning when the implementation reports a failure.
 *
 * @param pTsdb    Tsdb instance.
 * @param indexUid Uid of the tSma index.
 * @param msg      Data to insert, passed through to tsdbInsertTSmaDataImpl().
 * @return int32_t Result of tsdbInsertTSmaDataImpl(); negative on failure.
 */
int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg) {
  int32_t code = tsdbInsertTSmaDataImpl(pTsdb, indexUid, msg);

  if (code < 0) {
    tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }

  // TODO: destroy SSDataBlocks(msg)
  return code;
}

2123
/**
 * @brief Public entry for updating the expired sma windows from a submit request; warns on failure.
 *
 * @param pTsdb    Tsdb instance.
 * @param pMsg     Submit request, passed through to tsdbUpdateExpiredWindowImpl().
 * @param version  Version passed through to tsdbUpdateExpiredWindowImpl().
 * @return int32_t Result of tsdbUpdateExpiredWindowImpl(); negative on failure.
 */
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version) {
  int32_t code = tsdbUpdateExpiredWindowImpl(pTsdb, pMsg, version);

  if (code < 0) {
    tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }

  return code;
}

/**
 * @brief Public entry for inserting rSma data; logs a warning when the implementation reports a failure.
 *
 * @param pTsdb    Tsdb instance.
 * @param msg      Data to insert, passed through to tsdbInsertRSmaDataImpl().
 * @return int32_t Result of tsdbInsertRSmaDataImpl(); negative on failure.
 */
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
  int32_t code = tsdbInsertRSmaDataImpl(pTsdb, msg);

  if (code < 0) {
    tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }

  return code;
}

C
Cary Xu 已提交
2139
/**
 * @brief Public entry for reading tSma data of one window; logs a warning when the lookup fails.
 *
 * @param pTsdb      Tsdb instance.
 * @param pData      Output buffer, passed through to tsdbGetTSmaDataImpl().
 * @param indexUid   Uid of the tSma index.
 * @param querySKey  Start key of the queried window.
 * @param nMaxResult Upper bound of results the caller accepts.
 * @return int32_t   Result of tsdbGetTSmaDataImpl(); negative on failure.
 */
int32_t tsdbGetTSmaData(STsdb *pTsdb, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) {
  int32_t code = tsdbGetTSmaDataImpl(pTsdb, pData, indexUid, querySKey, nMaxResult);

  if (code < 0) {
    tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }

  return code;
}

/**
 * @brief Public entry for dropping the stored data of a tSma index; logs a warning on failure.
 *
 * @param pTsdb    Tsdb instance.
 * @param indexUid Uid of the tSma index.
 * @return int32_t Result of tsdbDropTSmaDataImpl(); negative on failure.
 */
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid) {
  int32_t code = tsdbDropTSmaDataImpl(pTsdb, indexUid);

  if (code < 0) {
    tsdbWarn("vgId:%d drop tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }

  return code;
}