tsdbSma.c 68.7 KB
Newer Older
C
Cary Xu 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16 17
#include "tsdbSma.h"
#include "tsdb.h"
C
Cary Xu 已提交
18

C
Cary Xu 已提交
19 20 21 22 23
// Directory-name component for each SMA type; looked up by SMA type value when
// the SMA directory path is built.
static const char *TSDB_SMA_DNAME[] = {
    "",      // TSDB_SMA_TYPE_BLOCK (empty name)
    "tsma",  // TSDB_SMA_TYPE_TIME_RANGE
    "rsma",  // TSDB_SMA_TYPE_ROLLUP
};
C
Cary Xu 已提交
24

C
Cary Xu 已提交
25
#undef _TEST_SMA_PRINT_DEBUG_LOG_

// Storage-related constants for SMA data.
#define SMA_STORAGE_TSDB_DAYS   30
#define SMA_STORAGE_TSDB_TIMES  10
#define SMA_STORAGE_SPLIT_HOURS 24
#define SMA_KEY_LEN             16  // TSKEY+groupId 8+8
#define SMA_DROP_EXPIRED_TIME   10  // default is 10 seconds

// Slot counts passed to taosHashInit() for the SMA stat hash and for the
// per-index expired-window hash.
#define SMA_STATE_HASH_SLOT      4
#define SMA_STATE_ITEM_HASH_SLOT 32

#define SMA_TEST_INDEX_NAME "smaTestIndexName"  // TODO: just for test
#define SMA_TEST_INDEX_UID  2000000001          // TODO: just for test
C
Cary Xu 已提交
37 38

typedef struct SRSmaInfo SRSmaInfo;

/**
 * @brief Storage level of a tsma data file; selects which "days" setting the file naming follows.
 */
typedef enum {
  SMA_STORAGE_LEVEL_TSDB = 0,     // use days of self-defined  e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f200.tsma
  SMA_STORAGE_LEVEL_DFILESET = 1  // use days of TS data       e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f1906.tsma
} ESmaStorageLevel;

C
Cary Xu 已提交
44 45 46 47 48 49 50 51
/**
 * @brief Header of a pool allocation, linked into a circular doubly linked list.
 *
 * The list head created by openPool() uses `size` as the running total of all
 * live allocations; every other node stores its own size (header + payload).
 */
typedef struct SPoolMem {
  int64_t          size;  // bytes: total for the head node, own size for an allocation node
  struct SPoolMem *prev;  // previous node in the ring
  struct SPoolMem *next;  // next node in the ring
} SPoolMem;

struct SSmaEnv {
  TdThreadRwlock lock;
C
Cary Xu 已提交
52
  int8_t         type;
C
Cary Xu 已提交
53 54 55 56 57 58 59 60 61
  TXN            txn;
  SPoolMem      *pPool;
  SDiskID        did;
  TENV          *dbEnv;  // TODO: If it's better to put it in smaIndex level?
  char          *path;   // relative path
  SSmaStat      *pStat;
};

// Field accessors for SSmaEnv. `env` is a pointer; every macro expands to an lvalue.
#define SMA_ENV_LOCK(env)       ((env)->lock)
#define SMA_ENV_TYPE(env)       ((env)->type)
#define SMA_ENV_DID(env)        ((env)->did)
#define SMA_ENV_ENV(env)        ((env)->dbEnv)
#define SMA_ENV_PATH(env)       ((env)->path)
#define SMA_ENV_STAT(env)       ((env)->pStat)
// Dereferences env->pStat, so pStat must be non-NULL.
#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)

C
Cary Xu 已提交
69
typedef struct {
C
Cary Xu 已提交
70 71 72 73
  STsdb        *pTsdb;
  SDBFile       dFile;
  const SArray *pDataBlocks;  // sma data
  int32_t       interval;     // interval with the precision of DB
C
Cary Xu 已提交
74 75 76 77
} STSmaWriteH;

/**
 * @brief Iteration state over SMA files.
 */
typedef struct {
  int32_t iter;
  int32_t fid;
} SmaFsIter;
C
Cary Xu 已提交
80

C
Cary Xu 已提交
81
typedef struct {
C
Cary Xu 已提交
82
  STsdb    *pTsdb;
C
Cary Xu 已提交
83
  SDBFile   dFile;
C
Cary Xu 已提交
84 85 86 87 88 89 90
  int32_t   interval;   // interval with the precision of DB
  int32_t   blockSize;  // size of SMA block item
  int8_t    storageLevel;
  int8_t    days;
  SmaFsIter smaFsIter;
} STSmaReadH;

91 92 93 94 95
typedef struct {
  /**
   * @brief The field 'state' is here to demonstrate if one smaIndex is ready to provide service.
   *    - TSDB_SMA_STAT_OK: 1) The sma calculation of history data is finished; 2) Or recevied information from
   * Streaming Module or TSDB local persistence.
C
Cary Xu 已提交
96 97 98
   *    - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
   * without information about its previous state.
   *    - TSDB_SMA_STAT_DROPPED: 1)sma dropped
C
Cary Xu 已提交
99
   * N.B. only applicable to tsma
100 101 102
   */
  int8_t    state;           // ETsdbSmaStat
  SHashObj *expiredWindows;  // key: skey of time window, value: N/A
C
Cary Xu 已提交
103
  STSma    *pSma;            // cache schema
104 105
} SSmaStatItem;

C
Cary Xu 已提交
106 107
#define RSMA_TASK_INFO_HASH_SLOT 8
struct SRSmaInfo {
C
Cary Xu 已提交
108
  void *taskInfo[TSDB_RSMA_RETENTION_2];  // qTaskInfo_t
C
Cary Xu 已提交
109 110
};

111
struct SSmaStat {
C
Cary Xu 已提交
112 113 114 115
  union {
    SHashObj *smaStatItems;  // key: indexUid, value: SSmaStatItem for tsma
    SHashObj *rsmaInfoHash;  // key: stbUid, value: SRSmaInfo;
  };
116
  T_REF_DECLARE()
117
};
C
Cary Xu 已提交
118 119
#define SMA_STAT_ITEMS(s)     ((s)->smaStatItems)
#define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash)
C
Cary Xu 已提交
120 121 122 123 124 125 126 127 128 129

/**
 * @brief Atomically detach the task stored in *taskHandle and destroy it.
 *
 * @param taskHandle Address of the slot holding the task handle; NULL is ignored.
 */
static FORCE_INLINE void tsdbFreeTaskHandle(qTaskInfo_t *taskHandle) {
  if (!taskHandle) {
    return;
  }

  // Note: free/kill may in RC
  qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
  // The value-returning CAS yields the previous content of the slot. Destroy the task only
  // when that content is the handle we loaded, i.e. when this caller actually performed the
  // swap; a truthiness test would also fire after a failed swap against another non-NULL value.
  if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL) == otaskHandle) {
    qDestroyTask(otaskHandle);
  }
}

/**
 * @brief Destroy every task handle held by an SRSmaInfo.
 *
 * NOTE(review): pInfo itself is not released here; confirm who owns the SRSmaInfo allocation.
 *
 * @param pInfo rsma info; NULL is ignored.
 * @return void* always NULL
 */
static FORCE_INLINE void *tsdbFreeRSmaInfo(SRSmaInfo *pInfo) {
  if (!pInfo) {
    return NULL;
  }

  // Bound the loop by the real array extent so it can never run past taskInfo[].
  const int32_t nTasks = (int32_t)(sizeof(pInfo->taskInfo) / sizeof(pInfo->taskInfo[0]));
  for (int32_t i = 0; i < nTasks; ++i) {
    if (pInfo->taskInfo[i]) {
      // tsdbFreeTaskHandle() expects the address of the slot, not the handle stored in it.
      tsdbFreeTaskHandle(&pInfo->taskInfo[i]);
    }
  }
  return NULL;
}
137

C
Cary Xu 已提交
138
// declaration of static functions
C
Cary Xu 已提交
139

140
// expired window
141
static int32_t  tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version);
H
refact  
Hongze Cheng 已提交
142 143
static int32_t  tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey,
                                     int64_t version);
C
Cary Xu 已提交
144
static int32_t  tsdbInitSmaStat(SSmaStat **pSmaStat, int8_t smaType);
C
Cary Xu 已提交
145
static void    *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem);
C
Cary Xu 已提交
146
static int32_t  tsdbDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
C
Cary Xu 已提交
147 148
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, int8_t smaType, const char *path, SDiskID did);
static int32_t  tsdbInitSmaEnv(STsdb *pTsdb, int8_t smaType, const char *path, SDiskID did, SSmaEnv **pEnv);
149 150 151 152 153 154
static int32_t  tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey);
static int32_t  tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat);
static int32_t  tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat);

// read data
// TODO: This is the basic params, and should wrap the params to a queryHandle.
C
Cary Xu 已提交
155
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult);
C
Cary Xu 已提交
156

157
// insert data
C
Cary Xu 已提交
158
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, const SArray *pDataBlocks, int64_t interval,
C
Cary Xu 已提交
159
                                  int8_t intervalUnit);
160 161 162
static void    tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH);
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit);
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit);
C
Cary Xu 已提交
163
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, int32_t fid);
C
Cary Xu 已提交
164 165
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyLen, void *pData, int32_t dataLen,
                                    TXN *txn);
C
Cary Xu 已提交
166
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision, bool adjusted);
C
Cary Xu 已提交
167
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel);
C
Cary Xu 已提交
168
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t fid);
169
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey);
C
Cary Xu 已提交
170
static bool    tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey);
C
Cary Xu 已提交
171
static void    tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]);
C
Cary Xu 已提交
172 173
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char *msg);
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg);
C
Cary Xu 已提交
174

C
Cary Xu 已提交
175
static FORCE_INLINE int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
C
Cary Xu 已提交
176
static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids);
C
Cary Xu 已提交
177 178 179
// mgmt interface
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid);

C
Cary Xu 已提交
180 181 182 183 184 185 186 187 188 189
// Pool Memory
static SPoolMem *openPool();
static void      clearPool(SPoolMem *pPool);
static void      closePool(SPoolMem *pPool);
static void     *poolMalloc(void *arg, size_t size);
static void      poolFree(void *arg, void *ptr);

static int tsdbSmaBeginCommit(SSmaEnv *pEnv);
static int tsdbSmaEndCommit(SSmaEnv *pEnv);

190
// implementation
C
Cary Xu 已提交
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
// Atomically adds n to the tsma counter of pTsdb and returns the updated count.
static FORCE_INLINE int16_t tsdbTSmaAdd(STsdb *pTsdb, int16_t n) {
  int16_t updated = atomic_add_fetch_16(&REPO_TSMA_NUM(pTsdb), n);
  return updated;
}
// Atomically subtracts n from the tsma counter of pTsdb and returns the updated count.
static FORCE_INLINE int16_t tsdbTSmaSub(STsdb *pTsdb, int16_t n) {
  int16_t updated = atomic_sub_fetch_16(&REPO_TSMA_NUM(pTsdb), n);
  return updated;
}

/**
 * @brief Take the SMA env lock for reading.
 *
 * @return 0 on success; -1 with terrno set from the lock error code on failure.
 */
static FORCE_INLINE int32_t tsdbRLockSma(SSmaEnv *pEnv) {
  const int rc = taosThreadRwlockRdlock(&(pEnv->lock));
  if (rc == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(rc);
  return -1;
}

/**
 * @brief Take the SMA env lock for writing.
 *
 * @return 0 on success; -1 with terrno set from the lock error code on failure.
 */
static FORCE_INLINE int32_t tsdbWLockSma(SSmaEnv *pEnv) {
  const int rc = taosThreadRwlockWrlock(&(pEnv->lock));
  if (rc == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(rc);
  return -1;
}

/**
 * @brief Release the SMA env lock.
 *
 * @return 0 on success; -1 with terrno set from the lock error code on failure.
 */
static FORCE_INLINE int32_t tsdbUnLockSma(SSmaEnv *pEnv) {
  const int rc = taosThreadRwlockUnlock(&(pEnv->lock));
  if (rc == 0) {
    return 0;
  }

  terrno = TAOS_SYSTEM_ERROR(rc);
  return -1;
}

static SPoolMem *openPool() {
H
Hongze Cheng 已提交
226
  SPoolMem *pPool = (SPoolMem *)taosMemoryMalloc(sizeof(*pPool));
C
Cary Xu 已提交
227 228 229 230 231 232 233 234 235

  pPool->prev = pPool->next = pPool;
  pPool->size = 0;

  return pPool;
}

/**
 * @brief Release every allocation still linked into the pool, leaving only the (empty) head.
 *
 * @param pPool pool head; NULL is ignored.
 */
static void clearPool(SPoolMem *pPool) {
  if (!pPool) return;

  // Keep detaching the first node after the head until the head links to itself again.
  for (SPoolMem *pMem = pPool->next; pMem != pPool; pMem = pPool->next) {
    pMem->next->prev = pMem->prev;
    pMem->prev->next = pMem->next;
    pPool->size -= pMem->size;

    taosMemoryFree(pMem);
  }

  assert(pPool->size == 0);
}

/**
 * @brief Release all pool allocations and then the pool head itself.
 *
 * @param pPool pool head; NULL is ignored.
 */
static void closePool(SPoolMem *pPool) {
  if (!pPool) {
    return;
  }

  clearPool(pPool);
  taosMemoryFree(pPool);
}

static void *poolMalloc(void *arg, size_t size) {
  void     *ptr = NULL;
  SPoolMem *pPool = (SPoolMem *)arg;
  SPoolMem *pMem;

H
Hongze Cheng 已提交
266
  pMem = (SPoolMem *)taosMemoryMalloc(sizeof(*pMem) + size);
C
Cary Xu 已提交
267
  if (!pMem) {
C
Cary Xu 已提交
268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
    assert(0);
  }

  pMem->size = sizeof(*pMem) + size;
  pMem->next = pPool->next;
  pMem->prev = pPool;

  pPool->next->prev = pMem;
  pPool->next = pMem;
  pPool->size += pMem->size;

  ptr = (void *)(&pMem[1]);
  return ptr;
}

/**
 * @brief Return one allocation obtained from poolMalloc() to the system.
 *
 * @param arg pool head (SPoolMem *)
 * @param ptr payload pointer previously returned by poolMalloc(); NULL is ignored.
 */
static void poolFree(void *arg, void *ptr) {
  if (!ptr) {
    // Mirror free(NULL): stepping back from NULL to a header would be invalid.
    return;
  }

  SPoolMem *pPool = (SPoolMem *)arg;
  SPoolMem *pMem = &(((SPoolMem *)ptr)[-1]);  // header sits right before the payload

  pMem->next->prev = pMem->prev;
  pMem->prev->next = pMem->next;
  pPool->size -= pMem->size;

  taosMemoryFree(pMem);
}
C
Cary Xu 已提交
295 296 297

/**
 * @brief Initialise the SMA counters of a TSDB instance from meta.
 *
 * Stores the number of tsma table uids reported by meta into the tsma counter when it is positive.
 *
 * @param pTsdb
 * @return int32_t always TSDB_CODE_SUCCESS
 */
int32_t tsdbInitSma(STsdb *pTsdb) {
  // tSma
  SArray *pSmaTbUids = metaGetSmaTbUids(REPO_META(pTsdb), false);
  int32_t numOfTSma = taosArrayGetSize(pSmaTbUids);
  // NOTE(review): assumes the array returned by metaGetSmaTbUids() is owned by the caller;
  // without this release it is dropped on every call. Confirm against the meta implementation.
  taosArrayDestroy(pSmaTbUids);

  if (numOfTSma > 0) {
    atomic_store_16(&REPO_TSMA_NUM(pTsdb), (int16_t)numOfTSma);
  }
  // TODO: rSma
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
306 307 308 309 310 311 312 313
// Atomically reads the state of a stat item; a NULL item reports TSDB_SMA_STAT_UNKNOWN.
static FORCE_INLINE int8_t tsdbSmaStat(SSmaStatItem *pStatItem) {
  if (!pStatItem) {
    return TSDB_SMA_STAT_UNKNOWN;
  }
  return atomic_load_8(&pStatItem->state);
}

/**
 * @brief Check whether a stat item is exactly in state TSDB_SMA_STAT_OK.
 *
 * @param pStatItem item to inspect; NULL yields false and leaves *state untouched.
 * @param state     optional out-parameter receiving the state that was read.
 * @return bool true only when the state equals TSDB_SMA_STAT_OK.
 */
static FORCE_INLINE bool tsdbSmaStatIsOK(SSmaStatItem *pStatItem, int8_t *state) {
  if (!pStatItem) {
    return false;
  }

  const int8_t current = atomic_load_8(&pStatItem->state);
  if (state) {
    *state = current;
  }
  return current == TSDB_SMA_STAT_OK;
}

// True when the EXPIRED bit is set in the item state; a NULL item counts as expired.
static FORCE_INLINE bool tsdbSmaStatIsExpired(SSmaStatItem *pStatItem) {
  if (!pStatItem) {
    return true;
  }
  return (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_EXPIRED);
}

// True when the DROPPED bit is set in the item state; a NULL item counts as dropped.
static FORCE_INLINE bool tsdbSmaStatIsDropped(SSmaStatItem *pStatItem) {
  if (!pStatItem) {
    return true;
  }
  return (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_DROPPED);
}

// Atomically overwrites the item state with TSDB_SMA_STAT_OK; a NULL item is ignored.
static FORCE_INLINE void tsdbSmaStatSetOK(SSmaStatItem *pStatItem) {
  if (!pStatItem) {
    return;
  }
  atomic_store_8(&pStatItem->state, TSDB_SMA_STAT_OK);
}

// Atomically ORs the EXPIRED bit into the item state; a NULL item is ignored.
static FORCE_INLINE void tsdbSmaStatSetExpired(SSmaStatItem *pStatItem) {
  if (!pStatItem) {
    return;
  }
  atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_EXPIRED);
}

// Atomically ORs the DROPPED bit into the item state; a NULL item is ignored.
static FORCE_INLINE void tsdbSmaStatSetDropped(SSmaStatItem *pStatItem) {
  if (!pStatItem) {
    return;
  }
  atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_DROPPED);
}

C
Cary Xu 已提交
351
/**
 * @brief Format the relative SMA directory "vnode<sep>vnode<vgId><sep><sma dir name>".
 *
 * @param vgId    vnode group id embedded in the path
 * @param smaType index into TSDB_SMA_DNAME; not range-checked here
 * @param dirName output buffer; at most TSDB_FILENAME_LEN bytes are written (snprintf truncates)
 */
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) {
  snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TSDB_SMA_DNAME[smaType]);
}
C
Cary Xu 已提交
354

C
Cary Xu 已提交
355
/**
 * @brief Allocate and fully initialise an SMA environment.
 *
 * Sets up, in order: the rwlock, a private copy of `path`, the disk id, the SMA stat
 * container, the DB env opened at the absolute form of `path`, and the memory pool.
 * Any failure tears down whatever was built so far.
 *
 * @param pTsdb   owning TSDB instance
 * @param smaType SMA type stored in the env and used to initialise the stat container
 * @param path    relative directory of the SMA data; must be a non-empty string
 * @param did     disk id the path lives on
 * @return SSmaEnv* the new env, or NULL on failure
 */
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, int8_t smaType, const char *path, SDiskID did) {
  SSmaEnv *pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv));
  if (!pEnv) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMA_ENV_TYPE(pEnv) = smaType;

  int code = taosThreadRwlockInit(&(pEnv->lock), NULL);
  if (code) {
    terrno = TAOS_SYSTEM_ERROR(code);
    // The lock was never initialised, so only the raw allocation is released.
    taosMemoryFree(pEnv);
    return NULL;
  }

  ASSERT(path && (strlen(path) > 0));
  SMA_ENV_PATH(pEnv) = strdup(path);
  if (!SMA_ENV_PATH(pEnv)) {
    // Record the reason here so the caller does not see a stale terrno.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tsdbFreeSmaEnv(pEnv);
    return NULL;
  }

  SMA_ENV_DID(pEnv) = did;

  if (tsdbInitSmaStat(&SMA_ENV_STAT(pEnv), smaType) != TSDB_CODE_SUCCESS) {
    tsdbFreeSmaEnv(pEnv);
    return NULL;
  }

  char aname[TSDB_FILENAME_LEN] = {0};
  tfsAbsoluteName(REPO_TFS(pTsdb), did, path, aname);
  if (tsdbOpenDBEnv(&pEnv->dbEnv, aname) != TSDB_CODE_SUCCESS) {
    tsdbFreeSmaEnv(pEnv);
    return NULL;
  }

  if (!(pEnv->pPool = openPool())) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tsdbFreeSmaEnv(pEnv);
    return NULL;
  }

  return pEnv;
}

C
Cary Xu 已提交
402
/**
 * @brief Create the SMA env behind *pEnv if it does not exist yet.
 *
 * @param pEnv in/out: left untouched when *pEnv is already set, otherwise receives the new env.
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED when pEnv is NULL (terrno =
 *         TSDB_CODE_INVALID_PTR) or the env cannot be created.
 */
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, int8_t smaType, const char *path, SDiskID did, SSmaEnv **pEnv) {
  if (!pEnv) {
    terrno = TSDB_CODE_INVALID_PTR;
    return TSDB_CODE_FAILED;
  }

  if (*pEnv) {
    return TSDB_CODE_SUCCESS;
  }

  *pEnv = tsdbNewSmaEnv(pTsdb, smaType, path, did);
  return (*pEnv) ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
}

/**
 * @brief Release resources allocated for its member fields, not including itself.
 *
 * @param pSmaEnv env to tear down; NULL is ignored.
 */
void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv) {
  if (!pSmaEnv) {
    return;
  }

  // Release the contents of the stat container first, then the container itself.
  tsdbDestroySmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv));
  taosMemoryFreeClear(pSmaEnv->pStat);
  taosMemoryFreeClear(pSmaEnv->path);
  taosThreadRwlockDestroy(&(pSmaEnv->lock));
  tsdbCloseDBEnv(pSmaEnv->dbEnv);
  closePool(pSmaEnv->pPool);
}

/**
 * @brief Destroy the members of an SMA env and release the env itself.
 *
 * @param pSmaEnv env to free; NULL is accepted by the member teardown.
 * @return void* always NULL, so callers can write `p = tsdbFreeSmaEnv(p)`.
 */
void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv) {
  tsdbDestroySmaEnv(pSmaEnv);
  taosMemoryFreeClear(pSmaEnv);
  return NULL;
}

440
/**
 * @brief Increment the reference count of the SMA stat and log the new value.
 *
 * @param pStat stat to reference; NULL is ignored.
 * @return int32_t always 0
 */
static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) {
  if (pStat) {
    int ref = T_REF_INC(pStat);
    tsdbDebug("vgId:%d ref sma stat:%p, val:%d", REPO_ID(pTsdb), pStat, ref);
  }
  return 0;
}

/**
 * @brief Decrement the reference count of the SMA stat and log the new value.
 *
 * @param pStat stat to dereference; NULL is ignored.
 * @return int32_t always 0
 */
static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) {
  if (pStat) {
    int ref = T_REF_DEC(pStat);
    tsdbDebug("vgId:%d unref sma stat:%p, val:%d", REPO_ID(pTsdb), pStat, ref);
  }
  return 0;
}

C
Cary Xu 已提交
456
/**
 * @brief Lazily create the SMA stat container together with the hash matching `smaType`.
 *
 * @param pSmaStat in/out: left untouched when *pSmaStat is already set; receives the new
 *                 container on success and stays NULL on failure.
 * @param smaType  TSDB_SMA_TYPE_ROLLUP or TSDB_SMA_TYPE_TIME_RANGE
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED with terrno set.
 */
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
  ASSERT(pSmaStat != NULL);

  if (*pSmaStat) {  // no lock
    return TSDB_CODE_SUCCESS;
  }

  /**
   *  1. Lazy mode utilized when init SSmaStat to update expired window(or hungry mode when tsdbNew).
   *  2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
   * tsdbInitSmaStat invoked in other multithread environment later.
   */
  SSmaStat *pStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat));
  if (!pStat) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }

  if (smaType == TSDB_SMA_TYPE_ROLLUP) {
    SMA_STAT_INFO_HASH(pStat) = taosHashInit(
        RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);

    if (!SMA_STAT_INFO_HASH(pStat)) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      taosMemoryFree(pStat);
      return TSDB_CODE_FAILED;
    }
  } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
    SMA_STAT_ITEMS(pStat) =
        taosHashInit(SMA_STATE_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);

    if (!SMA_STAT_ITEMS(pStat)) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      taosMemoryFree(pStat);
      return TSDB_CODE_FAILED;
    }
  } else {
    ASSERT(0);
    // With assertions compiled out, do not hand back a container without a hash as "success".
    taosMemoryFree(pStat);
    terrno = TSDB_CODE_INVALID_PARA;
    return TSDB_CODE_FAILED;
  }

  *pSmaStat = pStat;
  return TSDB_CODE_SUCCESS;
}

static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) {
  SSmaStatItem *pItem = NULL;

wafwerar's avatar
wafwerar 已提交
501
  pItem = (SSmaStatItem *)taosMemoryCalloc(1, sizeof(SSmaStatItem));
502 503 504 505 506
  if (pItem) {
    pItem->state = state;
    pItem->expiredWindows = taosHashInit(SMA_STATE_ITEM_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP),
                                         true, HASH_ENTRY_LOCK);
    if (!pItem->expiredWindows) {
wafwerar's avatar
wafwerar 已提交
507
      taosMemoryFreeClear(pItem);
508 509 510 511 512
    }
  }
  return pItem;
}

C
Cary Xu 已提交
513
/**
 * @brief Release a stat item: its cached tsma schema, its expired-window hash and the item itself.
 *
 * @param pSmaStatItem item to free; NULL is ignored.
 * @return void* always NULL
 */
static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
  if (!pSmaStatItem) {
    return NULL;
  }

  tdDestroyTSma(pSmaStatItem->pSma);
  taosMemoryFreeClear(pSmaStatItem->pSma);
  taosHashCleanup(pSmaStatItem->expiredWindows);
  taosMemoryFreeClear(pSmaStatItem);
  return NULL;
}

C
Cary Xu 已提交
523 524
/**
 * @brief Release resources allocated for its member fields, not including itself.
525 526 527
 *
 * @param pSmaStat
 * @return int32_t
C
Cary Xu 已提交
528
 */
C
Cary Xu 已提交
529
int32_t tsdbDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
530 531
  if (pSmaStat) {
    // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
C
Cary Xu 已提交
532
    if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
C
Cary Xu 已提交
533
      void *item = taosHashIterate(SMA_STAT_ITEMS(pSmaStat), NULL);
C
Cary Xu 已提交
534
      while (item) {
C
Cary Xu 已提交
535 536
        SSmaStatItem *pItem = *(SSmaStatItem **)item;
        tsdbFreeSmaStatItem(pItem);
C
Cary Xu 已提交
537
        item = taosHashIterate(SMA_STAT_ITEMS(pSmaStat), item);
C
Cary Xu 已提交
538
      }
C
Cary Xu 已提交
539
      taosHashCleanup(SMA_STAT_ITEMS(pSmaStat));
C
Cary Xu 已提交
540
    } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
C
Cary Xu 已提交
541
      void *infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), NULL);
C
Cary Xu 已提交
542
      while (infoHash) {
C
Cary Xu 已提交
543 544
        SRSmaInfo *pInfoHash = *(SRSmaInfo **)infoHash;
        tsdbFreeRSmaInfo(pInfoHash);
C
Cary Xu 已提交
545
        infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), infoHash);
C
Cary Xu 已提交
546
      }
C
Cary Xu 已提交
547
      taosHashCleanup(SMA_STAT_INFO_HASH(pSmaStat));
C
Cary Xu 已提交
548 549
    } else {
      ASSERT(0);
550 551
    }
  }
552
  return TSDB_CODE_SUCCESS;
553 554
}

C
Cary Xu 已提交
555
/**
 * @brief Make sure the SMA env of the given type exists, creating it under the repo lock if needed.
 *
 * The env pointer is first read without the lock; only when it is still unset is the repo
 * locked, the pointer re-read, and the env created and published.
 *
 * @param pTsdb
 * @param smaType TSDB_SMA_TYPE_TIME_RANGE or TSDB_SMA_TYPE_ROLLUP
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED (terrno = TSDB_CODE_INVALID_PARA
 *         for an unsupported type).
 */
static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
  SSmaEnv *pEnv = NULL;

  // return if already init
  switch (smaType) {
    case TSDB_SMA_TYPE_TIME_RANGE:
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&REPO_TSMA_ENV(pTsdb)))) {
        return TSDB_CODE_SUCCESS;
      }
      break;
    case TSDB_SMA_TYPE_ROLLUP:
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&REPO_RSMA_ENV(pTsdb)))) {
        return TSDB_CODE_SUCCESS;
      }
      break;
    default:
      terrno = TSDB_CODE_INVALID_PARA;
      return TSDB_CODE_FAILED;
  }

  // init sma env
  const bool isTSma = (smaType == TSDB_SMA_TYPE_TIME_RANGE);

  tsdbLockRepo(pTsdb);
  // Re-read under the lock: another thread may have published the env in the meantime.
  pEnv = isTSma ? atomic_load_ptr(&REPO_TSMA_ENV(pTsdb)) : atomic_load_ptr(&REPO_RSMA_ENV(pTsdb));
  if (!pEnv) {
    char rname[TSDB_FILENAME_LEN] = {0};

    SDiskID did = {0};
    tfsAllocDisk(REPO_TFS(pTsdb), TFS_PRIMARY_LEVEL, &did);
    if (did.level < 0 || did.id < 0) {
      tsdbUnlockRepo(pTsdb);
      return TSDB_CODE_FAILED;
    }
    tsdbGetSmaDir(REPO_ID(pTsdb), smaType, rname);

    if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rname, did) != TSDB_CODE_SUCCESS) {
      tsdbUnlockRepo(pTsdb);
      return TSDB_CODE_FAILED;
    }

    if (tsdbInitSmaEnv(pTsdb, smaType, rname, did, &pEnv) != TSDB_CODE_SUCCESS) {
      tsdbUnlockRepo(pTsdb);
      return TSDB_CODE_FAILED;
    }

    if (isTSma) {
      atomic_store_ptr(&REPO_TSMA_ENV(pTsdb), pEnv);
    } else {
      atomic_store_ptr(&REPO_RSMA_ENV(pTsdb), pEnv);
    }
  }
  tsdbUnlockRepo(pTsdb);

  return TSDB_CODE_SUCCESS;
}

H
refact  
Hongze Cheng 已提交
608 609
/**
 * @brief Record `winSKey` as an expired window of the sma index `indexUid`.
 *
 * When the index has no stat item yet, one is created, its tsma schema is fetched from meta
 * and cached, and the item is inserted into `pItemsHash` (which stores item pointers).
 *
 * @param pTsdb
 * @param pItemsHash hash of indexUid -> SSmaStatItem *
 * @param indexUid   sma index uid
 * @param winSKey    start key of the expired window
 * @param version    value stored for the window key
 * @return int32_t TSDB_CODE_SUCCESS or TSDB_CODE_FAILED
 */
static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey,
                                    int64_t version) {
  SSmaStatItem *pItem = NULL;
  void         *pEntry = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));  // points at the stored pointer

  if (!pEntry) {
    // TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later
    pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_OK);  // TODO use the real state
    if (!pItem) {
      // Response to stream computing: OOM
      // For query, if the indexUid not found, the TSDB should tell query module to query raw TS data.
      return TSDB_CODE_FAILED;
    }

    // cache smaMeta
    STSma *pSma = metaGetSmaInfoByIndex(REPO_META(pTsdb), indexUid, true);
    if (!pSma) {
      terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
      taosHashCleanup(pItem->expiredWindows);
      taosMemoryFree(pItem);
      tsdbWarn("vgId:%d update expired window failed for smaIndex %" PRIi64 " since %s", REPO_ID(pTsdb), indexUid,
               tstrerror(terrno));
      return TSDB_CODE_FAILED;
    }
    pItem->pSma = pSma;

    if (taosHashPut(pItemsHash, &indexUid, sizeof(indexUid), &pItem, sizeof(pItem)) != 0) {
      // The item never became reachable through the hash: release it completely, including
      // the cached schema that a partial cleanup would leave behind.
      tsdbFreeSmaStatItem(pItem);
      return TSDB_CODE_FAILED;
    }
  } else if (!(pItem = *(SSmaStatItem **)pEntry)) {
    terrno = TSDB_CODE_INVALID_PTR;
    return TSDB_CODE_FAILED;
  }

  if (taosHashPut(pItem->expiredWindows, &winSKey, sizeof(TSKEY), &version, sizeof(version)) != 0) {
    // If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would
    // tell query module to query raw TS data.
    // N.B.
    //  1) It is assumed to be extemely little probability event of fail to taosHashPut.
    //  2) This would solve the inconsistency to some extent, but not completely, unless we record all expired
    // windows failed to put into hash table.
    //
    // Unlink the item first, then release it as a whole, so that neither the item nor the
    // contents of its schema are left behind.
    taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid));
    tsdbFreeSmaStatItem(pItem);
    tsdbWarn("vgId:%d smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window fail", REPO_ID(pTsdb), indexUid,
             winSKey);
    return TSDB_CODE_FAILED;
  }

  tsdbDebug("vgId:%d smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window succeed", REPO_ID(pTsdb), indexUid,
            winSKey);
  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Update expired window according to msg from stream computing module.
 *
 * @param pTsdb
 * @param msg SSubmitReq
 * @return int32_t
 */
670 671
int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version) {
  // no time-range-sma, just return success
C
Cary Xu 已提交
672 673 674 675 676
  if (atomic_load_16(&REPO_TSMA_NUM(pTsdb)) <= 0) {
    tsdbTrace("vgId:%d not update expire window since no tSma", REPO_ID(pTsdb));
    return TSDB_CODE_SUCCESS;
  }

H
Hongze Cheng 已提交
677
  if (!REPO_META(pTsdb)) {
C
Cary Xu 已提交
678 679 680 681 682 683 684 685 686 687 688 689
    terrno = TSDB_CODE_INVALID_PTR;
    return TSDB_CODE_FAILED;
  }

  if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INIT_FAILED;
    return TSDB_CODE_FAILED;
  }

  // Firstly, assume that tSma can only be created on super table/normal table.
  // getActiveTimeWindow

C
Cary Xu 已提交
690
  SSmaEnv  *pEnv = REPO_TSMA_ENV(pTsdb);
C
Cary Xu 已提交
691 692 693
  SSmaStat *pStat = SMA_ENV_STAT(pEnv);
  SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv);

C
Cary Xu 已提交
694
  TASSERT(pEnv && pStat && pItemsHash);
C
Cary Xu 已提交
695

C
Cary Xu 已提交
696 697 698
  // basic procedure
  // TODO: optimization
  tsdbRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
699 700 701 702

  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock = NULL;
  SInterval      interval = {0};
C
Cary Xu 已提交
703
  TSKEY          lastWinSKey = INT64_MIN;
C
Cary Xu 已提交
704

C
Cary Xu 已提交
705
  if (tInitSubmitMsgIterEx(pMsg, &msgIter) != TSDB_CODE_SUCCESS) {
C
Cary Xu 已提交
706 707 708 709
    return TSDB_CODE_FAILED;
  }

  while (true) {
C
Cary Xu 已提交
710
    tGetSubmitMsgNextEx(&msgIter, &pBlock);
C
Cary Xu 已提交
711
    if (!pBlock) break;
C
Cary Xu 已提交
712 713 714 715

    STSmaWrapper *pSW = NULL;
    STSma        *pTSma = NULL;

C
Cary Xu 已提交
716
    SSubmitBlkIter blkIter = {0};
C
Cary Xu 已提交
717
    if (tInitSubmitBlkIterEx(&msgIter, pBlock, &blkIter) != TSDB_CODE_SUCCESS) {
C
Cary Xu 已提交
718
      pSW = tdFreeTSmaWrapper(pSW);
C
Cary Xu 已提交
719 720 721
      break;
    }

C
Cary Xu 已提交
722
    while (true) {
C
Cary Xu 已提交
723
      STSRow *row = tGetSubmitBlkNextEx(&blkIter);
C
Cary Xu 已提交
724
      if (!row) {
C
Cary Xu 已提交
725 726 727
        tdFreeTSmaWrapper(pSW);
        break;
      }
C
Cary Xu 已提交
728 729 730 731
      if (!pSW || (pTSma->tableUid != pBlock->suid)) {
        if (pSW) {
          pSW = tdFreeTSmaWrapper(pSW);
        }
C
Cary Xu 已提交
732
        if (!(pSW = metaGetSmaInfoByTable(REPO_META(pTsdb), pBlock->suid))) {
C
Cary Xu 已提交
733 734
          break;
        }
C
Cary Xu 已提交
735
        if ((pSW->number) <= 0 || !pSW->tSma) {
C
Cary Xu 已提交
736
          pSW = tdFreeTSmaWrapper(pSW);
C
Cary Xu 已提交
737 738
          break;
        }
C
Cary Xu 已提交
739

C
Cary Xu 已提交
740 741
        pTSma = pSW->tSma;

C
Cary Xu 已提交
742 743 744 745 746 747 748
        interval.interval = pTSma->interval;
        interval.intervalUnit = pTSma->intervalUnit;
        interval.offset = pTSma->offset;
        interval.precision = REPO_CFG(pTsdb)->precision;
        interval.sliding = pTSma->sliding;
        interval.slidingUnit = pTSma->slidingUnit;
      }
C
Cary Xu 已提交
749

C
update  
Cary Xu 已提交
750 751
      TSKEY winSKey = taosTimeTruncate(TD_ROW_KEY(row), &interval, interval.precision);

C
Cary Xu 已提交
752 753 754 755 756 757 758
      if (lastWinSKey != winSKey) {
        lastWinSKey = winSKey;
        tsdbSetExpiredWindow(pTsdb, pItemsHash, pTSma->indexUid, winSKey, version);
      } else {
        tsdbDebug("vgId:%d smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window ignore as duplicated",
                  REPO_ID(pTsdb), pTSma->indexUid, winSKey);
      }
759 760 761
    }
  }

762
  tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
763

764 765 766
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
767 768 769 770 771 772 773 774 775
/**
 * @brief When sma data received from stream computing, make the relative expired window valid.
 *
 * @param pTsdb
 * @param pStat
 * @param indexUid
 * @param skey
 * @return int32_t
 */
776
static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey) {
C
Cary Xu 已提交
777 778
  SSmaStatItem *pItem = NULL;

779 780
  tsdbRefSmaStat(pTsdb, pStat);

C
Cary Xu 已提交
781 782
  if (pStat && SMA_STAT_ITEMS(pStat)) {
    pItem = taosHashGet(SMA_STAT_ITEMS(pStat), &indexUid, sizeof(indexUid));
C
Cary Xu 已提交
783
  }
C
Cary Xu 已提交
784
  if ((pItem) && ((pItem = *(SSmaStatItem **)pItem))) {
785 786
    // pItem resides in hash buffer all the time unless drop sma index
    // TODO: multithread protect
C
Cary Xu 已提交
787 788
    if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) {
      // error handling
789
      tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
790
      tsdbWarn("vgId:%d remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " fail", REPO_ID(pTsdb),
791 792
               skey, indexUid);
      return TSDB_CODE_FAILED;
C
Cary Xu 已提交
793
    }
C
Cary Xu 已提交
794 795
    tsdbDebug("vgId:%d remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " succeed", REPO_ID(pTsdb),
              skey, indexUid);
C
Cary Xu 已提交
796 797 798 799 800 801 802 803 804
    // TODO: use a standalone interface to received state upate notification from stream computing module.
    /**
     * @brief state
     *  - When SMA env init in TSDB, its status is TSDB_SMA_STAT_OK.
     *  - In startup phase of stream computing module, it should notify the SMA env in TSDB to expired if needed(e.g.
     * when batch data caculation not finised)
     *  - When TSDB_SMA_STAT_OK, the stream computing module should also notify that to the SMA env in TSDB.
     */
    pItem->state = TSDB_SMA_STAT_OK;
C
Cary Xu 已提交
805 806
  } else {
    // error handling
807 808 809
    tsdbUnRefSmaStat(pTsdb, pStat);
    tsdbWarn("vgId:%d expired window %" PRIi64 " not exists for sma index %" PRIi64, REPO_ID(pTsdb), skey, indexUid);
    return TSDB_CODE_FAILED;
C
Cary Xu 已提交
810
  }
811 812

  tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
813 814 815
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
816 817 818 819 820 821 822
/**
 * @brief Judge the tSma storage level
 *
 * @param interval
 * @param intervalUnit
 * @return int32_t
 */
C
Cary Xu 已提交
823
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
C
Cary Xu 已提交
824 825
  // TODO: configurable for SMA_STORAGE_SPLIT_HOURS?
  switch (intervalUnit) {
C
Cary Xu 已提交
826
    case TIME_UNIT_HOUR:
C
Cary Xu 已提交
827 828 829 830
      if (interval < SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
831
    case TIME_UNIT_MINUTE:
C
Cary Xu 已提交
832 833 834 835
      if (interval < 60 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
836
    case TIME_UNIT_SECOND:
C
Cary Xu 已提交
837 838 839 840
      if (interval < 3600 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
841
    case TIME_UNIT_MILLISECOND:
C
Cary Xu 已提交
842 843 844 845
      if (interval < 3600 * 1e3 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
846
    case TIME_UNIT_MICROSECOND:
C
Cary Xu 已提交
847 848 849 850
      if (interval < 3600 * 1e6 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
851
    case TIME_UNIT_NANOSECOND:
C
Cary Xu 已提交
852 853 854 855 856 857 858 859 860 861 862
      if (interval < 3600 * 1e9 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
    default:
      break;
  }
  return SMA_STORAGE_LEVEL_TSDB;
}

/**
C
Cary Xu 已提交
863
 * @brief Insert TSma data blocks to DB File build by B+Tree
C
Cary Xu 已提交
864
 *
C
Cary Xu 已提交
865
 * @param pSmaH
866
 * @param smaKey  tableUid-colId-skeyOfWindow(8-2-8)
C
Cary Xu 已提交
867
 * @param keyLen
C
Cary Xu 已提交
868 869 870 871
 * @param pData
 * @param dataLen
 * @return int32_t
 */
C
Cary Xu 已提交
872 873
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyLen, void *pData, int32_t dataLen,
                                    TXN *txn) {
C
Cary Xu 已提交
874
  SDBFile *pDBFile = &pSmaH->dFile;
C
Cary Xu 已提交
875

876
  // TODO: insert sma data blocks into B+Tree(TDB)
C
Cary Xu 已提交
877
  if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) {
C
Cary Xu 已提交
878 879
    tsdbWarn("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail",
             REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
880 881
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
882 883
  tsdbDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed",
            REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
884

C
Cary Xu 已提交
885
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
C
Cary Xu 已提交
886
  uint32_t valueSize = 0;
C
Cary Xu 已提交
887
  void    *data = tsdbGetSmaDataByKey(pDBFile, smaKey, keyLen, &valueSize);
C
Cary Xu 已提交
888 889
  ASSERT(data != NULL);
  for (uint32_t v = 0; v < valueSize; v += 8) {
C
Cary Xu 已提交
890
    tsdbWarn("vgId:%d insert sma data val[%d] %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v));
C
Cary Xu 已提交
891 892
  }
#endif
C
Cary Xu 已提交
893 894 895
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
896 897 898 899 900 901
/**
 * @brief Approximate value for week/month/year.
 *
 * @param interval
 * @param intervalUnit
 * @param precision
C
Cary Xu 已提交
902
 * @param adjusted Interval already adjusted according to DB precision
C
Cary Xu 已提交
903 904
 * @return int64_t
 */
C
Cary Xu 已提交
905 906 907 908 909
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision, bool adjusted) {
  if (adjusted) {
    return interval;
  }

C
Cary Xu 已提交
910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933
  switch (intervalUnit) {
    case TIME_UNIT_YEAR:  // approximate value
      interval *= 365 * 86400 * 1e3;
      break;
    case TIME_UNIT_MONTH:  // approximate value
      interval *= 30 * 86400 * 1e3;
      break;
    case TIME_UNIT_WEEK:  // approximate value
      interval *= 7 * 86400 * 1e3;
      break;
    case TIME_UNIT_DAY:  // the interval for tSma calculation must <= day
      interval *= 86400 * 1e3;
      break;
    case TIME_UNIT_HOUR:
      interval *= 3600 * 1e3;
      break;
    case TIME_UNIT_MINUTE:
      interval *= 60 * 1e3;
      break;
    case TIME_UNIT_SECOND:
      interval *= 1e3;
      break;
    default:
      break;
C
Cary Xu 已提交
934 935
  }

C
Cary Xu 已提交
936 937
  switch (precision) {
    case TSDB_TIME_PRECISION_MILLI:
C
Cary Xu 已提交
938
      if (TIME_UNIT_MICROSECOND == intervalUnit) {  // us
C
Cary Xu 已提交
939
        return interval / 1e3;
C
Cary Xu 已提交
940
      } else if (TIME_UNIT_NANOSECOND == intervalUnit) {  //  nano second
C
Cary Xu 已提交
941
        return interval / 1e6;
942
      } else {  // ms
C
Cary Xu 已提交
943 944 945
        return interval;
      }
      break;
C
Cary Xu 已提交
946
    case TSDB_TIME_PRECISION_MICRO:
C
Cary Xu 已提交
947
      if (TIME_UNIT_MICROSECOND == intervalUnit) {  // us
C
Cary Xu 已提交
948
        return interval;
949
      } else if (TIME_UNIT_NANOSECOND == intervalUnit) {  //  ns
C
Cary Xu 已提交
950
        return interval / 1e3;
951
      } else {  // ms
C
Cary Xu 已提交
952 953 954
        return interval * 1e3;
      }
      break;
C
Cary Xu 已提交
955
    case TSDB_TIME_PRECISION_NANO:
956
      if (TIME_UNIT_MICROSECOND == intervalUnit) {  // us
C
Cary Xu 已提交
957
        return interval * 1e3;
958
      } else if (TIME_UNIT_NANOSECOND == intervalUnit) {  // ns
C
Cary Xu 已提交
959
        return interval;
960
      } else {  // ms
C
Cary Xu 已提交
961
        return interval * 1e6;
C
Cary Xu 已提交
962 963
      }
      break;
C
Cary Xu 已提交
964
    default:                                        // ms
C
Cary Xu 已提交
965
      if (TIME_UNIT_MICROSECOND == intervalUnit) {  // us
C
Cary Xu 已提交
966
        return interval / 1e3;
967
      } else if (TIME_UNIT_NANOSECOND == intervalUnit) {  //  ns
C
Cary Xu 已提交
968
        return interval / 1e6;
969
      } else {  // ms
C
Cary Xu 已提交
970
        return interval;
C
Cary Xu 已提交
971 972 973 974 975 976
      }
      break;
  }
  return interval;
}

C
Cary Xu 已提交
977 978
/**
 * @brief Initialise a TSma write handle.
 *
 * The interval is passed to tsdbGetIntervalByPrecision with adjusted == true, i.e. it is stored as given. The DB file
 * id starts out as TSDB_IVLD_FID, meaning no file has been selected yet.
 *
 * @param pSmaH handle to initialise
 * @param pTsdb
 * @param pDataBlocks data blocks to be written through this handle (not copied)
 * @param interval
 * @param intervalUnit
 * @return int32_t always TSDB_CODE_SUCCESS
 */
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, const SArray *pDataBlocks, int64_t interval,
                                  int8_t intervalUnit) {
  const int8_t dbPrecision = REPO_CFG(pTsdb)->precision;

  pSmaH->pTsdb = pTsdb;
  pSmaH->pDataBlocks = pDataBlocks;
  pSmaH->dFile.fid = TSDB_IVLD_FID;
  pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, dbPrecision, true);

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Release the resources of a TSma write handle by closing its DB file. A NULL handle is ignored.
 *
 * @param pSmaH
 */
static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) {
  if (pSmaH == NULL) {
    return;
  }
  tsdbCloseDBF(&pSmaH->dFile);
}

C
Cary Xu 已提交
992
/**
 * @brief Point the write handle at the TSma file for (indexUid, fid).
 *
 * The relative file name is "<indexUid><TD_DIRSEP>v<vgId>f<fid>.tsma". The handle must not have a path or an open DB
 * attached when this is called (asserted).
 *
 * @param pSmaH write handle; dFile.fid and dFile.path are updated together on success and left untouched on failure
 * @param indexUid
 * @param fid
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED when the path string cannot be allocated
 */
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t fid) {
  STsdb *pTsdb = pSmaH->pTsdb;
  ASSERT(!pSmaH->dFile.path && !pSmaH->dFile.pDB);

  char tSmaFile[TSDB_FILENAME_LEN] = {0};
  snprintf(tSmaFile, TSDB_FILENAME_LEN, "%" PRIi64 "%sv%df%d.tsma", indexUid, TD_DIRSEP, REPO_ID(pTsdb), fid);

  // strdup allocates; report the failure instead of returning success with a NULL path.
  char *path = strdup(tSmaFile);
  if (path == NULL) {
    return TSDB_CODE_FAILED;
  }

  pSmaH->dFile.fid = fid;
  pSmaH->dFile.path = path;

  return TSDB_CODE_SUCCESS;
}
C
Cary Xu 已提交
1003

C
Cary Xu 已提交
1004 1005 1006 1007 1008 1009 1010 1011 1012 1013
/**
 * @brief Number of days covered by one TSma file.
 *
 * For SMA_STORAGE_LEVEL_TSDB the result is SMA_STORAGE_TSDB_TIMES times the interval length in whole days, but never
 * less than SMA_STORAGE_TSDB_DAYS. For any other storage level the DB's configured days is used.
 *
 * @param pTsdb
 * @param interval Interval calculated by DB's precision
 * @param storageLevel
 * @return int32_t days per file
 */
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel) {
  STsdbCfg *pCfg = REPO_CFG(pTsdb);

  if (storageLevel != SMA_STORAGE_LEVEL_TSDB) {
    return pCfg->days;
  }

  int32_t scaledDays = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerDay[pCfg->precision]);
  if (scaledDays > SMA_STORAGE_TSDB_DAYS) {
    return scaledDays;
  }
  return SMA_STORAGE_TSDB_DAYS;
}
C
Cary Xu 已提交
1023

C
Cary Xu 已提交
1024 1025 1026 1027 1028
/**
 * @brief Open the SMA env's transaction (write + read-uncommitted, backed by the env's pool) and begin it on the env's
 * DB.
 *
 * @param pEnv
 * @return int 0 on success, -1 when tdbBegin fails
 */
static int tsdbSmaBeginCommit(SSmaEnv *pEnv) {
  TXN *pTxn = &pEnv->txn;

  // start a new txn
  tdbTxnOpen(pTxn, 0, poolMalloc, poolFree, pEnv->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);

  if (tdbBegin(pEnv->dbEnv, pTxn) == 0) {
    return 0;
  }

  tsdbWarn("tsdbSma tdb begin commit fail");
  return -1;
}

/**
 * @brief Commit the SMA env's current transaction, then close it and clear the env's pool.
 *
 * When tdbCommit fails, the transaction is left open and the pool is not cleared.
 *
 * @param pEnv
 * @return int 0 on success, -1 when tdbCommit fails
 */
static int tsdbSmaEndCommit(SSmaEnv *pEnv) {
  TXN *pTxn = &pEnv->txn;

  // Commit current txn
  const bool committed = (tdbCommit(pEnv->dbEnv, pTxn) == 0);
  if (!committed) {
    tsdbWarn("tsdbSma tdb end commit fail");
    return -1;
  }

  tdbTxnClose(pTxn);
  clearPool(pEnv->pPool);
  return 0;
}

C
Cary Xu 已提交
1048 1049 1050 1051 1052 1053 1054 1055 1056
/**
 * @brief Insert/Update Time-range-wise SMA data.
 *  - If interval < SMA_STORAGE_SPLIT_HOURS(e.g. 24), save the SMA data as a part of DFileSet to e.g.
 * v3f1900.tsma.${sma_index_name}. The days is the same with that for TS data files.
 *  - If interval >= SMA_STORAGE_SPLIT_HOURS, save the SMA data to e.g. vnode3/tsma/v3f632.tsma.${sma_index_name}. The
 * days is 30 times of the interval, and the minimum days is SMA_STORAGE_TSDB_DAYS(30d).
 *  - The destination file of one data block for some interval is determined by its start TS key.
 *
 * @param pTsdb
C
Cary Xu 已提交
1057
 * @param msg
C
Cary Xu 已提交
1058 1059
 * @return int32_t
 */
C
Cary Xu 已提交
1060 1061 1062
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char *msg) {
  STsdbCfg     *pCfg = REPO_CFG(pTsdb);
  const SArray *pDataBlocks = (const SArray *)msg;
C
Cary Xu 已提交
1063

C
Cary Xu 已提交
1064 1065
  // TODO: destroy SSDataBlocks(msg)

1066 1067 1068 1069 1070
  // For super table aggregation, the sma data is stored in vgroup calculated from the hash value of stable name. Thus
  // the sma data would arrive ahead of the update-expired-window msg.
  if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INIT_FAILED;
    return TSDB_CODE_FAILED;
1071
  }
C
Cary Xu 已提交
1072

C
Cary Xu 已提交
1073
  if (!pDataBlocks) {
C
Cary Xu 已提交
1074
    terrno = TSDB_CODE_INVALID_PTR;
C
Cary Xu 已提交
1075
    tsdbWarn("vgId:%d insert tSma data failed since pDataBlocks is NULL", REPO_ID(pTsdb));
C
Cary Xu 已提交
1076
    return terrno;
C
Cary Xu 已提交
1077 1078
  }

C
Cary Xu 已提交
1079
  if (taosArrayGetSize(pDataBlocks) <= 0) {
C
Cary Xu 已提交
1080
    terrno = TSDB_CODE_INVALID_PARA;
C
Cary Xu 已提交
1081
    tsdbWarn("vgId:%d insert tSma data failed since pDataBlocks is empty", REPO_ID(pTsdb));
1082
    return TSDB_CODE_FAILED;
C
Cary Xu 已提交
1083
  }
C
Cary Xu 已提交
1084

1085
  SSmaEnv      *pEnv = REPO_TSMA_ENV(pTsdb);
C
Cary Xu 已提交
1086
  SSmaStat     *pStat = SMA_ENV_STAT(pEnv);
C
Cary Xu 已提交
1087 1088 1089 1090
  SSmaStatItem *pItem = NULL;

  tsdbRefSmaStat(pTsdb, pStat);

C
Cary Xu 已提交
1091 1092
  if (pStat && SMA_STAT_ITEMS(pStat)) {
    pItem = taosHashGet(SMA_STAT_ITEMS(pStat), &indexUid, sizeof(indexUid));
C
Cary Xu 已提交
1093
  }
C
Cary Xu 已提交
1094

C
Cary Xu 已提交
1095
  if (!pItem || !(pItem = *(SSmaStatItem **)pItem) || tsdbSmaStatIsDropped(pItem)) {
C
Cary Xu 已提交
1096 1097 1098 1099 1100
    terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
    tsdbUnRefSmaStat(pTsdb, pStat);
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
1101
  STSma      *pSma = pItem->pSma;
C
Cary Xu 已提交
1102 1103
  STSmaWriteH tSmaH = {0};

C
Cary Xu 已提交
1104
  if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pDataBlocks, pSma->interval, pSma->intervalUnit) != 0) {
C
Cary Xu 已提交
1105 1106 1107
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
1108 1109
  char rPath[TSDB_FILENAME_LEN] = {0};
  char aPath[TSDB_FILENAME_LEN] = {0};
1110 1111 1112 1113
  snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid);
  tfsAbsoluteName(REPO_TFS(pTsdb), SMA_ENV_DID(pEnv), rPath, aPath);
  if (!taosCheckExistFile(aPath)) {
    if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rPath, SMA_ENV_DID(pEnv)) != TSDB_CODE_SUCCESS) {
C
Cary Xu 已提交
1114
      tsdbUnRefSmaStat(pTsdb, pStat);
1115 1116 1117 1118
      return TSDB_CODE_FAILED;
    }
  }

C
Cary Xu 已提交
1119
  // Step 1: Judge the storage level and days
C
Cary Xu 已提交
1120
  int32_t storageLevel = tsdbGetSmaStorageLevel(pSma->interval, pSma->intervalUnit);
C
Cary Xu 已提交
1121
  int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel);
C
Cary Xu 已提交
1122

C
Cary Xu 已提交
1123 1124
  char    smaKey[SMA_KEY_LEN] = {0};  // key: skey + groupId
  char    dataBuf[512] = {0};         // val: aggr data // TODO: handle 512 buffer?
C
Cary Xu 已提交
1125
  void   *pDataBuf = NULL;
C
Cary Xu 已提交
1126 1127
  int32_t sz = taosArrayGetSize(pDataBlocks);
  for (int32_t i = 0; i < sz; ++i) {
C
Cary Xu 已提交
1128
    SSDataBlock *pDataBlock = taosArrayGet(pDataBlocks, i);
C
Cary Xu 已提交
1129 1130 1131 1132 1133 1134
    int32_t      colNum = pDataBlock->info.numOfCols;
    int32_t      rows = pDataBlock->info.rows;
    int32_t      rowSize = pDataBlock->info.rowSize;
    int64_t      groupId = pDataBlock->info.groupId;
    for (int32_t j = 0; j < rows; ++j) {
      printf("|");
C
Cary Xu 已提交
1135
      TSKEY skey = TSKEY_INITIAL_VAL;  //  the start key of TS window by interval
C
Cary Xu 已提交
1136 1137
      void *pSmaKey = &smaKey;
      bool  isStartKey = false;
C
Cary Xu 已提交
1138

C
Cary Xu 已提交
1139 1140
      int32_t tlen = 0;     // reset the len
      pDataBuf = &dataBuf;  // reset the buf
C
Cary Xu 已提交
1141
      for (int32_t k = 0; k < colNum; ++k) {
C
Cary Xu 已提交
1142
        SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
C
Cary Xu 已提交
1143 1144 1145
        void            *var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
        switch (pColInfoData->info.type) {
          case TSDB_DATA_TYPE_TIMESTAMP:
C
Cary Xu 已提交
1146 1147 1148
            if (!isStartKey) {
              isStartKey = true;
              skey = *(TSKEY *)var;
C
Cary Xu 已提交
1149
              printf("= skey %" PRIi64 " groupId = %" PRIi64 "|", skey, groupId);
C
Cary Xu 已提交
1150 1151 1152 1153 1154 1155
              tsdbEncodeTSmaKey(groupId, skey, &pSmaKey);
            } else {
              printf(" %" PRIi64 " |", *(int64_t *)var);
              tlen += taosEncodeFixedI64(&pDataBuf, *(int64_t *)var);
              break;
            }
C
Cary Xu 已提交
1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178
            break;
          case TSDB_DATA_TYPE_BOOL:
          case TSDB_DATA_TYPE_UTINYINT:
            printf(" %15d |", *(uint8_t *)var);
            tlen += taosEncodeFixedU8(&pDataBuf, *(uint8_t *)var);
            break;
          case TSDB_DATA_TYPE_TINYINT:
            printf(" %15d |", *(int8_t *)var);
            tlen += taosEncodeFixedI8(&pDataBuf, *(int8_t *)var);
            break;
          case TSDB_DATA_TYPE_SMALLINT:
            printf(" %15d |", *(int16_t *)var);
            tlen += taosEncodeFixedI16(&pDataBuf, *(int16_t *)var);
            break;
          case TSDB_DATA_TYPE_USMALLINT:
            printf(" %15d |", *(uint16_t *)var);
            tlen += taosEncodeFixedU16(&pDataBuf, *(uint16_t *)var);
            break;
          case TSDB_DATA_TYPE_INT:
            printf(" %15d |", *(int32_t *)var);
            tlen += taosEncodeFixedI32(&pDataBuf, *(int32_t *)var);
            break;
          case TSDB_DATA_TYPE_FLOAT:
C
Cary Xu 已提交
1179 1180 1181
            printf(" %15f |", *(float *)var);
            tlen += taosEncodeBinary(&pDataBuf, var, sizeof(float));
            break;
C
Cary Xu 已提交
1182 1183 1184 1185 1186 1187 1188 1189 1190
          case TSDB_DATA_TYPE_UINT:
            printf(" %15u |", *(uint32_t *)var);
            tlen += taosEncodeFixedU32(&pDataBuf, *(uint32_t *)var);
            break;
          case TSDB_DATA_TYPE_BIGINT:
            printf(" %15ld |", *(int64_t *)var);
            tlen += taosEncodeFixedI64(&pDataBuf, *(int64_t *)var);
            break;
          case TSDB_DATA_TYPE_DOUBLE:
C
Cary Xu 已提交
1191 1192
            printf(" %15lf |", *(double *)var);
            tlen += taosEncodeBinary(&pDataBuf, var, sizeof(double));
C
Cary Xu 已提交
1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203
          case TSDB_DATA_TYPE_UBIGINT:
            printf(" %15lu |", *(uint64_t *)var);
            tlen += taosEncodeFixedU64(&pDataBuf, *(uint64_t *)var);
            break;
          case TSDB_DATA_TYPE_NCHAR: {
            char tmpChar[100] = {0};
            strncpy(tmpChar, varDataVal(var), varDataLen(var));
            printf(" %s |", tmpChar);
            tlen += taosEncodeBinary(&pDataBuf, varDataVal(var), varDataLen(var));
            break;
          }
C
Cary Xu 已提交
1204 1205
          case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
            char tmpChar[100] = {0};
C
Cary Xu 已提交
1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219
            strncpy(tmpChar, varDataVal(var), varDataLen(var));
            printf(" %s |", tmpChar);
            tlen += taosEncodeBinary(&pDataBuf, varDataVal(var), varDataLen(var));
            break;
          }
          case TSDB_DATA_TYPE_VARBINARY:
            // TODO: add binary/varbinary
            TASSERT(0);
          default:
            printf("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
            TASSERT(0);
            break;
        }
      }
C
Cary Xu 已提交
1220 1221
      // if ((tlen > 0) && (skey != TSKEY_INITIAL_VAL)) {
      if (tlen > 0) {
C
Cary Xu 已提交
1222 1223 1224 1225 1226 1227 1228 1229
        int32_t fid = (int32_t)(TSDB_KEY_FID(skey, daysPerFile, pCfg->precision));

        // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index
        // file
        //         - Set and open the DFile or the B+Tree file
        // TODO: tsdbStartTSmaCommit();
        if (fid != tSmaH.dFile.fid) {
          if (tSmaH.dFile.fid != TSDB_IVLD_FID) {
C
Cary Xu 已提交
1230
            tsdbSmaEndCommit(pEnv);
C
Cary Xu 已提交
1231 1232 1233
            tsdbCloseDBF(&tSmaH.dFile);
          }
          tsdbSetTSmaDataFile(&tSmaH, indexUid, fid);
C
Cary Xu 已提交
1234
          if (tsdbOpenDBF(pEnv->dbEnv, &tSmaH.dFile) != 0) {
C
Cary Xu 已提交
1235 1236 1237 1238 1239 1240
            tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
                     tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
            tsdbDestroyTSmaWriteH(&tSmaH);
            tsdbUnRefSmaStat(pTsdb, pStat);
            return TSDB_CODE_FAILED;
          }
C
Cary Xu 已提交
1241
          tsdbSmaBeginCommit(pEnv);
C
Cary Xu 已提交
1242
        }
C
Cary Xu 已提交
1243

C
Cary Xu 已提交
1244
        if (tsdbInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen, &pEnv->txn) != 0) {
C
Cary Xu 已提交
1245
          tsdbWarn("vgId:%d insert tSma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64
C
Cary Xu 已提交
1246 1247
                   " since %s",
                   REPO_ID(pTsdb), indexUid, skey, groupId, tstrerror(terrno));
C
Cary Xu 已提交
1248
          tsdbSmaEndCommit(pEnv);
C
Cary Xu 已提交
1249 1250 1251 1252
          tsdbDestroyTSmaWriteH(&tSmaH);
          tsdbUnRefSmaStat(pTsdb, pStat);
          return TSDB_CODE_FAILED;
        }
C
Cary Xu 已提交
1253 1254
        tsdbDebug("vgId:%d insert tSma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64,
                  REPO_ID(pTsdb), indexUid, skey, groupId);
C
Cary Xu 已提交
1255
        // TODO:tsdbEndTSmaCommit();
C
Cary Xu 已提交
1256

C
Cary Xu 已提交
1257
        // Step 3: reset the SSmaStat
C
Cary Xu 已提交
1258
        tsdbResetExpiredWindow(pTsdb, pStat, indexUid, skey);
C
Cary Xu 已提交
1259 1260 1261 1262
      } else {
        tsdbWarn("vgId:%d invalid data skey:%" PRIi64 ", tlen %" PRIi32 " during insert tSma data for %" PRIi64,
                 REPO_ID(pTsdb), skey, tlen, indexUid);
      }
C
Cary Xu 已提交
1263

C
Cary Xu 已提交
1264 1265
      printf("\n");
    }
1266
  }
C
Cary Xu 已提交
1267
  tsdbSmaEndCommit(pEnv);  // TODO: not commit for every insert
1268
  tsdbDestroyTSmaWriteH(&tSmaH);
C
Cary Xu 已提交
1269
  tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
1270

C
Cary Xu 已提交
1271 1272 1273
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1274 1275 1276
/**
 * @brief Drop tSma data and local cache
 *        - The index item is flagged as dropped under the SMA write lock, so a second drop is rejected.
 *        - Afterwards the function polls the SMA stat reference count once per second and proceeds as soon as it
 *          reaches zero, or after more than SMA_DROP_EXPIRED_TIME one-second waits, before freeing the item.
 *        - Removing the SMA data files is still TODO.
 *
 * @param pTsdb
 * @param indexUid uid of the SMA index to drop
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_TDB_INVALID_ACTION when the index is already dropped
 */
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) {
  SSmaEnv *pEnv = atomic_load_ptr(&REPO_TSMA_ENV(pTsdb));

  // clear local cache
  if (pEnv) {
    tsdbDebug("vgId:%d drop tSma local cache for %" PRIi64, REPO_ID(pTsdb), indexUid);

    // The hash stores pointers to items: a hit returns the slot, which must be dereferenced to get the item.
    // Both conditions are required ('&&'); with '||' a hit skipped the dereference and a miss dereferenced NULL.
    SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid));
    if ((pItem) && ((pItem = *(SSmaStatItem **)pItem))) {
      // Cheap check without the lock first, then re-check under the write lock.
      if (tsdbSmaStatIsDropped(pItem)) {
        tsdbDebug("vgId:%d tSma stat is already dropped for %" PRIi64, REPO_ID(pTsdb), indexUid);
        return TSDB_CODE_TDB_INVALID_ACTION;  // TODO: duplicate drop msg would be intercepted by mnode
      }

      tsdbWLockSma(pEnv);
      if (tsdbSmaStatIsDropped(pItem)) {
        tsdbUnLockSma(pEnv);
        tsdbDebug("vgId:%d tSma stat is already dropped for %" PRIi64, REPO_ID(pTsdb), indexUid);
        return TSDB_CODE_TDB_INVALID_ACTION;  // TODO: duplicate drop msg would be intercepted by mnode
      }
      tsdbSmaStatSetDropped(pItem);
      tsdbUnLockSma(pEnv);

      // Wait for in-flight insert/query references to drain, bounded by SMA_DROP_EXPIRED_TIME.
      int32_t nSleep = 0;
      int32_t refVal = INT32_MAX;
      while (true) {
        if ((refVal = T_REF_VAL_GET(SMA_ENV_STAT(pEnv))) <= 0) {
          tsdbDebug("vgId:%d drop index %" PRIi64 " since refVal=%d", REPO_ID(pTsdb), indexUid, refVal);
          break;
        }
        tsdbDebug("vgId:%d wait 1s to drop index %" PRIi64 " since refVal=%d", REPO_ID(pTsdb), indexUid, refVal);
        taosSsleep(1);
        if (++nSleep > SMA_DROP_EXPIRED_TIME) {
          tsdbDebug("vgId:%d drop index %" PRIi64 " after wait %d (refVal=%d)", REPO_ID(pTsdb), indexUid, nSleep,
                    refVal);
          break;
        }
      }

      // NOTE(review): the hash entry for indexUid is not removed here; confirm whether tsdbFreeSmaStatItem or a later
      // step takes care of it, otherwise the slot keeps pointing at the freed item.
      tsdbFreeSmaStatItem(pItem);
    } else {
      tsdbDebug("vgId:%d drop tSma local cache ignored since no index %" PRIi64 " in local cache", REPO_ID(pTsdb),
                indexUid);
    }
  }
  // clear sma data files
  // TODO:
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1329
/**
 * @brief Set the DB file path of the write handle to the rSma file name "v<vgId>f<fid>.rsma".
 *
 * @param pSmaH write handle; dFile.path is replaced on success and left untouched on failure
 * @param fid
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED when the path string cannot be allocated
 */
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, int32_t fid) {
  STsdb *pTsdb = pSmaH->pTsdb;

  char tSmaFile[TSDB_FILENAME_LEN] = {0};
  snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.rsma", REPO_ID(pTsdb), fid);

  // strdup allocates; report the failure instead of returning success with a NULL path.
  char *path = strdup(tSmaFile);
  if (path == NULL) {
    return TSDB_CODE_FAILED;
  }
  pSmaH->dFile.path = path;

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1339 1340 1341
/**
 * @brief Insert rollup SMA data. Work in progress: the arguments and the SMA stat item are validated and the
 * per-index directory is created, but the actual write path is still disabled (#if 0).
 *
 * The index uid is currently hard-wired to SMA_TEST_INDEX_UID.
 *
 * @param pTsdb
 * @param msg actually a (const SArray *) of data blocks
 * @return int32_t TSDB_CODE_SUCCESS on success. On failure TSDB_CODE_FAILED is returned, except for a missing rSma env
 *         or a NULL msg, where terrno (TSDB_CODE_INVALID_PTR) itself is returned.
 */
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg) {
  STsdbCfg     *pCfg = REPO_CFG(pTsdb);
  const SArray *pDataBlocks = (const SArray *)msg;
  SSmaEnv      *pEnv = atomic_load_ptr(&REPO_RSMA_ENV(pTsdb));
  int64_t       indexUid = SMA_TEST_INDEX_UID;

  if (!pEnv) {
    terrno = TSDB_CODE_INVALID_PTR;
    tsdbWarn("vgId:%d insert rSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
    return terrno;
  }

  if (!pDataBlocks) {
    terrno = TSDB_CODE_INVALID_PTR;
    tsdbWarn("vgId:%d insert rSma data failed since pDataBlocks is NULL", REPO_ID(pTsdb));
    return terrno;
  }

  if (taosArrayGetSize(pDataBlocks) <= 0) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbWarn("vgId:%d insert rSma data failed since pDataBlocks is empty", REPO_ID(pTsdb));
    return TSDB_CODE_FAILED;
  }

  SSmaStat     *pStat = SMA_ENV_STAT(pEnv);
  SSmaStatItem *pItem = NULL;

  tsdbRefSmaStat(pTsdb, pStat);

  if (pStat && SMA_STAT_ITEMS(pStat)) {
    pItem = taosHashGet(SMA_STAT_ITEMS(pStat), &indexUid, sizeof(indexUid));
  }

  if (!pItem || !(pItem = *(SSmaStatItem **)pItem) || tsdbSmaStatIsDropped(pItem)) {
    terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
    tsdbUnRefSmaStat(pTsdb, pStat);
    return TSDB_CODE_FAILED;
  }

  STSma *pSma = pItem->pSma;

  STSmaWriteH tSmaH = {0};

  if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pDataBlocks, pSma->interval, pSma->intervalUnit) != 0) {
    tsdbUnRefSmaStat(pTsdb, pStat);  // do not leak the reference taken above
    return TSDB_CODE_FAILED;
  }

  // Make sure the per-index directory exists.
  char rPath[TSDB_FILENAME_LEN] = {0};
  char aPath[TSDB_FILENAME_LEN] = {0};
  snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid);
  tfsAbsoluteName(REPO_TFS(pTsdb), SMA_ENV_DID(pEnv), rPath, aPath);
  if (!taosCheckExistFile(aPath)) {
    if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rPath, SMA_ENV_DID(pEnv)) != TSDB_CODE_SUCCESS) {
      tsdbUnRefSmaStat(pTsdb, pStat);  // same cleanup as the tSma insert path
      return TSDB_CODE_FAILED;
    }
  }

  // Step 1: Judge the storage level and days
  int32_t storageLevel = tsdbGetSmaStorageLevel(pSma->interval, pSma->intervalUnit);
  int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel);
#if 0
  int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));

  // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
  //         - Set and open the DFile or the B+Tree file
  // TODO: tsdbStartTSmaCommit();
  tsdbSetTSmaDataFile(&tSmaH, pData, indexUid, fid);
  if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) {
    tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
             tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
    tsdbDestroyTSmaWriteH(&tSmaH);
    return TSDB_CODE_FAILED;
  }

  if (tsdbInsertTSmaDataSection(&tSmaH, pData) != 0) {
    tsdbWarn("vgId:%d insert tSma data section failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
    tsdbDestroyTSmaWriteH(&tSmaH);
    return TSDB_CODE_FAILED;
  }
  // TODO:tsdbEndTSmaCommit();

  // Step 3: reset the SSmaStat
  tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey);
#endif

  tsdbDestroyTSmaWriteH(&tSmaH);
  tsdbUnRefSmaStat(pTsdb, pStat);
  return TSDB_CODE_SUCCESS;
}

/**
C
Cary Xu 已提交
1430
 * @brief
C
Cary Xu 已提交
1431 1432 1433
 *
 * @param pSmaH
 * @param pTsdb
C
Cary Xu 已提交
1434 1435
 * @param interval
 * @param intervalUnit
C
Cary Xu 已提交
1436 1437
 * @return int32_t
 */
C
Cary Xu 已提交
1438
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit) {
C
Cary Xu 已提交
1439
  pSmaH->pTsdb = pTsdb;
C
Cary Xu 已提交
1440
  pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision, true);
C
Cary Xu 已提交
1441 1442
  pSmaH->storageLevel = tsdbGetSmaStorageLevel(interval, intervalUnit);
  pSmaH->days = tsdbGetTSmaDays(pTsdb, pSmaH->interval, pSmaH->storageLevel);
1443
  return TSDB_CODE_SUCCESS;
C
Cary Xu 已提交
1444 1445 1446 1447 1448 1449
}

/**
 * @brief Init of tSma FS
 *
 * @param pReadH
1450
 * @param indexUid
C
Cary Xu 已提交
1451
 * @param skey
C
Cary Xu 已提交
1452 1453
 * @return int32_t
 */
1454 1455 1456 1457
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey) {
  STsdb *pTsdb = pSmaH->pTsdb;

  int32_t fid = (int32_t)(TSDB_KEY_FID(skey, pSmaH->days, REPO_CFG(pTsdb)->precision));
C
Cary Xu 已提交
1458
  char    tSmaFile[TSDB_FILENAME_LEN] = {0};
1459
  snprintf(tSmaFile, TSDB_FILENAME_LEN, "%" PRIi64 "%sv%df%d.tsma", indexUid, TD_DIRSEP, REPO_ID(pTsdb), fid);
C
Cary Xu 已提交
1460 1461 1462
  pSmaH->dFile.path = strdup(tSmaFile);
  pSmaH->smaFsIter.iter = 0;
  pSmaH->smaFsIter.fid = fid;
1463
  return TSDB_CODE_SUCCESS;
C
Cary Xu 已提交
1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474
}

/**
 * @brief Set and open tSma file if it has key locates in queryWin.
 *
 * @param pReadH
 * @param param
 * @param queryWin
 * @return true
 * @return false
 */
C
Cary Xu 已提交
1475
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
C
Cary Xu 已提交
1476
  SArray *smaFs = pReadH->pTsdb->fs->cstatus->sf;
C
Cary Xu 已提交
1477 1478
  int32_t nSmaFs = taosArrayGetSize(smaFs);

C
Cary Xu 已提交
1479
  tsdbCloseDBF(&pReadH->dFile);
C
Cary Xu 已提交
1480

C
Cary Xu 已提交
1481
#if 0
C
Cary Xu 已提交
1482 1483 1484 1485
  while (pReadH->smaFsIter.iter < nSmaFs) {
    void *pSmaFile = taosArrayGet(smaFs, pReadH->smaFsIter.iter);
    if (pSmaFile) {  // match(indexName, queryWindow)
      // TODO: select the file by index_name ...
C
Cary Xu 已提交
1486
      pReadH->dFile = pSmaFile;
C
Cary Xu 已提交
1487 1488 1489 1490 1491 1492
      ++pReadH->smaFsIter.iter;
      break;
    }
    ++pReadH->smaFsIter.iter;
  }

C
Cary Xu 已提交
1493
  if (pReadH->pDFile) {
C
Cary Xu 已提交
1494 1495 1496
    tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]");
    return true;
  }
C
Cary Xu 已提交
1497
#endif
C
Cary Xu 已提交
1498 1499 1500 1501 1502

  return false;
}

/**
C
Cary Xu 已提交
1503
 * @brief
C
Cary Xu 已提交
1504
 *
C
Cary Xu 已提交
1505
 * @param pTsdb Return the data between queryWin and fill the pData.
C
Cary Xu 已提交
1506
 * @param pData
C
Cary Xu 已提交
1507 1508
 * @param indexUid
 * @param pQuerySKey
C
Cary Xu 已提交
1509 1510 1511
 * @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM.
 * @return int32_t
 */
C
Cary Xu 已提交
1512
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) {
C
Cary Xu 已提交
1513 1514
  SSmaEnv  *pEnv = atomic_load_ptr(&REPO_TSMA_ENV(pTsdb));
  SSmaStat *pStat = NULL;
1515 1516

  if (!pEnv) {
C
Cary Xu 已提交
1517 1518 1519 1520 1521
    terrno = TSDB_CODE_INVALID_PTR;
    tsdbWarn("vgId:%d getTSmaDataImpl failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
1522 1523 1524
  pStat = SMA_ENV_STAT(pEnv);

  tsdbRefSmaStat(pTsdb, pStat);
1525
  SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid));
C
Cary Xu 已提交
1526
  if (!pItem || !(pItem = *(SSmaStatItem **)pItem)) {
C
Cary Xu 已提交
1527 1528
    // Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if
    // it's NULL.
C
Cary Xu 已提交
1529
    tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
1530
    terrno = TSDB_CODE_TDB_INVALID_ACTION;
1531
    tsdbDebug("vgId:%d getTSmaDataImpl failed since no index %" PRIi64, REPO_ID(pTsdb), indexUid);
C
Cary Xu 已提交
1532
    return TSDB_CODE_FAILED;
C
Cary Xu 已提交
1533 1534
  }

C
Cary Xu 已提交
1535 1536
#if 0
  int32_t nQueryWin = taosArrayGetSize(pQuerySKey);
C
Cary Xu 已提交
1537
  for (int32_t n = 0; n < nQueryWin; ++n) {
C
Cary Xu 已提交
1538
    TSKEY skey = taosArrayGet(pQuerySKey, n);
C
Cary Xu 已提交
1539
    if (taosHashGet(pItem->expiredWindows, &skey, sizeof(TSKEY))) {
C
Cary Xu 已提交
1540 1541 1542
      // TODO: mark this window as expired.
    }
  }
C
Cary Xu 已提交
1543
#endif
C
Cary Xu 已提交
1544

1545
#if 1
C
Cary Xu 已提交
1546 1547
  int8_t smaStat = 0;
  if (!tsdbSmaStatIsOK(pItem, &smaStat)) {  // TODO: multiple check for large scale sma query
C
Cary Xu 已提交
1548
    tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
1549
    terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
C
Cary Xu 已提交
1550 1551
    tsdbWarn("vgId:%d getTSmaDataImpl failed from index %" PRIi64 " since %s %" PRIi8, REPO_ID(pTsdb), indexUid,
             tstrerror(terrno), smaStat);
C
Cary Xu 已提交
1552 1553 1554
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
1555
  if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY))) {
C
Cary Xu 已提交
1556
    // TODO: mark this window as expired.
1557 1558 1559 1560 1561
    tsdbDebug("vgId:%d skey %" PRIi64 " of window exists in expired window for index %" PRIi64, REPO_ID(pTsdb),
              querySKey, indexUid);
  } else {
    tsdbDebug("vgId:%d skey %" PRIi64 " of window not in expired window for index %" PRIi64, REPO_ID(pTsdb), querySKey,
              indexUid);
C
Cary Xu 已提交
1562
  }
C
Cary Xu 已提交
1563 1564

  STSma *pTSma = pItem->pSma;
C
Cary Xu 已提交
1565
#endif
1566

C
Cary Xu 已提交
1567
  STSmaReadH tReadH = {0};
C
Cary Xu 已提交
1568
  tsdbInitTSmaReadH(&tReadH, pTsdb, pTSma->interval, pTSma->intervalUnit);
C
Cary Xu 已提交
1569
  tsdbCloseDBF(&tReadH.dFile);
C
Cary Xu 已提交
1570 1571

  tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
1572

1573
  tsdbInitTSmaFile(&tReadH, indexUid, querySKey);
C
Cary Xu 已提交
1574
  if (tsdbOpenDBF(pEnv->dbEnv, &tReadH.dFile) != 0) {
C
Cary Xu 已提交
1575 1576 1577 1578
    tsdbWarn("vgId:%d open DBF %s failed since %s", REPO_ID(pTsdb), tReadH.dFile.path, tstrerror(terrno));
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
1579 1580
  char    smaKey[SMA_KEY_LEN] = {0};
  void   *pSmaKey = &smaKey;
C
Cary Xu 已提交
1581 1582
  int64_t queryGroupId = 1;
  tsdbEncodeTSmaKey(queryGroupId, querySKey, (void **)&pSmaKey);
C
Cary Xu 已提交
1583

C
Cary Xu 已提交
1584 1585
  tsdbDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIx64 ", keyLen %d", REPO_ID(pTsdb),
            tReadH.dFile.path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), SMA_KEY_LEN);
C
Cary Xu 已提交
1586

C
Cary Xu 已提交
1587 1588
  void   *result = NULL;
  int32_t valueSize = 0;
C
Cary Xu 已提交
1589
  if (!(result = tsdbGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize))) {
C
Cary Xu 已提交
1590 1591
    tsdbWarn("vgId:%d get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIx64 " since %s",
             REPO_ID(pTsdb), indexUid, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), tstrerror(terrno));
C
Cary Xu 已提交
1592 1593 1594
    tsdbCloseDBF(&tReadH.dFile);
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
1595 1596

#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
C
Cary Xu 已提交
1597
  for (uint32_t v = 0; v < valueSize; v += 8) {
C
Cary Xu 已提交
1598
    tsdbWarn("vgId:%d get sma data v[%d]=%" PRIi64, REPO_ID(pTsdb), v, *(int64_t *)POINTER_SHIFT(result, v));
C
Cary Xu 已提交
1599 1600
  }
#endif
wafwerar's avatar
wafwerar 已提交
1601
  taosMemoryFreeClear(result);  // TODO: fill the result to output
C
Cary Xu 已提交
1602

C
Cary Xu 已提交
1603
#if 0
C
Cary Xu 已提交
1604 1605 1606 1607 1608 1609 1610 1611 1612
  int32_t nResult = 0;
  int64_t lastKey = 0;

  while (true) {
    if (nResult >= nMaxResult) {
      break;
    }

    // set and open the file according to the STSma param
C
Cary Xu 已提交
1613
    if (tsdbSetAndOpenTSmaFile(&tReadH, queryWin)) {
C
Cary Xu 已提交
1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624
      char bTree[100] = "\0";
      while (strncmp(bTree, "has more nodes", 100) == 0) {
        if (nResult >= nMaxResult) {
          break;
        }
        // tsdbGetDataFromBTree(bTree, queryWin, lastKey)
        // fill the pData
        ++nResult;
      }
    }
  }
C
Cary Xu 已提交
1625
#endif
C
Cary Xu 已提交
1626
  // read data from file and fill the result
C
Cary Xu 已提交
1627
  tsdbCloseDBF(&tReadH.dFile);
C
Cary Xu 已提交
1628 1629 1630
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1631 1632
/**
 * @brief Handle a create-tSma request: deserialize it, stamp the server timezone and register it in meta.
 *
 * @param pTsdb    Tsdb that receives the request.
 * @param pMsg     Serialized create request.
 * @return int32_t TSDB_CODE_SUCCESS on success; -1 if deserialization or metaCreateTSma() fails.
 */
int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) {
  SSmaCfg vCreateSmaReq = {0};
  int32_t code = TSDB_CODE_SUCCESS;

  if (!tDeserializeSVCreateTSmaReq(pMsg, &vCreateSmaReq)) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tsdbWarn("vgId:%d TDMT_VND_CREATE_SMA received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno));
    return -1;
  }
  tsdbDebug("vgId:%d TDMT_VND_CREATE_SMA msg received for %s:%" PRIi64, REPO_ID(pTsdb), vCreateSmaReq.tSma.indexName,
            vCreateSmaReq.tSma.indexUid);

  // record current timezone of server side
  vCreateSmaReq.tSma.timezoneInt = tsTimezone;

  if (metaCreateTSma(REPO_META(pTsdb), &vCreateSmaReq) < 0) {
    // TODO: handle error
    code = -1;
  } else {
    tsdbTSmaAdd(pTsdb, 1);
  }

  // The deserialized request is released on both the success and the failure path.
  tdDestroyTSma(&vCreateSmaReq.tSma);
  // TODO: return directly or go on follow steps?
  return code;
}

/**
 * @brief Handle a drop-tSma request: deserialize it, drop the index from meta and drop its tSma data.
 *
 * @param pTsdb    Tsdb that receives the request.
 * @param pMsg     Serialized drop request.
 * @return int32_t TSDB_CODE_SUCCESS on success; -1 if deserialization or one of the drop steps fails.
 */
int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
  SVDropTSmaReq vDropSmaReq = {0};
  if (!tDeserializeSVDropTSmaReq(pMsg, &vDropSmaReq)) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // TODO: send msg to stream computing to drop tSma, and return -1 if that fails.

  // Meta is dropped first; the data is only dropped when the meta step succeeded.
  bool dropped = (metaDropTSma(REPO_META(pTsdb), vDropSmaReq.indexUid) >= 0) &&
                 (tsdbDropTSmaData(pTsdb, vDropSmaReq.indexUid) >= 0);
  if (!dropped) {
    // TODO: handle error
    return -1;
  }

  tsdbTSmaSub(pTsdb, 1);

  // TODO: return directly or go on follow steps?
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1687
/**
C
Cary Xu 已提交
1688
 * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam.
C
Cary Xu 已提交
1689 1690 1691 1692 1693 1694
 *
 * @param pTsdb
 * @param pMeta
 * @param pReq
 * @return int32_t
 */
C
Cary Xu 已提交
1695 1696 1697
int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateStbReq *pReq) {
  if (!pReq->rollup) {
    tsdbDebug("vgId:%d return directly since no rollup for stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid);
C
Cary Xu 已提交
1698 1699 1700
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
1701 1702
  SRSmaParam *param = &pReq->pRSmaParam;

C
Cary Xu 已提交
1703
  if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) {
C
Cary Xu 已提交
1704
    tsdbWarn("vgId:%d no qmsg1/qmsg2 for rollup stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid);
C
Cary Xu 已提交
1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716
    return TSDB_CODE_SUCCESS;
  }

  if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INIT_FAILED;
    return TSDB_CODE_FAILED;
  }

  SSmaEnv   *pEnv = REPO_RSMA_ENV(pTsdb);
  SSmaStat  *pStat = SMA_ENV_STAT(pEnv);
  SRSmaInfo *pRSmaInfo = NULL;

C
Cary Xu 已提交
1717
  pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t));
C
Cary Xu 已提交
1718
  if (pRSmaInfo) {
C
Cary Xu 已提交
1719
    tsdbWarn("vgId:%d rsma info already exists for stb: %s, %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->suid);
C
Cary Xu 已提交
1720
    return TSDB_CODE_SUCCESS;
C
Cary Xu 已提交
1721 1722
  }

C
Cary Xu 已提交
1723
  pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo));
C
Cary Xu 已提交
1724
  if (!pRSmaInfo) {
C
Cary Xu 已提交
1725 1726 1727 1728
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
1729
  STqReadHandle *pReadHandle = tqInitSubmitMsgScanner(pMeta);
C
Cary Xu 已提交
1730
  if (!pReadHandle) {
C
Cary Xu 已提交
1731
    taosMemoryFree(pRSmaInfo);
C
Cary Xu 已提交
1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }

  SReadHandle handle = {
      .reader = pReadHandle,
      .meta = pMeta,
  };

  if (param->qmsg1) {
    pRSmaInfo->taskInfo[0] = qCreateStreamExecTaskInfo(param->qmsg1, &handle);
C
Cary Xu 已提交
1743
    if (!pRSmaInfo->taskInfo[0]) {
C
Cary Xu 已提交
1744 1745 1746 1747 1748 1749 1750 1751
      taosMemoryFree(pRSmaInfo);
      taosMemoryFree(pReadHandle);
      return TSDB_CODE_FAILED;
    }
  }

  if (param->qmsg2) {
    pRSmaInfo->taskInfo[1] = qCreateStreamExecTaskInfo(param->qmsg2, &handle);
C
Cary Xu 已提交
1752
    if (!pRSmaInfo->taskInfo[1]) {
C
Cary Xu 已提交
1753 1754 1755 1756 1757 1758
      taosMemoryFree(pRSmaInfo);
      taosMemoryFree(pReadHandle);
      return TSDB_CODE_FAILED;
    }
  }

C
Cary Xu 已提交
1759
  if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) !=
C
Cary Xu 已提交
1760
      TSDB_CODE_SUCCESS) {
C
Cary Xu 已提交
1761
    return TSDB_CODE_FAILED;
C
Cary Xu 已提交
1762
  } else {
C
Cary Xu 已提交
1763
    tsdbDebug("vgId:%d register rsma info succeed for suid:%" PRIi64, REPO_ID(pTsdb), pReq->suid);
C
Cary Xu 已提交
1764 1765 1766 1767 1768
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1769 1770 1771 1772 1773 1774 1775 1776
/**
 * @brief store suid/[uids], prefer to use array and then hash
 *
 * @param pStore
 * @param suid
 * @param uid
 * @return int32_t
 */
C
Cary Xu 已提交
1777
static int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) {
C
Cary Xu 已提交
1778
  // prefer to store suid/uids in array
C
Cary Xu 已提交
1779
  if ((suid == pStore->suid) || (pStore->suid == 0)) {
C
Cary Xu 已提交
1780
    if (pStore->suid == 0) {
C
Cary Xu 已提交
1781 1782
      pStore->suid = suid;
    }
C
Cary Xu 已提交
1783 1784 1785 1786 1787 1788 1789
    if (uid) {
      if (!pStore->tbUids) {
        if (!(pStore->tbUids = taosArrayInit(1, sizeof(tb_uid_t)))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return TSDB_CODE_FAILED;
        }
      }
C
Cary Xu 已提交
1790
      if (!taosArrayPush(pStore->tbUids, uid)) {
C
Cary Xu 已提交
1791 1792
        return TSDB_CODE_FAILED;
      }
C
Cary Xu 已提交
1793 1794
    }
  } else {
C
Cary Xu 已提交
1795 1796
    // store other suid/uids in hash when multiple stable/table included in 1 batch of request
    if (!pStore->uidHash) {
C
Cary Xu 已提交
1797
      pStore->uidHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
C
Cary Xu 已提交
1798
      if (!pStore->uidHash) {
C
Cary Xu 已提交
1799 1800 1801
        return TSDB_CODE_FAILED;
      }
    }
C
Cary Xu 已提交
1802 1803 1804
    if (uid) {
      SArray *uidArray = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t));
      if (uidArray && ((uidArray = *(SArray **)uidArray))) {
C
Cary Xu 已提交
1805
        taosArrayPush(uidArray, uid);
C
Cary Xu 已提交
1806 1807 1808 1809 1810 1811
      } else {
        SArray *pUidArray = taosArrayInit(1, sizeof(tb_uid_t));
        if (!pUidArray) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return TSDB_CODE_FAILED;
        }
C
Cary Xu 已提交
1812
        if (!taosArrayPush(pUidArray, uid)) {
C
Cary Xu 已提交
1813 1814 1815 1816 1817 1818
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return TSDB_CODE_FAILED;
        }
        if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)) != 0) {
          return TSDB_CODE_FAILED;
        }
C
Cary Xu 已提交
1819
      }
C
Cary Xu 已提交
1820 1821
    } else {
      if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0) != 0) {
C
Cary Xu 已提交
1822 1823 1824 1825 1826 1827 1828
        return TSDB_CODE_FAILED;
      }
    }
  }
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1829 1830 1831 1832
/**
 * @brief Release the containers owned by a uid store; the store struct itself is not freed.
 *
 * @param pStore Store to clean up, may be NULL.
 */
void tsdbUidStoreDestory(STbUidStore *pStore) {
  if (!pStore) {
    return;
  }

  if (pStore->uidHash) {
    // When pStore->tbUids not NULL, the pStore->uidHash has k/v; otherwise pStore->uidHash only has keys.
    if (pStore->tbUids) {
      for (void *pIter = taosHashIterate(pStore->uidHash, NULL); pIter;
           pIter = taosHashIterate(pStore->uidHash, pIter)) {
        taosArrayDestroy(*(SArray **)pIter);
      }
    }
    taosHashCleanup(pStore->uidHash);
  }

  taosArrayDestroy(pStore->tbUids);
}

/**
 * @brief Destroy a heap-allocated uid store together with its containers.
 *
 * @param pStore Store to free, may be NULL.
 * @return void* Always NULL, so the caller can reset its pointer with the return value.
 */
void *tsdbUidStoreFree(STbUidStore *pStore) {
  if (!pStore) {
    return NULL;
  }

  tsdbUidStoreDestory(pStore);
  taosMemoryFree(pStore);
  return NULL;
}

/**
 * @brief fetch suid/uids when create child tables of rollup SMA
 *
 * @param pTsdb
 * @param ppStore
 * @param suid
 * @param uid
 * @return int32_t
 */
C
Cary Xu 已提交
1864
int32_t tsdbFetchTbUidList(STsdb *pTsdb, STbUidStore **ppStore, tb_uid_t suid, tb_uid_t uid) {
C
Cary Xu 已提交
1865
  SSmaEnv *pEnv = REPO_RSMA_ENV((STsdb *)pTsdb);
C
Cary Xu 已提交
1866 1867

  // only applicable to rollup SMA ctables
C
Cary Xu 已提交
1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878
  if (!pEnv) {
    return TSDB_CODE_SUCCESS;
  }

  SSmaStat *pStat = SMA_ENV_STAT(pEnv);
  SHashObj *infoHash = NULL;
  if (!pStat || !(infoHash = SMA_STAT_INFO_HASH(pStat))) {
    terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
1879
  // info cached when create rsma stable and return directly for non-rsma ctables
C
Cary Xu 已提交
1880
  if (!taosHashGet(infoHash, &suid, sizeof(tb_uid_t))) {
C
Cary Xu 已提交
1881 1882 1883
    return TSDB_CODE_SUCCESS;
  }

C
Cary Xu 已提交
1884
  if (!(*ppStore)) {
C
Cary Xu 已提交
1885 1886 1887 1888
    if (tsdbUidStoreInit((STbUidStore **)ppStore) != 0) {
      return TSDB_CODE_FAILED;
    }
  }
C
Cary Xu 已提交
1889

C
Cary Xu 已提交
1890
  if (tsdbUidStorePut(*ppStore, suid, &uid) != 0) {
C
Cary Xu 已提交
1891 1892 1893
    *ppStore = tsdbUidStoreFree(*ppStore);
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
1894

C
Cary Xu 已提交
1895 1896 1897
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1898 1899 1900 1901 1902 1903 1904 1905 1906
/**
 * @brief Add the given table uids to the qualified table list of both rsma tasks of one stable.
 *
 * @param pTsdb    Tsdb owning the rsma env.
 * @param suid     Super table uid used to look up the rsma info; must not be NULL.
 * @param tbUids   Array of table uids; must not be NULL.
 * @return int32_t TSDB_CODE_SUCCESS on success; TSDB_CODE_FAILED if an argument is NULL, no rsma info is
 *                 registered for the suid, or a task rejects the update.
 */
static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids) {
  SSmaEnv   *pEnv = REPO_RSMA_ENV(pTsdb);
  SSmaStat  *pStat = SMA_ENV_STAT(pEnv);
  SRSmaInfo *pRSmaInfo = NULL;

  if (!suid || !tbUids) {
    terrno = TSDB_CODE_INVALID_PTR;
    // suid may be the NULL argument that brought us here, so it must not be dereferenced blindly
    tsdbError("vgId:%d failed to get rsma info for uid:%" PRIi64 " since %s", REPO_ID(pTsdb),
              suid ? *suid : (tb_uid_t)0, terrstr(terrno));
    return TSDB_CODE_FAILED;
  }

  pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), suid, sizeof(tb_uid_t));
  if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
    tsdbError("vgId:%d failed to get rsma info for uid:%" PRIi64, REPO_ID(pTsdb), *suid);
    terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
    return TSDB_CODE_FAILED;
  }

  // taskInfo[0] and taskInfo[1] are updated in order; a task that was not created is skipped by the check.
  for (int32_t i = 0; i < 2; ++i) {
    if (pRSmaInfo->taskInfo[i] && (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, true) != 0)) {
      tsdbError("vgId:%d update tbUidList failed for uid:%" PRIi64 " since %s", REPO_ID(pTsdb), *suid, terrstr(terrno));
      return TSDB_CODE_FAILED;
    } else {
      tsdbDebug("vgId:%d update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, REPO_ID(pTsdb),
                pRSmaInfo->taskInfo[i], *suid, *(int64_t *)taosArrayGet(tbUids, 0));
    }
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1935 1936 1937
/**
 * @brief Push every suid/uids pair collected in a uid store to the rsma tasks.
 *
 * The suid/tbUids pair kept directly in the store is handled first, then every entry of the store's hash.
 *
 * @param pTsdb    Tsdb owning the rsma env.
 * @param pStore   Collected uids; NULL or an empty tbUids array means there is nothing to do.
 * @return int32_t TSDB_CODE_SUCCESS on success; TSDB_CODE_FAILED as soon as one update fails.
 */
int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) {
  if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) {
    tsdbDebug("vgId:%d no need to update tbUids since empty uidStore", REPO_ID(pTsdb));
    return TSDB_CODE_SUCCESS;
  }

  if (tsdbUpdateTbUidListImpl(pTsdb, &pStore->suid, pStore->tbUids) != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_FAILED;
  }

  for (void *pIter = taosHashIterate(pStore->uidHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pStore->uidHash, pIter)) {
    tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
    SArray   *pTbUids = *(SArray **)pIter;

    if (tsdbUpdateTbUidListImpl(pTsdb, pTbSuid, pTbUids) != TSDB_CODE_SUCCESS) {
      // the iteration is abandoned early, so it has to be cancelled explicitly
      taosHashCancelIterate(pStore->uidHash, pIter);
      return TSDB_CODE_FAILED;
    }
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1960
/**
 * @brief Collect the suid of every submit block of a submit request into a uid store.
 *
 * Only suids are recorded (the uid argument of tsdbUidStorePut() is NULL).
 *
 * @param pMsg     Submit request to scan; must not be NULL.
 * @param pStore   Store receiving the suids.
 * @return int32_t 0 on success; -1 if iterating the request or storing a suid fails, or if terrno is set
 *                 after the iteration.
 */
static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
  ASSERT(pMsg != NULL);
  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock = NULL;

  terrno = TSDB_CODE_SUCCESS;

  if (tInitSubmitMsgIterEx(pMsg, &msgIter) < 0) return -1;
  while (true) {
    if (tGetSubmitMsgNextEx(&msgIter, &pBlock) < 0) return -1;

    if (!pBlock) break;

    // a suid that cannot be stored would silently be missed, so stop here
    if (tsdbUidStorePut(pStore, msgIter.suid, NULL) != 0) return -1;
  }

  if (terrno != TSDB_CODE_SUCCESS) return -1;
  return 0;
}

/**
 * @brief Run the first rsma task of a stable against a submit message and dump the generated blocks.
 *
 * Does nothing when there is no rsma env or no rsma info for the suid. Only STREAM_DATA_TYPE_SUBMIT_BLOCK
 * input is executed, and only taskInfo[0]; taskInfo[1] is not executed yet.
 *
 * @param pTsdb     Tsdb owning the rsma env.
 * @param pMeta     Meta handle; not used by this function.
 * @param pMsg      Input message handed to the task.
 * @param inputType Stream input type of pMsg.
 * @param suid      Super table uid used to look up the rsma info.
 * @return int32_t  TSDB_CODE_SUCCESS on success or when not applicable; -1 if the result array cannot be
 *                  allocated; TSDB_CODE_FAILED if executing the task or collecting its output fails.
 */
int32_t tsdbExecuteRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t inputType, tb_uid_t *suid) {
  SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb);
  if (!pEnv) {
    // only applicable when rsma env exists
    return TSDB_CODE_SUCCESS;
  }

  SSmaStat  *pStat = SMA_ENV_STAT(pEnv);
  SRSmaInfo *pRSmaInfo = NULL;

  pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), suid, sizeof(tb_uid_t));

  if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
    tsdbDebug("vgId:%d no rsma info for suid:%" PRIu64, REPO_ID(pTsdb), *suid);
    return TSDB_CODE_SUCCESS;
  }

  SArray *pResult = taosArrayInit(0, sizeof(SSDataBlock));
  if (!pResult) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  if ((inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) && pRSmaInfo->taskInfo[0]) {
    tsdbDebug("vgId:%d execute rsma task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), pRSmaInfo->taskInfo[0],
              *suid);
    qSetStreamInput(pRSmaInfo->taskInfo[0], pMsg, inputType);
    while (1) {
      // initialised so that a failed call can never leave them indeterminate
      SSDataBlock *output = NULL;
      uint64_t     ts = 0;
      if (qExecTask(pRSmaInfo->taskInfo[0], &output, &ts) < 0) {
        ASSERT(false);
        code = TSDB_CODE_FAILED;
        break;
      }
      if (!output) {
        break;
      }
      if (!taosArrayPush(pResult, output)) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        code = TSDB_CODE_FAILED;
        break;
      }
    }
    if (taosArrayGetSize(pResult) > 0) {
      blockDebugShowData(pResult);
    } else {
      tsdbWarn("vgId:%d no sma data generated since %s", REPO_ID(pTsdb), tstrerror(terrno));
    }
  }

  // TODO: execute pRSmaInfo->taskInfo[1] the same way once the second level is enabled.

  // The array is created on every call and was never released.
  // NOTE(review): the elements are struct copies of the task output; assumes their buffers stay owned by the
  // task, so only the array itself is destroyed - confirm.
  taosArrayDestroy(pResult);
  return code;
}
C
Cary Xu 已提交
2048

C
Cary Xu 已提交
2049
/**
 * @brief Execute the rsma tasks of every stable referenced by a submit message.
 *
 * The suids of the message are collected into a temporary uid store, then tsdbExecuteRSma() is called for the
 * suid kept in the store and for each suid key of the store's hash.
 *
 * @param pTsdb     Tsdb owning the rsma env.
 * @param pMeta     Meta handle forwarded to tsdbExecuteRSma().
 * @param pMsg      Submit message.
 * @param inputType Stream input type; only STREAM_DATA_TYPE_SUBMIT_BLOCK is handled.
 * @return int32_t  Always TSDB_CODE_SUCCESS.
 */
int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, void *pMsg, int32_t inputType) {
  SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb);
  if (!pEnv) {
    // only applicable when rsma env exists
    return TSDB_CODE_SUCCESS;
  }

  if (inputType != STREAM_DATA_TYPE_SUBMIT_BLOCK) {
    return TSDB_CODE_SUCCESS;
  }

  STbUidStore uidStore = {0};
  tsdbFetchSubmitReqSuids(pMsg, &uidStore);

  // no suid collected, nothing to execute and nothing to destroy
  if (uidStore.suid == 0) {
    return TSDB_CODE_SUCCESS;
  }

  tsdbExecuteRSma(pTsdb, pMeta, pMsg, inputType, &uidStore.suid);

  for (void *pIter = taosHashIterate(uidStore.uidHash, NULL); pIter;
       pIter = taosHashIterate(uidStore.uidHash, pIter)) {
    tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
    tsdbExecuteRSma(pTsdb, pMeta, pMsg, inputType, pTbSuid);
  }

  tsdbUidStoreDestory(&uidStore);
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
2076
#if 0
C
Cary Xu 已提交
2077 2078 2079 2080 2081 2082 2083 2084 2085 2086
/**
 * @brief Get the start TS key of the last data block of one interval/sliding.
 *
 * @param pTsdb
 * @param param
 * @param result
 * @return int32_t
 *         1) Return 0 and fill the result if the check procedure is normal;
 *         2) Return -1 if error occurs during the check procedure.
 */
C
Cary Xu 已提交
2087
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, void *smaIndex, void *result) {
C
Cary Xu 已提交
2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103
  const char *procedure = "";
  if (strncmp(procedure, "get the start TS key of the last data block", 100) != 0) {
    return -1;
  }
  // fill the result
  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Remove the tSma data files related to param between pWin.
 *
 * @param pTsdb
 * @param param
 * @param pWin
 * @return int32_t
 */
C
Cary Xu 已提交
2104
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, void *smaIndex, STimeWindow *pWin) {
C
Cary Xu 已提交
2105 2106 2107 2108
  // for ("tSmaFiles of param-interval-sliding between pWin") {
  //   // remove the tSmaFile
  // }
  return TSDB_CODE_SUCCESS;
C
Cary Xu 已提交
2109
}
C
Cary Xu 已提交
2110 2111
#endif

C
Cary Xu 已提交
2112
// TODO: Who is responsible for resource allocate and release?
C
Cary Xu 已提交
2113
int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg) {
C
Cary Xu 已提交
2114
  int32_t code = TSDB_CODE_SUCCESS;
C
Cary Xu 已提交
2115
  if ((code = tsdbInsertTSmaDataImpl(pTsdb, indexUid, msg)) < 0) {
C
Cary Xu 已提交
2116 2117
    tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }
C
Cary Xu 已提交
2118
  // TODO: destroy SSDataBlocks(msg)
C
Cary Xu 已提交
2119 2120 2121
  return code;
}

2122
/**
 * @brief Update the expired sma windows for a submit request and log a warning on failure.
 *
 * @param pTsdb    Tsdb owning the sma env.
 * @param pMsg     Submit request.
 * @param version  Version forwarded to tsdbUpdateExpiredWindowImpl().
 * @return int32_t Result of tsdbUpdateExpiredWindowImpl().
 */
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version) {
  int32_t code = tsdbUpdateExpiredWindowImpl(pTsdb, pMsg, version);
  if (code < 0) {
    tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }
  return code;
}

/**
 * @brief Insert rSma data and log a warning when the insert fails.
 *
 * @param pTsdb    Tsdb to write to.
 * @param msg      Message carrying the data to insert.
 * @return int32_t Result of tsdbInsertRSmaDataImpl().
 */
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
  int32_t code = tsdbInsertRSmaDataImpl(pTsdb, msg);
  if (code < 0) {
    tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }
  return code;
}

C
Cary Xu 已提交
2138
/**
 * @brief Get tSma data of one window of an index and log a warning when the lookup fails.
 *
 * @param pTsdb      Tsdb to read from.
 * @param pData      Output buffer forwarded to tsdbGetTSmaDataImpl().
 * @param indexUid   Uid of the SMA index.
 * @param querySKey  Start key of the queried window.
 * @param nMaxResult Maximum number of results forwarded to tsdbGetTSmaDataImpl().
 * @return int32_t   Result of tsdbGetTSmaDataImpl().
 */
int32_t tsdbGetTSmaData(STsdb *pTsdb, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) {
  int32_t code = tsdbGetTSmaDataImpl(pTsdb, pData, indexUid, querySKey, nMaxResult);
  if (code < 0) {
    tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }
  return code;
}

/**
 * @brief Drop the tSma data of an index and log a warning when the drop fails.
 *
 * @param pTsdb    Tsdb owning the data.
 * @param indexUid Uid of the SMA index.
 * @return int32_t Result of tsdbDropTSmaDataImpl().
 */
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid) {
  int32_t code = tsdbDropTSmaDataImpl(pTsdb, indexUid);
  if (code < 0) {
    tsdbWarn("vgId:%d drop tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }
  return code;
}