tsdbSma.c 45.2 KB
Newer Older
C
Cary Xu 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdbDef.h"

C
Cary Xu 已提交
18 19 20 21 22
static const char *TSDB_SMA_DNAME[] = {
    "",      // TSDB_SMA_TYPE_BLOCK
    "tsma",  // TSDB_SMA_TYPE_TIME_RANGE
    "rsma",  // TSDB_SMA_TYPE_ROLLUP
};
C
Cary Xu 已提交
23
#undef _TEST_SMA_PRINT_DEBUG_LOG_
C
Cary Xu 已提交
24
#define SMA_STORAGE_TSDB_DAYS   30
C
Cary Xu 已提交
25
#define SMA_STORAGE_TSDB_TIMES  10
C
Cary Xu 已提交
26 27
#define SMA_STORAGE_SPLIT_HOURS 24
#define SMA_KEY_LEN             18  // tableUid_colId_TSKEY 8+2+8
C
Cary Xu 已提交
28
#define SMA_DROP_EXPIRED_TIME   10  // default is 10 seconds
C
Cary Xu 已提交
29

30 31 32 33
#define SMA_STATE_HASH_SLOT      4
#define SMA_STATE_ITEM_HASH_SLOT 32

#define SMA_TEST_INDEX_NAME "smaTestIndexName"  // TODO: just for test
34
#define SMA_TEST_INDEX_UID  2000000001          // TODO: just for test
C
Cary Xu 已提交
35
typedef enum {
C
Cary Xu 已提交
36 37
  SMA_STORAGE_LEVEL_TSDB = 0,     // use days of self-defined  e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f200.tsma
  SMA_STORAGE_LEVEL_DFILESET = 1  // use days of TS data       e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f1906.tsma
C
Cary Xu 已提交
38 39 40
} ESmaStorageLevel;

typedef struct {
C
Cary Xu 已提交
41 42 43 44
  STsdb       *pTsdb;
  SDBFile      dFile;
  SSDataBlock *pData;     // sma data
  int32_t      interval;  // interval with the precision of DB
C
Cary Xu 已提交
45 46 47 48
} STSmaWriteH;

typedef struct {
  int32_t iter;
C
Cary Xu 已提交
49
  int32_t fid;
C
Cary Xu 已提交
50
} SmaFsIter;
C
Cary Xu 已提交
51

C
Cary Xu 已提交
52
typedef struct {
C
Cary Xu 已提交
53
  STsdb    *pTsdb;
C
Cary Xu 已提交
54
  SDBFile   dFile;
C
Cary Xu 已提交
55 56 57 58 59 60 61
  int32_t   interval;   // interval with the precision of DB
  int32_t   blockSize;  // size of SMA block item
  int8_t    storageLevel;
  int8_t    days;
  SmaFsIter smaFsIter;
} STSmaReadH;

62 63 64 65 66
typedef struct {
  /**
   * @brief The field 'state' is here to demonstrate if one smaIndex is ready to provide service.
   *    - TSDB_SMA_STAT_OK: 1) The sma calculation of history data is finished; 2) Or recevied information from
   * Streaming Module or TSDB local persistence.
C
Cary Xu 已提交
67 68 69
   *    - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
   * without information about its previous state.
   *    - TSDB_SMA_STAT_DROPPED: 1)sma dropped
70 71 72
   */
  int8_t    state;           // ETsdbSmaStat
  SHashObj *expiredWindows;  // key: skey of time window, value: N/A
C
Cary Xu 已提交
73
  STSma    *pSma;            // cache schema
74 75 76
} SSmaStatItem;

struct SSmaStat {
C
Cary Xu 已提交
77
  SHashObj *smaStatItems;  // key: indexUid, value: SSmaStatItem
78
  T_REF_DECLARE()
79 80
};

C
Cary Xu 已提交
81
// declaration of static functions
C
Cary Xu 已提交
82

83
// expired window
C
Cary Xu 已提交
84
static int32_t  tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg);
C
update  
Cary Xu 已提交
85
static int32_t  tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey);
C
Cary Xu 已提交
86
static int32_t  tsdbInitSmaStat(SSmaStat **pSmaStat);
C
Cary Xu 已提交
87
static void    *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem);
C
Cary Xu 已提交
88
static int32_t  tsdbDestroySmaState(SSmaStat *pSmaStat);
89 90
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did);
static int32_t  tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv);
91 92 93 94 95 96 97 98 99
static int32_t  tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey);
static int32_t  tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat);
static int32_t  tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat);

// read data
// TODO: This is the basic params, and should wrap the params to a queryHandle.
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
                                   int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey,
                                   int32_t nMaxResult);
C
Cary Xu 已提交
100

101
// insert data
C
Cary Xu 已提交
102 103
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, SSDataBlock *pData, int64_t interval,
                                  int8_t intervalUnit);
104 105 106 107 108
static void    tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH);
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit);
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit);
static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData);
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen);
C
Cary Xu 已提交
109
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision);
C
Cary Xu 已提交
110
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel);
111 112
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int64_t indexUid, int32_t fid);
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey);
C
Cary Xu 已提交
113
static bool    tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey);
C
Cary Xu 已提交
114
static void    tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]);
115 116
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg);
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg);
C
Cary Xu 已提交
117

C
Cary Xu 已提交
118 119 120
// mgmt interface
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid);

121
// implementation
C
Cary Xu 已提交
122 123 124 125 126 127 128 129
static FORCE_INLINE int8_t tsdbSmaStat(SSmaStatItem *pStatItem) {
  if (pStatItem) {
    return atomic_load_8(&pStatItem->state);
  }
  return TSDB_SMA_STAT_UNKNOWN;
}

static FORCE_INLINE bool tsdbSmaStatIsOK(SSmaStatItem *pStatItem, int8_t *state) {
C
Cary Xu 已提交
130
  if (!pStatItem) {
C
Cary Xu 已提交
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
    return false;
  }

  if (state) {
    *state = atomic_load_8(&pStatItem->state);
    return *state == TSDB_SMA_STAT_OK;
  }
  return atomic_load_8(&pStatItem->state) == TSDB_SMA_STAT_OK;
}

static FORCE_INLINE bool tsdbSmaStatIsExpired(SSmaStatItem *pStatItem) {
  return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_EXPIRED) : true;
}

static FORCE_INLINE bool tsdbSmaStatIsDropped(SSmaStatItem *pStatItem) {
  return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_DROPPED) : true;
}

static FORCE_INLINE void tsdbSmaStatSetOK(SSmaStatItem *pStatItem) {
  if (pStatItem) {
    atomic_store_8(&pStatItem->state, TSDB_SMA_STAT_OK);
  }
}

static FORCE_INLINE void tsdbSmaStatSetExpired(SSmaStatItem *pStatItem) {
  if (pStatItem) {
    atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_EXPIRED);
  }
}

static FORCE_INLINE void tsdbSmaStatSetDropped(SSmaStatItem *pStatItem) {
  if (pStatItem) {
    atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_DROPPED);
  }
}

C
Cary Xu 已提交
167
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) {
168 169
  snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%stsdb%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TD_DIRSEP,
           TSDB_SMA_DNAME[smaType]);
C
Cary Xu 已提交
170
}
C
Cary Xu 已提交
171

172
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did) {
C
Cary Xu 已提交
173 174
  SSmaEnv *pEnv = NULL;

wafwerar's avatar
wafwerar 已提交
175
  pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv));
C
Cary Xu 已提交
176 177 178 179 180
  if (pEnv == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

wafwerar's avatar
wafwerar 已提交
181
  int code = taosThreadRwlockInit(&(pEnv->lock), NULL);
C
Cary Xu 已提交
182 183
  if (code) {
    terrno = TAOS_SYSTEM_ERROR(code);
wafwerar's avatar
wafwerar 已提交
184
    taosMemoryFree(pEnv);
C
Cary Xu 已提交
185 186 187 188 189 190 191 192 193 194
    return NULL;
  }

  ASSERT(path && (strlen(path) > 0));
  pEnv->path = strdup(path);
  if (pEnv->path == NULL) {
    tsdbFreeSmaEnv(pEnv);
    return NULL;
  }

195 196
  pEnv->did = did;

C
Cary Xu 已提交
197 198 199 200 201
  if (tsdbInitSmaStat(&pEnv->pStat) != TSDB_CODE_SUCCESS) {
    tsdbFreeSmaEnv(pEnv);
    return NULL;
  }

202 203 204
  char aname[TSDB_FILENAME_LEN] = {0};
  tfsAbsoluteName(pTsdb->pTfs, did, path, aname);
  if (tsdbOpenBDBEnv(&pEnv->dbEnv, aname) != TSDB_CODE_SUCCESS) {
205 206 207 208
    tsdbFreeSmaEnv(pEnv);
    return NULL;
  }

C
Cary Xu 已提交
209 210 211
  return pEnv;
}

212
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv) {
C
Cary Xu 已提交
213 214 215 216 217
  if (!pEnv) {
    terrno = TSDB_CODE_INVALID_PTR;
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
218
  if (*pEnv == NULL) {
219
    if ((*pEnv = tsdbNewSmaEnv(pTsdb, path, did)) == NULL) {
C
Cary Xu 已提交
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235
      return TSDB_CODE_FAILED;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Release resources allocated for its member fields, not including itself.
 *
 * @param pSmaEnv
 * @return int32_t
 */
void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv) {
  if (pSmaEnv) {
    tsdbDestroySmaState(pSmaEnv->pStat);
wafwerar's avatar
wafwerar 已提交
236 237
    taosMemoryFreeClear(pSmaEnv->pStat);
    taosMemoryFreeClear(pSmaEnv->path);
wafwerar's avatar
wafwerar 已提交
238
    taosThreadRwlockDestroy(&(pSmaEnv->lock));
239
    tsdbCloseBDBEnv(pSmaEnv->dbEnv);
C
Cary Xu 已提交
240 241 242 243 244
  }
}

void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv) {
  tsdbDestroySmaEnv(pSmaEnv);
wafwerar's avatar
wafwerar 已提交
245
  taosMemoryFreeClear(pSmaEnv);
C
Cary Xu 已提交
246 247 248
  return NULL;
}

249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) {
  if (pStat == NULL) return 0;
  int ref = T_REF_INC(pStat);
  tsdbDebug("vgId:%d ref sma stat %p ref %d", REPO_ID(pTsdb), pStat, ref);
  return 0;
}

static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) {
  if (pStat == NULL) return 0;

  int ref = T_REF_DEC(pStat);
  tsdbDebug("vgId:%d unref sma stat %p ref %d", REPO_ID(pTsdb), pStat, ref);
  return 0;
}

264 265
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) {
  ASSERT(pSmaStat != NULL);
C
Cary Xu 已提交
266 267 268 269 270

  if (*pSmaStat != NULL) {  // no lock
    return TSDB_CODE_SUCCESS;
  }

271 272 273 274 275
  /**
   *  1. Lazy mode utilized when init SSmaStat to update expired window(or hungry mode when tsdbNew).
   *  2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
   * tsdbInitSmaStat invoked in other multithread environment later.
   */
276
  if (*pSmaStat == NULL) {
wafwerar's avatar
wafwerar 已提交
277
    *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat));
278 279 280 281 282 283 284 285 286
    if (*pSmaStat == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_FAILED;
    }

    (*pSmaStat)->smaStatItems =
        taosHashInit(SMA_STATE_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);

    if ((*pSmaStat)->smaStatItems == NULL) {
wafwerar's avatar
wafwerar 已提交
287
      taosMemoryFreeClear(*pSmaStat);
288 289 290 291 292 293 294 295 296
      return TSDB_CODE_FAILED;
    }
  }
  return TSDB_CODE_SUCCESS;
}

static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) {
  SSmaStatItem *pItem = NULL;

wafwerar's avatar
wafwerar 已提交
297
  pItem = (SSmaStatItem *)taosMemoryCalloc(1, sizeof(SSmaStatItem));
298 299 300 301 302
  if (pItem) {
    pItem->state = state;
    pItem->expiredWindows = taosHashInit(SMA_STATE_ITEM_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP),
                                         true, HASH_ENTRY_LOCK);
    if (!pItem->expiredWindows) {
wafwerar's avatar
wafwerar 已提交
303
      taosMemoryFreeClear(pItem);
304 305 306 307 308
    }
  }
  return pItem;
}

C
Cary Xu 已提交
309 310 311
static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
  if (pSmaStatItem != NULL) {
    tdDestroyTSma(pSmaStatItem->pSma);
wafwerar's avatar
wafwerar 已提交
312
    taosMemoryFreeClear(pSmaStatItem->pSma);
C
Cary Xu 已提交
313
    taosHashCleanup(pSmaStatItem->expiredWindows);
wafwerar's avatar
wafwerar 已提交
314
    taosMemoryFreeClear(pSmaStatItem);
C
Cary Xu 已提交
315 316 317 318
  }
  return NULL;
}

C
Cary Xu 已提交
319 320
/**
 * @brief Release resources allocated for its member fields, not including itself.
321 322 323
 *
 * @param pSmaStat
 * @return int32_t
C
Cary Xu 已提交
324
 */
C
Cary Xu 已提交
325
int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
326 327
  if (pSmaStat) {
    // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
328
    void *item = taosHashIterate(pSmaStat->smaStatItems, NULL);
329
    while (item != NULL) {
330
      SSmaStatItem *pItem = *(SSmaStatItem **)item;
C
Cary Xu 已提交
331
      tsdbFreeSmaStatItem(pItem);
332 333 334 335 336 337
      item = taosHashIterate(pSmaStat->smaStatItems, item);
    }
    taosHashCleanup(pSmaStat->smaStatItems);
  }
}

C
Cary Xu 已提交
338
static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
339 340
  SSmaEnv *pEnv = NULL;

C
Cary Xu 已提交
341
  // return if already init
C
Cary Xu 已提交
342 343
  switch (smaType) {
    case TSDB_SMA_TYPE_TIME_RANGE:
344
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&pTsdb->pTSmaEnv)) != NULL) {
C
Cary Xu 已提交
345 346 347 348
        return TSDB_CODE_SUCCESS;
      }
      break;
    case TSDB_SMA_TYPE_ROLLUP:
349
      if ((pEnv = (SSmaEnv *)atomic_load_ptr(&pTsdb->pRSmaEnv)) != NULL) {
C
Cary Xu 已提交
350 351 352 353 354 355 356 357
        return TSDB_CODE_SUCCESS;
      }
      break;
    default:
      terrno = TSDB_CODE_INVALID_PARA;
      return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
358 359
  // init sma env
  tsdbLockRepo(pTsdb);
360 361
  pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&pTsdb->pTSmaEnv) : atomic_load_ptr(&pTsdb->pRSmaEnv);
  if (pEnv == NULL) {
C
Cary Xu 已提交
362
    char rname[TSDB_FILENAME_LEN] = {0};
C
Cary Xu 已提交
363

C
Cary Xu 已提交
364 365 366 367 368 369 370 371 372 373 374 375 376
    SDiskID did = {0};
    tfsAllocDisk(pTsdb->pTfs, TFS_PRIMARY_LEVEL, &did);
    if (did.level < 0 || did.id < 0) {
      tsdbUnlockRepo(pTsdb);
      return TSDB_CODE_FAILED;
    }
    tsdbGetSmaDir(REPO_ID(pTsdb), smaType, rname);

    if (tfsMkdirRecurAt(pTsdb->pTfs, rname, did) != TSDB_CODE_SUCCESS) {
      tsdbUnlockRepo(pTsdb);
      return TSDB_CODE_FAILED;
    }

377
    if (tsdbInitSmaEnv(pTsdb, rname, did, &pEnv) != TSDB_CODE_SUCCESS) {
C
Cary Xu 已提交
378 379 380 381
      tsdbUnlockRepo(pTsdb);
      return TSDB_CODE_FAILED;
    }

382 383
    (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&pTsdb->pTSmaEnv, pEnv)
                                          : atomic_store_ptr(&pTsdb->pRSmaEnv, pEnv);
C
Cary Xu 已提交
384
  }
C
Cary Xu 已提交
385
  tsdbUnlockRepo(pTsdb);
C
Cary Xu 已提交
386 387 388 389

  return TSDB_CODE_SUCCESS;
};

390

C
Cary Xu 已提交
391
static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey) {
392
  SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
C
Cary Xu 已提交
393
  if (pItem == NULL) {
C
Cary Xu 已提交
394 395
    // TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later
    pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_OK);  // TODO use the real state
C
Cary Xu 已提交
396
    if (pItem == NULL) {
397
      // Response to stream computing: OOM
398
      // For query, if the indexUid not found, the TSDB should tell query module to query raw TS data.
399 400 401
      return TSDB_CODE_FAILED;
    }

C
Cary Xu 已提交
402 403 404
    // cache smaMeta
    STSma *pSma = metaGetSmaInfoByIndex(pTsdb->pMeta, indexUid);
    if (pSma == NULL) {
405
      terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
C
Cary Xu 已提交
406
      taosHashCleanup(pItem->expiredWindows);
wafwerar's avatar
wafwerar 已提交
407
      taosMemoryFree(pItem);
408 409
      tsdbWarn("vgId:%d update expired window failed for smaIndex %" PRIi64 " since %s", REPO_ID(pTsdb), indexUid,
               tstrerror(terrno));
C
Cary Xu 已提交
410 411 412 413
      return TSDB_CODE_FAILED;
    }
    pItem->pSma = pSma;

C
Cary Xu 已提交
414
    if (taosHashPut(pItemsHash, &indexUid, sizeof(indexUid), &pItem, sizeof(pItem)) != 0) {
415 416
      // If error occurs during put smaStatItem, free the resources of pItem
      taosHashCleanup(pItem->expiredWindows);
wafwerar's avatar
wafwerar 已提交
417
      taosMemoryFree(pItem);
418 419
      return TSDB_CODE_FAILED;
    }
C
Cary Xu 已提交
420 421 422
  } else if ((pItem = *(SSmaStatItem **)pItem) == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
    return TSDB_CODE_FAILED;
423 424 425
  }

  int8_t state = TSDB_SMA_STAT_EXPIRED;
C
Cary Xu 已提交
426 427 428 429 430 431 432 433
  if (taosHashPut(pItem->expiredWindows, &winSKey, sizeof(TSKEY), &state, sizeof(state)) != 0) {
    // If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would
    // tell query module to query raw TS data.
    // N.B.
    //  1) It is assumed to be extemely little probability event of fail to taosHashPut.
    //  2) This would solve the inconsistency to some extent, but not completely, unless we record all expired
    // windows failed to put into hash table.
    taosHashCleanup(pItem->expiredWindows);
wafwerar's avatar
wafwerar 已提交
434
    taosMemoryFreeClear(pItem->pSma);
C
Cary Xu 已提交
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465
    taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid));
    return TSDB_CODE_FAILED;
  }
  tsdbDebug("vgId:%d smaIndex %" PRIi64 " tsKey %" PRIi64 " is put to hash", REPO_ID(pTsdb), indexUid, winSKey);
}

/**
 * @brief Update expired window according to msg from stream computing module.
 *
 * @param pTsdb
 * @param msg SSubmitReq
 * @return int32_t
 */
int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg) {
  const SSubmitReq *pMsg = (const SSubmitReq *)msg;

  if (pMsg->length <= sizeof(SSubmitReq)) {
    terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
    return TSDB_CODE_FAILED;
  }
  if (!pTsdb->pMeta) {
    terrno = TSDB_CODE_INVALID_PTR;
    return TSDB_CODE_FAILED;
  }

// TODO: decode the msg from Stream Computing module => start
#ifdef TSDB_SMA_TESTx
  int64_t       indexUid = SMA_TEST_INDEX_UID;
  const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10;
  TSKEY         expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE];
  TSKEY         skey1 = 1646987196 * 1e3;
466
  for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
C
Cary Xu 已提交
467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494
    expiredWindows[i] = skey1 + i;
  }
#else

#endif
  // TODO: decode the msg <= end

  if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INIT_FAILED;
    return TSDB_CODE_FAILED;
  }

#ifndef TSDB_SMA_TEST
  TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE];
#endif


  // Firstly, assume that tSma can only be created on super table/normal table.
  // getActiveTimeWindow


  SSmaEnv *pEnv = REPO_SMA_ENV(pTsdb, TSDB_SMA_TYPE_TIME_RANGE);
  SSmaStat *pStat = SMA_ENV_STAT(pEnv);
  SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv);

  TASSERT(pEnv != NULL && pStat != NULL && pItemsHash != NULL);


C
Cary Xu 已提交
495 496 497
  // basic procedure
  // TODO: optimization
  tsdbRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513

  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock = NULL;
  SInterval      interval = {0};

  if (tInitSubmitMsgIter(pMsg, &msgIter) != TSDB_CODE_SUCCESS) {
    return TSDB_CODE_FAILED;
  }

  while (true) {
    tGetSubmitMsgNext(&msgIter, &pBlock);
    if (pBlock == NULL) break;

    STSmaWrapper *pSW = NULL;
    STSma        *pTSma = NULL;

C
Cary Xu 已提交
514 515 516 517 518 519
    SSubmitBlkIter blkIter = {0};
    if (tInitSubmitBlkIter(pBlock, &blkIter) != TSDB_CODE_SUCCESS) {
      tdFreeTSmaWrapper(pSW);
      break;
    }

C
Cary Xu 已提交
520 521 522 523 524 525 526
    while (true) {
      STSRow *row = tGetSubmitBlkNext(&blkIter);
      if (row == NULL) {
        tdFreeTSmaWrapper(pSW);
        break;
      }
      if(pSW == NULL) {
C
Cary Xu 已提交
527
        if((pSW =metaGetSmaInfoByTable(REPO_META(pTsdb), pBlock->suid)) == NULL) {
C
Cary Xu 已提交
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543
          break;
        }
        if((pSW->number) <= 0 || (pSW->tSma == NULL)) {
          tdFreeTSmaWrapper(pSW);
          break;
        }
        pTSma = pSW->tSma;
      }

      interval.interval = pTSma->interval;
      interval.intervalUnit = pTSma->intervalUnit;
      interval.offset = pTSma->offset;
      interval.precision = REPO_CFG(pTsdb)->precision;
      interval.sliding = pTSma->sliding;
      interval.slidingUnit = pTSma->slidingUnit;

C
update  
Cary Xu 已提交
544 545 546
      TSKEY winSKey = taosTimeTruncate(TD_ROW_KEY(row), &interval, interval.precision);

      tsdbSetExpiredWindow(pTsdb, pItemsHash, pTSma->indexUid, winSKey);
547 548 549
    }
  }

550
  tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
551

552 553 554
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
555 556 557 558 559 560 561 562 563
/**
 * @brief When sma data received from stream computing, make the relative expired window valid.
 *
 * @param pTsdb
 * @param pStat
 * @param indexUid
 * @param skey
 * @return int32_t
 */
564
static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey) {
C
Cary Xu 已提交
565 566
  SSmaStatItem *pItem = NULL;

567 568
  tsdbRefSmaStat(pTsdb, pStat);

C
Cary Xu 已提交
569
  if (pStat && pStat->smaStatItems) {
570
    pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid));
C
Cary Xu 已提交
571
  }
572
  if ((pItem != NULL) && ((pItem = *(SSmaStatItem **)pItem) != NULL)) {
573 574
    // pItem resides in hash buffer all the time unless drop sma index
    // TODO: multithread protect
C
Cary Xu 已提交
575 576
    if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) {
      // error handling
577 578 579 580
      tsdbUnRefSmaStat(pTsdb, pStat);
      tsdbWarn("vgId:%d remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " failed", REPO_ID(pTsdb),
               skey, indexUid);
      return TSDB_CODE_FAILED;
C
Cary Xu 已提交
581
    }
C
Cary Xu 已提交
582 583 584 585 586 587 588 589 590
    // TODO: use a standalone interface to received state upate notification from stream computing module.
    /**
     * @brief state
     *  - When SMA env init in TSDB, its status is TSDB_SMA_STAT_OK.
     *  - In startup phase of stream computing module, it should notify the SMA env in TSDB to expired if needed(e.g.
     * when batch data caculation not finised)
     *  - When TSDB_SMA_STAT_OK, the stream computing module should also notify that to the SMA env in TSDB.
     */
    pItem->state = TSDB_SMA_STAT_OK;
C
Cary Xu 已提交
591 592
  } else {
    // error handling
593 594 595
    tsdbUnRefSmaStat(pTsdb, pStat);
    tsdbWarn("vgId:%d expired window %" PRIi64 " not exists for sma index %" PRIi64, REPO_ID(pTsdb), skey, indexUid);
    return TSDB_CODE_FAILED;
C
Cary Xu 已提交
596
  }
597 598

  tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
599 600 601
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
602 603 604 605 606 607 608
/**
 * @brief Judge the tSma storage level
 *
 * @param interval
 * @param intervalUnit
 * @return int32_t
 */
C
Cary Xu 已提交
609
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
C
Cary Xu 已提交
610 611
  // TODO: configurable for SMA_STORAGE_SPLIT_HOURS?
  switch (intervalUnit) {
C
Cary Xu 已提交
612
    case TIME_UNIT_HOUR:
C
Cary Xu 已提交
613 614 615 616
      if (interval < SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
617
    case TIME_UNIT_MINUTE:
C
Cary Xu 已提交
618 619 620 621
      if (interval < 60 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
622
    case TIME_UNIT_SECOND:
C
Cary Xu 已提交
623 624 625 626
      if (interval < 3600 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
627
    case TIME_UNIT_MILLISECOND:
C
Cary Xu 已提交
628 629 630 631
      if (interval < 3600 * 1e3 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
632
    case TIME_UNIT_MICROSECOND:
C
Cary Xu 已提交
633 634 635 636
      if (interval < 3600 * 1e6 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
637
    case TIME_UNIT_NANOSECOND:
C
Cary Xu 已提交
638 639 640 641 642 643 644 645 646 647 648
      if (interval < 3600 * 1e9 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
    default:
      break;
  }
  return SMA_STORAGE_LEVEL_TSDB;
}

/**
C
Cary Xu 已提交
649
 * @brief Insert TSma data blocks to DB File build by B+Tree
C
Cary Xu 已提交
650
 *
C
Cary Xu 已提交
651
 * @param pSmaH
652
 * @param smaKey  tableUid-colId-skeyOfWindow(8-2-8)
C
Cary Xu 已提交
653
 * @param keyLen
C
Cary Xu 已提交
654 655 656 657
 * @param pData
 * @param dataLen
 * @return int32_t
 */
658
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen) {
C
Cary Xu 已提交
659 660
  SDBFile *pDBFile = &pSmaH->dFile;
  tsdbDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d",
C
Cary Xu 已提交
661 662
            REPO_ID(pSmaH->pTsdb), pDBFile->path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
            *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen);
663

664
  // TODO: insert sma data blocks into B+Tree(TDB)
C
Cary Xu 已提交
665
  if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen) != 0) {
666 667 668
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
669
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
C
Cary Xu 已提交
670
  uint32_t valueSize = 0;
C
Cary Xu 已提交
671
  void    *data = tsdbGetSmaDataByKey(pDBFile, smaKey, keyLen, &valueSize);
C
Cary Xu 已提交
672 673
  ASSERT(data != NULL);
  for (uint32_t v = 0; v < valueSize; v += 8) {
C
Cary Xu 已提交
674
    tsdbWarn("vgId:%d insert sma data val[%d] %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v));
C
Cary Xu 已提交
675 676
  }
#endif
C
Cary Xu 已提交
677 678 679
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
680 681 682 683 684 685 686 687
/**
 * @brief Approximate value for week/month/year.
 *
 * @param interval
 * @param intervalUnit
 * @param precision
 * @return int64_t
 */
C
Cary Xu 已提交
688
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision) {
C
Cary Xu 已提交
689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712
  switch (intervalUnit) {
    case TIME_UNIT_YEAR:  // approximate value
      interval *= 365 * 86400 * 1e3;
      break;
    case TIME_UNIT_MONTH:  // approximate value
      interval *= 30 * 86400 * 1e3;
      break;
    case TIME_UNIT_WEEK:  // approximate value
      interval *= 7 * 86400 * 1e3;
      break;
    case TIME_UNIT_DAY:  // the interval for tSma calculation must <= day
      interval *= 86400 * 1e3;
      break;
    case TIME_UNIT_HOUR:
      interval *= 3600 * 1e3;
      break;
    case TIME_UNIT_MINUTE:
      interval *= 60 * 1e3;
      break;
    case TIME_UNIT_SECOND:
      interval *= 1e3;
      break;
    default:
      break;
C
Cary Xu 已提交
713 714
  }

C
Cary Xu 已提交
715 716
  switch (precision) {
    case TSDB_TIME_PRECISION_MILLI:
C
Cary Xu 已提交
717
      if (TIME_UNIT_MICROSECOND == intervalUnit) {  // us
C
Cary Xu 已提交
718
        return interval / 1e3;
C
Cary Xu 已提交
719
      } else if (TIME_UNIT_NANOSECOND == intervalUnit) {  //  nano second
C
Cary Xu 已提交
720
        return interval / 1e6;
721
      } else {  // ms
C
Cary Xu 已提交
722 723 724
        return interval;
      }
      break;
C
Cary Xu 已提交
725
    case TSDB_TIME_PRECISION_MICRO:
C
Cary Xu 已提交
726
      if (TIME_UNIT_MICROSECOND == intervalUnit) {  // us
C
Cary Xu 已提交
727
        return interval;
728
      } else if (TIME_UNIT_NANOSECOND == intervalUnit) {  //  ns
C
Cary Xu 已提交
729
        return interval / 1e3;
730
      } else {  // ms
C
Cary Xu 已提交
731 732 733
        return interval * 1e3;
      }
      break;
C
Cary Xu 已提交
734
    case TSDB_TIME_PRECISION_NANO:
735
      if (TIME_UNIT_MICROSECOND == intervalUnit) {  // us
C
Cary Xu 已提交
736
        return interval * 1e3;
737
      } else if (TIME_UNIT_NANOSECOND == intervalUnit) {  // ns
C
Cary Xu 已提交
738
        return interval;
739
      } else {  // ms
C
Cary Xu 已提交
740
        return interval * 1e6;
C
Cary Xu 已提交
741 742
      }
      break;
C
Cary Xu 已提交
743
    default:                                        // ms
C
Cary Xu 已提交
744
      if (TIME_UNIT_MICROSECOND == intervalUnit) {  // us
C
Cary Xu 已提交
745
        return interval / 1e3;
746
      } else if (TIME_UNIT_NANOSECOND == intervalUnit) {  //  ns
C
Cary Xu 已提交
747
        return interval / 1e6;
748
      } else {  // ms
C
Cary Xu 已提交
749
        return interval;
C
Cary Xu 已提交
750 751 752 753 754 755 756 757 758 759 760 761 762 763 764
      }
      break;
  }
  return interval;
}

/**
 * @brief Split the TSma data blocks into expected size and insert into B+Tree.
 *
 * @param pSmaH
 * @param pData
 * @param nOffset The nOffset of blocks since fid changes.
 * @param nBlocks The nBlocks with the same fid since nOffset.
 * @return int32_t
 */
C
Cary Xu 已提交
765
static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData) {
C
Cary Xu 已提交
766 767
  STsdb *pTsdb = pSmaH->pTsdb;

C
Cary Xu 已提交
768 769 770 771 772 773 774 775
  tsdbDebug("tsdbInsertTSmaDataSection: index %" PRIi64 ", skey %" PRIi64, pData->indexUid, pData->skey);

  // TODO: check the data integrity

  int32_t len = 0;
  while (true) {
    if (len >= pData->dataLen) {
      break;
C
Cary Xu 已提交
776
    }
C
Cary Xu 已提交
777 778 779 780 781 782 783 784 785 786 787
    assert(pData->dataLen > 0);
    STSmaTbData *pTbData = (STSmaTbData *)POINTER_SHIFT(pData->data, len);

    int32_t tbLen = 0;
    while (true) {
      if (tbLen >= pTbData->dataLen) {
        break;
      }
      assert(pTbData->dataLen > 0);
      STSmaColData *pColData = (STSmaColData *)POINTER_SHIFT(pTbData->data, tbLen);
      char          smaKey[SMA_KEY_LEN] = {0};
C
Cary Xu 已提交
788
      void         *pSmaKey = &smaKey;
C
Cary Xu 已提交
789 790 791 792 793
#if 0
      printf("tsdbInsertTSmaDataSection: index %" PRIi64 ", skey %" PRIi64 " table[%" PRIi64 "]col[%" PRIu16 "]\n",
             pData->indexUid, pData->skey, pTbData->tableUid, pColData->colId);
#endif
      tsdbEncodeTSmaKey(pTbData->tableUid, pColData->colId, pData->skey, (void **)&pSmaKey);
794
      if (tsdbInsertTSmaBlocks(pSmaH, smaKey, SMA_KEY_LEN, pColData->data, pColData->blockSize) < 0) {
C
Cary Xu 已提交
795 796
        tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
      }
C
Cary Xu 已提交
797
      tbLen += (sizeof(STSmaColData) + pColData->blockSize);
C
Cary Xu 已提交
798
    }
C
Cary Xu 已提交
799
    len += (sizeof(STSmaTbData) + pTbData->dataLen);
C
Cary Xu 已提交
800
  }
C
Cary Xu 已提交
801

C
Cary Xu 已提交
802 803 804
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
805
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, SSDataBlock *pData, int64_t interval, int8_t intervalUnit) {
C
Cary Xu 已提交
806
  pSmaH->pTsdb = pTsdb;
C
Cary Xu 已提交
807 808
  pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision);
  pSmaH->pData = pData;
809 810 811 812 813
  return TSDB_CODE_SUCCESS;
}

static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) {
  if (pSmaH) {
C
Cary Xu 已提交
814
    tsdbCloseDBF(&pSmaH->dFile);
815
  }
C
Cary Xu 已提交
816 817
}

818
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int64_t indexUid, int32_t fid) {
C
Cary Xu 已提交
819
  STsdb *pTsdb = pSmaH->pTsdb;
C
Cary Xu 已提交
820
  ASSERT(pSmaH->dFile.path == NULL && pSmaH->dFile.pDB == NULL);
821 822

  pSmaH->dFile.fid = fid;
823
  char tSmaFile[TSDB_FILENAME_LEN] = {0};
824
  snprintf(tSmaFile, TSDB_FILENAME_LEN, "%" PRIi64 "%sv%df%d.tsma", indexUid, TD_DIRSEP, REPO_ID(pTsdb), fid);
C
Cary Xu 已提交
825
  pSmaH->dFile.path = strdup(tSmaFile);
826

C
Cary Xu 已提交
827
  return TSDB_CODE_SUCCESS;
C
Cary Xu 已提交
828
}
C
Cary Xu 已提交
829

C
Cary Xu 已提交
830 831 832 833 834 835 836 837 838 839
/**
 * @brief
 *
 * @param pTsdb
 * @param interval Interval calculated by DB's precision
 * @param storageLevel
 * @return int32_t
 */
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel) {
  STsdbCfg *pCfg = REPO_CFG(pTsdb);
C
Cary Xu 已提交
840 841 842
  int32_t   daysPerFile = pCfg->daysPerFile;

  if (storageLevel == SMA_STORAGE_LEVEL_TSDB) {
C
Cary Xu 已提交
843
    int32_t days = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerDay[pCfg->precision]);
C
Cary Xu 已提交
844 845 846 847 848
    daysPerFile = days > SMA_STORAGE_TSDB_DAYS ? days : SMA_STORAGE_TSDB_DAYS;
  }

  return daysPerFile;
}
C
Cary Xu 已提交
849 850 851 852 853 854 855 856 857 858

/**
 * @brief Insert/Update Time-range-wise SMA data.
 *  - If interval < SMA_STORAGE_SPLIT_HOURS(e.g. 24), save the SMA data as a part of DFileSet to e.g.
 * v3f1900.tsma.${sma_index_name}. The days is the same with that for TS data files.
 *  - If interval >= SMA_STORAGE_SPLIT_HOURS, save the SMA data to e.g. vnode3/tsma/v3f632.tsma.${sma_index_name}. The
 * days is 30 times of the interval, and the minimum days is SMA_STORAGE_TSDB_DAYS(30d).
 *  - The destination file of one data block for some interval is determined by its start TS key.
 *
 * @param pTsdb
C
Cary Xu 已提交
859
 * @param msg
C
Cary Xu 已提交
860 861
 * @return int32_t
 */
C
Cary Xu 已提交
862
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
C
Cary Xu 已提交
863 864 865 866
  STsdbCfg    *pCfg = REPO_CFG(pTsdb);
  SSDataBlock *pData = (SSDataBlock *)msg;
  SSmaEnv     *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv);
  int64_t      indexUid = SMA_TEST_INDEX_UID;
C
Cary Xu 已提交
867

868
  if (pEnv == NULL) {
869 870 871 872
    terrno = TSDB_CODE_INVALID_PTR;
    tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
    return terrno;
  }
C
Cary Xu 已提交
873

C
Cary Xu 已提交
874 875 876 877
  if (pData == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
    tsdbWarn("vgId:%d insert tSma data failed since pData is NULL", REPO_ID(pTsdb));
    return terrno;
C
Cary Xu 已提交
878 879
  }

C
Cary Xu 已提交
880 881 882
  if (taosArrayGetSize(pData->pDataBlock) <= 0) {
    terrno = TSDB_CODE_INVALID_PARA;
    tsdbWarn("vgId:%d insert tSma data failed since pDataBlock is empty", REPO_ID(pTsdb));
883
    return TSDB_CODE_FAILED;
C
Cary Xu 已提交
884
  }
C
Cary Xu 已提交
885

C
Cary Xu 已提交
886
  SSmaStat     *pStat = SMA_ENV_STAT(pTsdb->pTSmaEnv);
C
Cary Xu 已提交
887 888 889 890 891 892 893
  SSmaStatItem *pItem = NULL;

  tsdbRefSmaStat(pTsdb, pStat);

  if (pStat && pStat->smaStatItems) {
    pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid));
  }
C
Cary Xu 已提交
894

C
Cary Xu 已提交
895 896 897 898 899 900
  if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL) || tsdbSmaStatIsDropped(pItem)) {
    terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
    tsdbUnRefSmaStat(pTsdb, pStat);
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
901 902 903 904 905 906 907 908
  STSma *pSma = pItem->pSma;

  STSmaWriteH tSmaH = {0};

  if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData, pSma->interval, pSma->intervalUnit) != 0) {
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
909 910
  char rPath[TSDB_FILENAME_LEN] = {0};
  char aPath[TSDB_FILENAME_LEN] = {0};
911 912 913 914
  snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid);
  tfsAbsoluteName(REPO_TFS(pTsdb), SMA_ENV_DID(pEnv), rPath, aPath);
  if (!taosCheckExistFile(aPath)) {
    if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rPath, SMA_ENV_DID(pEnv)) != TSDB_CODE_SUCCESS) {
C
Cary Xu 已提交
915
      tsdbUnRefSmaStat(pTsdb, pStat);
916 917 918 919
      return TSDB_CODE_FAILED;
    }
  }

C
Cary Xu 已提交
920
  // Step 1: Judge the storage level and days
C
Cary Xu 已提交
921
  int32_t storageLevel = tsdbGetSmaStorageLevel(pSma->interval, pSma->intervalUnit);
C
Cary Xu 已提交
922
  int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel);
C
Cary Xu 已提交
923 924 925


#if 0  
C
Cary Xu 已提交
926
  int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
C
Cary Xu 已提交
927

C
Cary Xu 已提交
928 929
  // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
  //         - Set and open the DFile or the B+Tree file
C
Cary Xu 已提交
930
  // TODO: tsdbStartTSmaCommit();
931
  tsdbSetTSmaDataFile(&tSmaH, pData, indexUid, fid);
C
Cary Xu 已提交
932
  if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) {
933
    tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
C
Cary Xu 已提交
934
             tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
935
    tsdbDestroyTSmaWriteH(&tSmaH);
C
Cary Xu 已提交
936
    tsdbUnRefSmaStat(pTsdb, pStat);
937 938
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
939

940 941 942
  if (tsdbInsertTSmaDataSection(&tSmaH, pData) != 0) {
    tsdbWarn("vgId:%d insert tSma data section failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
    tsdbDestroyTSmaWriteH(&tSmaH);
C
Cary Xu 已提交
943
    tsdbUnRefSmaStat(pTsdb, pStat);
944 945
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
946
  // TODO:tsdbEndTSmaCommit();
C
Cary Xu 已提交
947

948
  // Step 3: reset the SSmaStat
949
  tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey);
C
Cary Xu 已提交
950
#endif
951
  tsdbDestroyTSmaWriteH(&tSmaH);
C
Cary Xu 已提交
952
  tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
953 954 955
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
956 957 958
/**
 * @brief Drop tSma data and local cache
 *        - insert/query reference
C
Cary Xu 已提交
959 960 961
 * @param pTsdb
 * @param msg
 * @return int32_t
C
Cary Xu 已提交
962 963 964 965 966 967 968 969 970 971 972 973
 */
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) {
  SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv);

  // clear local cache
  if (pEnv) {
    tsdbDebug("vgId:%d drop tSma local cache for %" PRIi64, REPO_ID(pTsdb), indexUid);

    SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid));
    if ((pItem != NULL) || ((pItem = *(SSmaStatItem **)pItem) != NULL)) {
      if (tsdbSmaStatIsDropped(pItem)) {
        tsdbDebug("vgId:%d tSma stat is already dropped for %" PRIi64, REPO_ID(pTsdb), indexUid);
C
Cary Xu 已提交
974
        return TSDB_CODE_TDB_INVALID_ACTION;  // TODO: duplicate drop msg would be intercepted by mnode
C
Cary Xu 已提交
975 976 977 978 979 980
      }

      tsdbWLockSma(pEnv);
      if (tsdbSmaStatIsDropped(pItem)) {
        tsdbUnLockSma(pEnv);
        tsdbDebug("vgId:%d tSma stat is already dropped for %" PRIi64, REPO_ID(pTsdb), indexUid);
C
Cary Xu 已提交
981
        return TSDB_CODE_TDB_INVALID_ACTION;  // TODO: duplicate drop msg would be intercepted by mnode
C
Cary Xu 已提交
982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001
      }
      tsdbSmaStatSetDropped(pItem);
      tsdbUnLockSma(pEnv);

      int32_t nSleep = 0;
      while (true) {
        if (T_REF_VAL_GET(SMA_ENV_STAT(pEnv)) <= 0) {
          break;
        }
        taosSsleep(1);
        if (++nSleep > SMA_DROP_EXPIRED_TIME) {
          break;
        };
      }

      tsdbFreeSmaStatItem(pItem);
      tsdbDebug("vgId:%d getTSmaDataImpl failed since no index %" PRIi64 " in local cache", REPO_ID(pTsdb), indexUid);
    }
  }
  // clear sma data files
C
Cary Xu 已提交
1002
  // TODO:
C
Cary Xu 已提交
1003 1004
}

C
Cary Xu 已提交
1005
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) {
1006 1007 1008 1009
  STsdb *pTsdb = pSmaH->pTsdb;

  char tSmaFile[TSDB_FILENAME_LEN] = {0};
  snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.rsma", REPO_ID(pTsdb), fid);
C
Cary Xu 已提交
1010
  pSmaH->dFile.path = strdup(tSmaFile);
C
Cary Xu 已提交
1011 1012 1013 1014

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1015
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
C
Cary Xu 已提交
1016 1017 1018 1019
  STsdbCfg    *pCfg = REPO_CFG(pTsdb);
  SSDataBlock *pData = (SSDataBlock *)msg;
  SSmaEnv     *pEnv = atomic_load_ptr(&pTsdb->pRSmaEnv);
  int64_t      indexUid = SMA_TEST_INDEX_UID;
C
Cary Xu 已提交
1020

C
Cary Xu 已提交
1021 1022
  if (pEnv == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
C
Cary Xu 已提交
1023 1024 1025 1026 1027 1028 1029
    tsdbWarn("vgId:%d insert rSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
    return terrno;
  }

  if (pEnv == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
    tsdbWarn("vgId:%d insert rSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
C
Cary Xu 已提交
1030 1031
    return terrno;
  }
C
Cary Xu 已提交
1032

C
Cary Xu 已提交
1033 1034 1035 1036 1037 1038 1039
  if (pData == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
    tsdbWarn("vgId:%d insert rSma data failed since pData is NULL", REPO_ID(pTsdb));
    return terrno;
  }

  if (taosArrayGetSize(pData->pDataBlock) <= 0) {
C
Cary Xu 已提交
1040
    terrno = TSDB_CODE_INVALID_PARA;
C
Cary Xu 已提交
1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056
    tsdbWarn("vgId:%d insert rSma data failed since pDataBlock is empty", REPO_ID(pTsdb));
    return TSDB_CODE_FAILED;
  }

  SSmaStat     *pStat = SMA_ENV_STAT(pTsdb->pTSmaEnv);
  SSmaStatItem *pItem = NULL;

  tsdbRefSmaStat(pTsdb, pStat);

  if (pStat && pStat->smaStatItems) {
    pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid));
  }

  if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL) || tsdbSmaStatIsDropped(pItem)) {
    terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
    tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
1057
    return TSDB_CODE_FAILED;
C
Cary Xu 已提交
1058
  }
C
Cary Xu 已提交
1059

C
Cary Xu 已提交
1060 1061
  STSma *pSma = pItem->pSma;

C
Cary Xu 已提交
1062
  STSmaWriteH tSmaH = {0};
C
Cary Xu 已提交
1063

C
Cary Xu 已提交
1064
  if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData, pSma->interval, pSma->intervalUnit) != 0) {
C
Cary Xu 已提交
1065 1066
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
1067

C
Cary Xu 已提交
1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078
  char    rPath[TSDB_FILENAME_LEN] = {0};
  char    aPath[TSDB_FILENAME_LEN] = {0};
  snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid);
  tfsAbsoluteName(REPO_TFS(pTsdb), SMA_ENV_DID(pEnv), rPath, aPath);
  if (!taosCheckExistFile(aPath)) {
    if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rPath, SMA_ENV_DID(pEnv)) != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_FAILED;
    }
  }

  // Step 1: Judge the storage level and days
C
Cary Xu 已提交
1079
  int32_t storageLevel = tsdbGetSmaStorageLevel(pSma->interval, pSma->intervalUnit);
C
Cary Xu 已提交
1080
  int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel);
C
Cary Xu 已提交
1081
  #if 0
C
Cary Xu 已提交
1082 1083
  int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));

C
Cary Xu 已提交
1084 1085
  // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
  //         - Set and open the DFile or the B+Tree file
C
Cary Xu 已提交
1086
  // TODO: tsdbStartTSmaCommit();
C
Cary Xu 已提交
1087 1088 1089 1090 1091 1092 1093
  tsdbSetTSmaDataFile(&tSmaH, pData, indexUid, fid);
  if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) {
    tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
             tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
    tsdbDestroyTSmaWriteH(&tSmaH);
    return TSDB_CODE_FAILED;
  }
1094

C
Cary Xu 已提交
1095 1096 1097 1098 1099
  if (tsdbInsertTSmaDataSection(&tSmaH, pData) != 0) {
    tsdbWarn("vgId:%d insert tSma data section failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
    tsdbDestroyTSmaWriteH(&tSmaH);
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
1100 1101
  // TODO:tsdbEndTSmaCommit();

C
Cary Xu 已提交
1102 1103
  // Step 3: reset the SSmaStat
  tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey);
C
Cary Xu 已提交
1104
#endif
C
Cary Xu 已提交
1105

C
Cary Xu 已提交
1106
  tsdbDestroyTSmaWriteH(&tSmaH);
C
Cary Xu 已提交
1107
  tsdbUnRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
1108 1109 1110 1111
  return TSDB_CODE_SUCCESS;
}

/**
C
Cary Xu 已提交
1112
 * @brief
C
Cary Xu 已提交
1113 1114 1115
 *
 * @param pSmaH
 * @param pTsdb
C
Cary Xu 已提交
1116 1117
 * @param interval
 * @param intervalUnit
C
Cary Xu 已提交
1118 1119
 * @return int32_t
 */
C
Cary Xu 已提交
1120
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit) {
C
Cary Xu 已提交
1121
  pSmaH->pTsdb = pTsdb;
C
Cary Xu 已提交
1122 1123 1124
  pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision);
  pSmaH->storageLevel = tsdbGetSmaStorageLevel(interval, intervalUnit);
  pSmaH->days = tsdbGetTSmaDays(pTsdb, pSmaH->interval, pSmaH->storageLevel);
C
Cary Xu 已提交
1125 1126 1127 1128 1129 1130
}

/**
 * @brief Init of tSma FS
 *
 * @param pReadH
1131
 * @param indexUid
C
Cary Xu 已提交
1132
 * @param skey
C
Cary Xu 已提交
1133 1134
 * @return int32_t
 */
1135 1136 1137 1138
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey) {
  STsdb *pTsdb = pSmaH->pTsdb;

  int32_t fid = (int32_t)(TSDB_KEY_FID(skey, pSmaH->days, REPO_CFG(pTsdb)->precision));
C
Cary Xu 已提交
1139
  char    tSmaFile[TSDB_FILENAME_LEN] = {0};
1140
  snprintf(tSmaFile, TSDB_FILENAME_LEN, "%" PRIi64 "%sv%df%d.tsma", indexUid, TD_DIRSEP, REPO_ID(pTsdb), fid);
C
Cary Xu 已提交
1141 1142 1143
  pSmaH->dFile.path = strdup(tSmaFile);
  pSmaH->smaFsIter.iter = 0;
  pSmaH->smaFsIter.fid = fid;
C
Cary Xu 已提交
1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154
}

/**
 * @brief Set and open tSma file if it has key locates in queryWin.
 *
 * @param pReadH
 * @param param
 * @param queryWin
 * @return true
 * @return false
 */
C
Cary Xu 已提交
1155
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
C
Cary Xu 已提交
1156
  SArray *smaFs = pReadH->pTsdb->fs->cstatus->sf;
C
Cary Xu 已提交
1157 1158
  int32_t nSmaFs = taosArrayGetSize(smaFs);

C
Cary Xu 已提交
1159
  tsdbCloseDBF(&pReadH->dFile);
C
Cary Xu 已提交
1160

C
Cary Xu 已提交
1161
#if 0
C
Cary Xu 已提交
1162 1163 1164 1165
  while (pReadH->smaFsIter.iter < nSmaFs) {
    void *pSmaFile = taosArrayGet(smaFs, pReadH->smaFsIter.iter);
    if (pSmaFile) {  // match(indexName, queryWindow)
      // TODO: select the file by index_name ...
C
Cary Xu 已提交
1166
      pReadH->dFile = pSmaFile;
C
Cary Xu 已提交
1167 1168 1169 1170 1171 1172 1173 1174 1175 1176
      ++pReadH->smaFsIter.iter;
      break;
    }
    ++pReadH->smaFsIter.iter;
  }

  if (pReadH->pDFile != NULL) {
    tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]");
    return true;
  }
C
Cary Xu 已提交
1177
#endif
C
Cary Xu 已提交
1178 1179 1180 1181 1182

  return false;
}

/**
C
Cary Xu 已提交
1183
 * @brief
C
Cary Xu 已提交
1184
 *
C
Cary Xu 已提交
1185
 * @param pTsdb Return the data between queryWin and fill the pData.
C
Cary Xu 已提交
1186
 * @param pData
C
Cary Xu 已提交
1187 1188 1189 1190 1191 1192
 * @param indexUid
 * @param interval
 * @param intervalUnit
 * @param tableUid
 * @param colId
 * @param pQuerySKey
C
Cary Xu 已提交
1193 1194 1195
 * @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM.
 * @return int32_t
 */
C
Cary Xu 已提交
1196
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
1197
                                   int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey,
C
Cary Xu 已提交
1198
                                   int32_t nMaxResult) {
1199 1200 1201
  SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv);

  if (!pEnv) {
C
Cary Xu 已提交
1202 1203 1204 1205 1206
    terrno = TSDB_CODE_INVALID_PTR;
    tsdbWarn("vgId:%d getTSmaDataImpl failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
    return TSDB_CODE_FAILED;
  }

1207 1208
  tsdbRefSmaStat(pTsdb, SMA_ENV_STAT(pEnv));
  SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid));
1209
  if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL)) {
C
Cary Xu 已提交
1210 1211
    // Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if
    // it's NULL.
1212
    tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
C
Cary Xu 已提交
1213
    terrno = TSDB_CODE_TDB_INVALID_ACTION;
1214
    tsdbDebug("vgId:%d getTSmaDataImpl failed since no index %" PRIi64, REPO_ID(pTsdb), indexUid);
C
Cary Xu 已提交
1215
    return TSDB_CODE_FAILED;
C
Cary Xu 已提交
1216 1217
  }

C
Cary Xu 已提交
1218 1219
#if 0
  int32_t nQueryWin = taosArrayGetSize(pQuerySKey);
C
Cary Xu 已提交
1220
  for (int32_t n = 0; n < nQueryWin; ++n) {
C
Cary Xu 已提交
1221 1222
    TSKEY skey = taosArrayGet(pQuerySKey, n);
    if (taosHashGet(pItem->expiredWindows, &skey, sizeof(TSKEY)) != NULL) {
C
Cary Xu 已提交
1223 1224 1225
      // TODO: mark this window as expired.
    }
  }
C
Cary Xu 已提交
1226
#endif
C
Cary Xu 已提交
1227

1228
#if 1
C
Cary Xu 已提交
1229 1230 1231 1232
  int8_t smaStat = 0;
  if (!tsdbSmaStatIsOK(pItem, &smaStat)) {  // TODO: multiple check for large scale sma query
    tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
    terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
C
Cary Xu 已提交
1233 1234
    tsdbWarn("vgId:%d getTSmaDataImpl failed from index %" PRIi64 " since %s %" PRIi8, REPO_ID(pTsdb), indexUid,
             tstrerror(terrno), smaStat);
C
Cary Xu 已提交
1235 1236 1237
    return TSDB_CODE_FAILED;
  }

1238
  if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY)) != NULL) {
C
Cary Xu 已提交
1239
    // TODO: mark this window as expired.
1240 1241 1242 1243 1244
    tsdbDebug("vgId:%d skey %" PRIi64 " of window exists in expired window for index %" PRIi64, REPO_ID(pTsdb),
              querySKey, indexUid);
  } else {
    tsdbDebug("vgId:%d skey %" PRIi64 " of window not in expired window for index %" PRIi64, REPO_ID(pTsdb), querySKey,
              indexUid);
C
Cary Xu 已提交
1245
  }
1246 1247
  tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));

C
Cary Xu 已提交
1248
#endif
1249

C
Cary Xu 已提交
1250
  STSmaReadH tReadH = {0};
C
Cary Xu 已提交
1251 1252 1253
  tsdbInitTSmaReadH(&tReadH, pTsdb, interval, intervalUnit);
  tsdbCloseDBF(&tReadH.dFile);

1254
  tsdbInitTSmaFile(&tReadH, indexUid, querySKey);
C
Cary Xu 已提交
1255 1256 1257 1258 1259 1260 1261
  if (tsdbOpenDBF(SMA_ENV_ENV(pTsdb->pTSmaEnv), &tReadH.dFile) != 0) {
    tsdbWarn("vgId:%d open DBF %s failed since %s", REPO_ID(pTsdb), tReadH.dFile.path, tstrerror(terrno));
    return TSDB_CODE_FAILED;
  }

  char  smaKey[SMA_KEY_LEN] = {0};
  void *pSmaKey = &smaKey;
1262
  tsdbEncodeTSmaKey(tableUid, colId, querySKey, (void **)&pSmaKey);
C
Cary Xu 已提交
1263

C
Cary Xu 已提交
1264
  tsdbDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", keyLen %d", REPO_ID(pTsdb),
C
Cary Xu 已提交
1265 1266
            tReadH.dFile.path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
            *(int64_t *)POINTER_SHIFT(smaKey, 10), SMA_KEY_LEN);
C
Cary Xu 已提交
1267

C
Cary Xu 已提交
1268
  void    *result = NULL;
C
Cary Xu 已提交
1269 1270 1271 1272 1273 1274 1275 1276 1277
  uint32_t valueSize = 0;
  if ((result = tsdbGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize)) == NULL) {
    tsdbWarn("vgId:%d get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64
             " since %s",
             REPO_ID(pTsdb), indexUid, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
             *(int64_t *)POINTER_SHIFT(smaKey, 10), tstrerror(terrno));
    tsdbCloseDBF(&tReadH.dFile);
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
1278 1279

#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
C
Cary Xu 已提交
1280
  for (uint32_t v = 0; v < valueSize; v += 8) {
C
Cary Xu 已提交
1281
    tsdbWarn("vgId:%d get sma data v[%d]=%" PRIi64, REPO_ID(pTsdb), v, *(int64_t *)POINTER_SHIFT(result, v));
C
Cary Xu 已提交
1282 1283
  }
#endif
wafwerar's avatar
wafwerar 已提交
1284
  taosMemoryFreeClear(result);  // TODO: fill the result to output
C
Cary Xu 已提交
1285

C
Cary Xu 已提交
1286
#if 0
C
Cary Xu 已提交
1287 1288 1289 1290 1291 1292 1293 1294 1295
  int32_t nResult = 0;
  int64_t lastKey = 0;

  while (true) {
    if (nResult >= nMaxResult) {
      break;
    }

    // set and open the file according to the STSma param
C
Cary Xu 已提交
1296
    if (tsdbSetAndOpenTSmaFile(&tReadH, queryWin)) {
C
Cary Xu 已提交
1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307
      char bTree[100] = "\0";
      while (strncmp(bTree, "has more nodes", 100) == 0) {
        if (nResult >= nMaxResult) {
          break;
        }
        // tsdbGetDataFromBTree(bTree, queryWin, lastKey)
        // fill the pData
        ++nResult;
      }
    }
  }
C
Cary Xu 已提交
1308
#endif
C
Cary Xu 已提交
1309
  // read data from file and fill the result
C
Cary Xu 已提交
1310
  tsdbCloseDBF(&tReadH.dFile);
C
Cary Xu 已提交
1311 1312 1313
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
1314
#if 0
C
Cary Xu 已提交
1315 1316 1317 1318 1319 1320 1321 1322 1323 1324
/**
 * @brief Get the start TS key of the last data block of one interval/sliding.
 *
 * @param pTsdb
 * @param param
 * @param result
 * @return int32_t
 *         1) Return 0 and fill the result if the check procedure is normal;
 *         2) Return -1 if error occurs during the check procedure.
 */
C
Cary Xu 已提交
1325
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, void *smaIndex, void *result) {
C
Cary Xu 已提交
1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341
  const char *procedure = "";
  if (strncmp(procedure, "get the start TS key of the last data block", 100) != 0) {
    return -1;
  }
  // fill the result
  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Remove the tSma data files related to param between pWin.
 *
 * @param pTsdb
 * @param param
 * @param pWin
 * @return int32_t
 */
C
Cary Xu 已提交
1342
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, void *smaIndex, STimeWindow *pWin) {
C
Cary Xu 已提交
1343 1344 1345 1346
  // for ("tSmaFiles of param-interval-sliding between pWin") {
  //   // remove the tSmaFile
  // }
  return TSDB_CODE_SUCCESS;
C
Cary Xu 已提交
1347
}
C
Cary Xu 已提交
1348 1349
#endif

C
Cary Xu 已提交
1350 1351

// TODO: Who is responsible for resource allocate and release?
C
Cary Xu 已提交
1352 1353 1354 1355 1356 1357 1358 1359
int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) {
  int32_t code = TSDB_CODE_SUCCESS;
  if ((code = tsdbInsertTSmaDataImpl(pTsdb, msg)) < 0) {
    tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }
  return code;
}

C
Cary Xu 已提交
1360
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, const char *msg) {
C
Cary Xu 已提交
1361
  int32_t code = TSDB_CODE_SUCCESS;
C
Cary Xu 已提交
1362
  if ((code = tsdbUpdateExpiredWindowImpl(pTsdb, msg)) < 0) {
C
Cary Xu 已提交
1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375
    tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }
  return code;
}

int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
  int32_t code = TSDB_CODE_SUCCESS;
  if ((code = tsdbInsertRSmaDataImpl(pTsdb, msg)) < 0) {
    tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }
  return code;
}

C
Cary Xu 已提交
1376

C
Cary Xu 已提交
1377
int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit,
1378
                        tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult) {
C
Cary Xu 已提交
1379
  int32_t code = TSDB_CODE_SUCCESS;
1380
  if ((code = tsdbGetTSmaDataImpl(pTsdb, pData, indexUid, interval, intervalUnit, tableUid, colId, querySKey,
C
Cary Xu 已提交
1381 1382 1383 1384
                                  nMaxResult)) < 0) {
    tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }
  return code;
C
Cary Xu 已提交
1385 1386
}

C
Cary Xu 已提交
1387

C
Cary Xu 已提交
1388 1389 1390 1391 1392 1393
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  if ((code = tsdbDropTSmaDataImpl(pTsdb, indexUid)) < 0) {
    tsdbWarn("vgId:%d drop tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }
  return code;
C
Cary Xu 已提交
1394
}