tsdbSma.c 34.2 KB
Newer Older
C
Cary Xu 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdbDef.h"

C
Cary Xu 已提交
18 19 20 21 22
// Sub-directory name per SMA type. tsdbGetSmaDir() indexes this table with its ETsdbSmaType argument and
// appends the selected entry to "vnode/vnode%d/tsdb/".
static const char *TSDB_SMA_DNAME[] = {
    "",      // TSDB_SMA_TYPE_BLOCK
    "tsma",  // TSDB_SMA_TYPE_TIME_RANGE
    "rsma",  // TSDB_SMA_TYPE_ROLLUP
};
C
Cary Xu 已提交
23
#undef _TEST_SMA_PRINT_DEBUG_LOG_
C
Cary Xu 已提交
24
#define SMA_STORAGE_TSDB_DAYS   30
C
Cary Xu 已提交
25
#define SMA_STORAGE_TSDB_TIMES  10
C
Cary Xu 已提交
26 27 28
#define SMA_STORAGE_SPLIT_HOURS 24
#define SMA_KEY_LEN             18  // tableUid_colId_TSKEY 8+2+8

29 30 31 32
#define SMA_STATE_HASH_SLOT      4
#define SMA_STATE_ITEM_HASH_SLOT 32

#define SMA_TEST_INDEX_NAME "smaTestIndexName"  // TODO: just for test
33
#define SMA_TEST_INDEX_UID  2000000001          // TODO: just for test
C
Cary Xu 已提交
34
/**
 * @brief Storage level of a tSma index. It selects how many days one SMA file covers (see tsdbGetTSmaDays).
 */
typedef enum {
  // use days of self-defined  e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f200.tsma
  SMA_STORAGE_LEVEL_TSDB = 0,
  // use days of TS data       e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f1906.tsma
  SMA_STORAGE_LEVEL_DFILESET = 1
} ESmaStorageLevel;

typedef struct {
C
Cary Xu 已提交
40 41 42
  STsdb * pTsdb;
  SDBFile dFile;
  int32_t interval;  // interval with the precision of DB
C
Cary Xu 已提交
43 44 45 46
} STSmaWriteH;

/**
 * @brief Iteration state over SMA files; both fields are set by tsdbInitTSmaFile().
 */
typedef struct {
  int32_t iter;  // position in the SMA file array
  int32_t fid;   // file id computed from the query start key
} SmaFsIter;
C
Cary Xu 已提交
49

C
Cary Xu 已提交
50 51
typedef struct {
  STsdb *   pTsdb;
C
Cary Xu 已提交
52
  SDBFile   dFile;
C
Cary Xu 已提交
53 54 55 56 57 58 59
  int32_t   interval;   // interval with the precision of DB
  int32_t   blockSize;  // size of SMA block item
  int8_t    storageLevel;
  int8_t    days;
  SmaFsIter smaFsIter;
} STSmaReadH;

60 61 62 63 64 65 66 67 68 69
typedef struct {
  /**
   * @brief The field 'state' is here to demonstrate if one smaIndex is ready to provide service.
   *    - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
   * without information about its previous state.
   *    - TSDB_SMA_STAT_OK: 1) The sma calculation of history data is finished; 2) Or recevied information from
   * Streaming Module or TSDB local persistence.
   */
  int8_t    state;           // ETsdbSmaStat
  SHashObj *expiredWindows;  // key: skey of time window, value: N/A
C
Cary Xu 已提交
70
  STSma *   pSma;
71 72 73
} SSmaStatItem;

struct SSmaStat {
C
Cary Xu 已提交
74
  SHashObj *smaStatItems;  // key: indexUid, value: SSmaStatItem
75
  T_REF_DECLARE()
76 77
};

C
Cary Xu 已提交
78
// declaration of static functions
C
Cary Xu 已提交
79

80 81
// expired window
static int32_t  tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg);
C
Cary Xu 已提交
82 83 84 85
static int32_t  tsdbInitSmaStat(SSmaStat **pSmaStat);
static int32_t  tsdbDestroySmaState(SSmaStat *pSmaStat);
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path);
static int32_t  tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv);
86 87 88 89 90 91 92 93 94
static int32_t  tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey);
static int32_t  tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat);
static int32_t  tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat);

// read data
// TODO: This is the basic params, and should wrap the params to a queryHandle.
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
                                   int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey,
                                   int32_t nMaxResult);
C
Cary Xu 已提交
95

96 97 98 99 100 101 102
// insert data
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData);
static void    tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH);
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit);
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit);
static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData);
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen);
C
Cary Xu 已提交
103
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision);
C
Cary Xu 已提交
104
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel);
C
Cary Xu 已提交
105
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid);
C
Cary Xu 已提交
106 107
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey);
static bool    tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey);
C
Cary Xu 已提交
108
static void    tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]);
109 110
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg);
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg);
C
Cary Xu 已提交
111

112
// implementation
C
Cary Xu 已提交
113 114
/**
 * @brief Format the SMA directory name of a vnode into dirName.
 *
 * @param vgId    vnode group id used in the directory name
 * @param smaType index into TSDB_SMA_DNAME
 * @param dirName output buffer; at most TSDB_FILENAME_LEN bytes are written
 */
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) {
  const char *subDir = TSDB_SMA_DNAME[smaType];
  snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s", vgId, subDir);
}
C
Cary Xu 已提交
116

C
Cary Xu 已提交
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) {
  SSmaEnv *pEnv = NULL;

  pEnv = (SSmaEnv *)calloc(1, sizeof(SSmaEnv));
  if (pEnv == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int code = pthread_rwlock_init(&(pEnv->lock), NULL);
  if (code) {
    terrno = TAOS_SYSTEM_ERROR(code);
    free(pEnv);
    return NULL;
  }

  ASSERT(path && (strlen(path) > 0));
  pEnv->path = strdup(path);
  if (pEnv->path == NULL) {
    tsdbFreeSmaEnv(pEnv);
    return NULL;
  }

  if (tsdbInitSmaStat(&pEnv->pStat) != TSDB_CODE_SUCCESS) {
    tsdbFreeSmaEnv(pEnv);
    return NULL;
  }

145 146 147 148 149
  if (tsdbOpenBDBEnv(&pEnv->dbEnv, pEnv->path) != TSDB_CODE_SUCCESS) {
    tsdbFreeSmaEnv(pEnv);
    return NULL;
  }

C
Cary Xu 已提交
150 151 152 153 154 155 156 157 158
  return pEnv;
}

/**
 * @brief Create the SMA environment behind *pEnv if it does not exist yet.
 *
 * @param pTsdb forwarded to tsdbNewSmaEnv()
 * @param path  forwarded to tsdbNewSmaEnv()
 * @param pEnv  in/out; an already set *pEnv is left untouched
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED if pEnv is NULL (terrno = TSDB_CODE_INVALID_PTR)
 *         or the environment cannot be created
 */
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) {
  if (pEnv == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
    return TSDB_CODE_FAILED;
  }

  // Nothing to do when the environment already exists.
  if (*pEnv != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  *pEnv = tsdbNewSmaEnv(pTsdb, path);
  return (*pEnv != NULL) ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
}

/**
 * @brief Release resources allocated for its member fields, not including itself.
 *
 * @param pSmaEnv environment to tear down; NULL is a no-op
 */
void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv) {
  if (pSmaEnv == NULL) {
    return;
  }

  // Release the stat members first, then the stat object itself.
  tsdbDestroySmaState(pSmaEnv->pStat);
  tfree(pSmaEnv->pStat);

  tfree(pSmaEnv->path);
  pthread_rwlock_destroy(&(pSmaEnv->lock));
  tsdbCloseBDBEnv(pSmaEnv->dbEnv);
}

/**
 * @brief Destroy the members of a SMA environment (tsdbDestroySmaEnv) and then release the environment itself.
 *
 * @param pSmaEnv environment to release
 * @return void* always NULL
 */
void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv) {
  tsdbDestroySmaEnv(pSmaEnv);
  tfree(pSmaEnv);
  return NULL;
}

194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
/**
 * @brief Increase the reference counter of the SMA stat and log the new value.
 *
 * @param pTsdb used for the vgId in the debug log
 * @param pStat stat to reference; NULL is a no-op
 * @return int32_t always 0
 */
static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) {
  if (pStat != NULL) {
    int nRef = T_REF_INC(pStat);
    tsdbDebug("vgId:%d ref sma stat %p ref %d", REPO_ID(pTsdb), pStat, nRef);
  }
  return 0;
}

/**
 * @brief Decrease the reference counter of the SMA stat and log the new value.
 *
 * @param pTsdb used for the vgId in the debug log
 * @param pStat stat to unreference; NULL is a no-op
 * @return int32_t always 0
 */
static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) {
  if (pStat != NULL) {
    int nRef = T_REF_DEC(pStat);
    tsdbDebug("vgId:%d unref sma stat %p ref %d", REPO_ID(pTsdb), pStat, nRef);
  }
  return 0;
}

209 210
/**
 * @brief Allocate *pSmaStat together with its item hash table if it is not set yet.
 *
 * @param pSmaStat in/out, must not be NULL; left NULL on failure
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED on allocation failure
 */
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) {
  ASSERT(pSmaStat != NULL);

  if (*pSmaStat != NULL) {  // no lock
    return TSDB_CODE_SUCCESS;
  }

  // TODO: lock. lazy mode when update expired window, or hungry mode during tsdbNew.
  SSmaStat *pStat = (SSmaStat *)calloc(1, sizeof(SSmaStat));
  if (pStat == NULL) {
    // TODO: unlock
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }

  pStat->smaStatItems =
      taosHashInit(SMA_STATE_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (pStat->smaStatItems == NULL) {
    free(pStat);
    // TODO: unlock
    return TSDB_CODE_FAILED;
  }

  *pSmaStat = pStat;
  // TODO: unlock
  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Allocate a SSmaStatItem with the given state and an empty expired-window hash table.
 *
 * @param state initial value of the 'state' field
 * @return SSmaStatItem* the new item, or NULL if either allocation fails
 */
static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) {
  SSmaStatItem *pItem = (SSmaStatItem *)calloc(1, sizeof(SSmaStatItem));
  if (pItem == NULL) {
    return NULL;
  }

  pItem->state = state;
  pItem->expiredWindows = taosHashInit(SMA_STATE_ITEM_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP),
                                       true, HASH_ENTRY_LOCK);
  if (pItem->expiredWindows == NULL) {
    tfree(pItem);
  }
  return pItem;
}

C
Cary Xu 已提交
253 254
/**
 * @brief Release resources allocated for its member fields, not including itself.
255 256 257
 *
 * @param pSmaStat
 * @return int32_t
C
Cary Xu 已提交
258
 */
C
Cary Xu 已提交
259
int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
260 261 262 263
  if (pSmaStat) {
    // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
    SSmaStatItem *item = taosHashIterate(pSmaStat->smaStatItems, NULL);
    while (item != NULL) {
C
Cary Xu 已提交
264
      tfree(item->pSma);
265 266 267 268 269 270 271
      taosHashCleanup(item->expiredWindows);
      item = taosHashIterate(pSmaStat->smaStatItems, item);
    }
    taosHashCleanup(pSmaStat->smaStatItems);
  }
}

C
Cary Xu 已提交
272
/**
 * @brief Make sure the SMA environment of the given type exists, creating its directory and env on first use.
 *
 * The env pointer is checked once without the repo lock and again after taking it. Both checks look at
 * the env that belongs to smaType, so a rollup env is still created when the time-range env already exists.
 *
 * @param pTsdb
 * @param smaType TSDB_SMA_TYPE_TIME_RANGE or TSDB_SMA_TYPE_ROLLUP
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED (terrno = TSDB_CODE_INVALID_PARA for other types)
 */
static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
  SSmaEnv **ppEnv = NULL;

  switch (smaType) {
    case TSDB_SMA_TYPE_TIME_RANGE:
      ppEnv = &pTsdb->pTSmaEnv;
      break;
    case TSDB_SMA_TYPE_ROLLUP:
      ppEnv = &pTsdb->pRSmaEnv;
      break;
    default:
      terrno = TSDB_CODE_INVALID_PARA;
      return TSDB_CODE_FAILED;
  }

  // return if already init
  if (*ppEnv != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // init sma env
  tsdbLockRepo(pTsdb);
  if (*ppEnv == NULL) {
    char rname[TSDB_FILENAME_LEN] = {0};
    char aname[TSDB_FILENAME_LEN * 2 + 32] = {0};  // TODO: make TMPNAME_LEN public as TSDB_FILENAME_LEN?

    SDiskID did = {0};
    tfsAllocDisk(pTsdb->pTfs, TFS_PRIMARY_LEVEL, &did);
    if (did.level < 0 || did.id < 0) {
      tsdbUnlockRepo(pTsdb);
      return TSDB_CODE_FAILED;
    }
    tsdbGetSmaDir(REPO_ID(pTsdb), smaType, rname);
    tfsAbsoluteName(pTsdb->pTfs, did, rname, aname);

    if (tfsMkdirRecurAt(pTsdb->pTfs, rname, did) != TSDB_CODE_SUCCESS) {
      tsdbUnlockRepo(pTsdb);
      return TSDB_CODE_FAILED;
    }

    SSmaEnv *pEnv = NULL;
    if (tsdbInitSmaEnv(pTsdb, aname, &pEnv) != TSDB_CODE_SUCCESS) {
      tsdbUnlockRepo(pTsdb);
      return TSDB_CODE_FAILED;
    }

    *ppEnv = pEnv;
  }
  tsdbUnlockRepo(pTsdb);

  return TSDB_CODE_SUCCESS;
}

327 328
/**
 * @brief Update expired window according to msg from stream computing module.
329 330
 *
 * @param pTsdb
C
Cary Xu 已提交
331
 * @param smaType ETsdbSmaType
332 333
 * @param msg
 * @return int32_t
334
 */
335
int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) {
C
Cary Xu 已提交
336 337
  if (!msg || !pTsdb->pMeta) {
    terrno = TSDB_CODE_INVALID_PTR;
338 339 340
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
341 342
  if (tsdbCheckAndInitSmaEnv(pTsdb, smaType) != TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_TDB_INIT_FAILED;
343 344 345
    return TSDB_CODE_FAILED;
  }

346 347
  // TODO: decode the msg from Stream Computing module => start
  int64_t       indexUid = SMA_TEST_INDEX_UID;
348 349
  const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10;
  TSKEY         expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE];
350
  TSKEY         skey1 = 1646987196 * 1e3;
351
  for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
352
    expiredWindows[i] = skey1 + i;
353
  }
C
Cary Xu 已提交
354
  // TODO: decode the msg <= end
355 356 357

  SSmaEnv * pEnv = REPO_SMA_ENV(pTsdb, smaType);
  SSmaStat *pStat = SMA_ENV_STAT(pEnv);
C
Cary Xu 已提交
358
  SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv);
359

360
  tsdbRefSmaStat(pTsdb, pStat);
C
Cary Xu 已提交
361
  SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
C
Cary Xu 已提交
362
  if (pItem == NULL) {
363
    pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED);  // TODO use the real state
C
Cary Xu 已提交
364
    if (pItem == NULL) {
365
      // Response to stream computing: OOM
366 367
      // For query, if the indexUid not found, the TSDB should tell query module to query raw TS data.
      tsdbUnRefSmaStat(pTsdb, pStat);
368 369 370
      return TSDB_CODE_FAILED;
    }

C
Cary Xu 已提交
371 372 373
    // cache smaMeta
    STSma *pSma = metaGetSmaInfoByIndex(pTsdb->pMeta, indexUid);
    if (pSma == NULL) {
374
      terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
C
Cary Xu 已提交
375 376
      taosHashCleanup(pItem->expiredWindows);
      free(pItem);
377
      tsdbUnRefSmaStat(pTsdb, pStat);
378 379
      tsdbWarn("vgId:%d update expired window failed for smaIndex %" PRIi64 " since %s", REPO_ID(pTsdb), indexUid,
               tstrerror(terrno));
C
Cary Xu 已提交
380 381 382 383
      return TSDB_CODE_FAILED;
    }
    pItem->pSma = pSma;

C
Cary Xu 已提交
384
    if (taosHashPut(pItemsHash, &indexUid, sizeof(indexUid), &pItem, sizeof(pItem)) != 0) {
385 386 387
      // If error occurs during put smaStatItem, free the resources of pItem
      taosHashCleanup(pItem->expiredWindows);
      free(pItem);
388
      tsdbUnRefSmaStat(pTsdb, pStat);
389 390 391 392 393 394
      return TSDB_CODE_FAILED;
    }
  }

  int8_t state = TSDB_SMA_STAT_EXPIRED;
  for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
C
Cary Xu 已提交
395
    if (taosHashPut(pItem->expiredWindows, expiredWindows + i, sizeof(TSKEY), &state, sizeof(state)) != 0) {
C
Cary Xu 已提交
396 397 398 399 400 401
      // If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would
      // tell query module to query raw TS data.
      // N.B.
      //  1) It is assumed to be extemely little probability event of fail to taosHashPut.
      //  2) This would solve the inconsistency to some extent, but not completely, unless we record all expired
      // windows failed to put into hash table.
402
      taosHashCleanup(pItem->expiredWindows);
C
Cary Xu 已提交
403
      tfree(pItem->pSma);
C
Cary Xu 已提交
404
      taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid));
405
      tsdbUnRefSmaStat(pTsdb, pStat);
406 407
      return TSDB_CODE_FAILED;
    }
408 409
    tsdbDebug("vgId:%d smaIndex %" PRIi64 " tsKey %" PRIi64 " is put to hash", REPO_ID(pTsdb), indexUid,
              expiredWindows[i]);
410 411
  }

412
  tsdbUnRefSmaStat(pTsdb, pStat);
413 414 415
  return TSDB_CODE_SUCCESS;
}

416
/**
 * @brief Remove the window starting at skey from the expired windows of a sma index.
 *
 * taosHashGet() yields a pointer to the stored `SSmaStatItem *`. It is checked for NULL before being
 * dereferenced, so an unknown indexUid takes the "not exists" path instead of dereferencing NULL.
 *
 * @param pTsdb
 * @param pStat    stat holding the item hash; referenced for the duration of the call
 * @param indexUid uid of the sma index
 * @param skey     start key of the window to reset
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED if the index or the window is not found
 */
static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey) {
  SSmaStatItem *pItem = NULL;

  tsdbRefSmaStat(pTsdb, pStat);

  if (pStat && pStat->smaStatItems) {
    SSmaStatItem **ppItem = (SSmaStatItem **)taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid));
    if (ppItem != NULL) {
      pItem = *ppItem;
    }
  }

  if (pItem != NULL) {
    // pItem resides in hash buffer all the time unless drop sma index
    // TODO: multithread protect
    if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) {
      // error handling
      tsdbUnRefSmaStat(pTsdb, pStat);
      tsdbWarn("vgId:%d remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " failed", REPO_ID(pTsdb),
               skey, indexUid);
      return TSDB_CODE_FAILED;
    }
  } else {
    // error handling
    tsdbUnRefSmaStat(pTsdb, pStat);
    tsdbWarn("vgId:%d expired window %" PRIi64 " not exists for sma index %" PRIi64, REPO_ID(pTsdb), skey, indexUid);
    return TSDB_CODE_FAILED;
  }

  tsdbUnRefSmaStat(pTsdb, pStat);
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
445 446 447 448 449 450 451
/**
 * @brief Judge the tSma storage level
 *
 * @param interval
 * @param intervalUnit
 * @return int32_t
 */
C
Cary Xu 已提交
452
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
C
Cary Xu 已提交
453 454
  // TODO: configurable for SMA_STORAGE_SPLIT_HOURS?
  switch (intervalUnit) {
C
Cary Xu 已提交
455
    case TIME_UNIT_HOUR:
C
Cary Xu 已提交
456 457 458 459
      if (interval < SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
460
    case TIME_UNIT_MINUTE:
C
Cary Xu 已提交
461 462 463 464
      if (interval < 60 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
465
    case TIME_UNIT_SECOND:
C
Cary Xu 已提交
466 467 468 469
      if (interval < 3600 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
470
    case TIME_UNIT_MILLISECOND:
C
Cary Xu 已提交
471 472 473 474
      if (interval < 3600 * 1e3 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
475
    case TIME_UNIT_MICROSECOND:
C
Cary Xu 已提交
476 477 478 479
      if (interval < 3600 * 1e6 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
C
Cary Xu 已提交
480
    case TIME_UNIT_NANOSECOND:
C
Cary Xu 已提交
481 482 483 484 485 486 487 488 489 490 491
      if (interval < 3600 * 1e9 * SMA_STORAGE_SPLIT_HOURS) {
        return SMA_STORAGE_LEVEL_DFILESET;
      }
      break;
    default:
      break;
  }
  return SMA_STORAGE_LEVEL_TSDB;
}

/**
C
Cary Xu 已提交
492
 * @brief Insert TSma data blocks to DB File build by B+Tree
C
Cary Xu 已提交
493
 *
C
Cary Xu 已提交
494
 * @param pSmaH
C
Cary Xu 已提交
495
 * @param smaKey
C
Cary Xu 已提交
496
 * @param keyLen
C
Cary Xu 已提交
497 498 499 500
 * @param pData
 * @param dataLen
 * @return int32_t
 */
501
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen) {
C
Cary Xu 已提交
502
  SDBFile *pDBFile = &pSmaH->dFile;
503

C
Cary Xu 已提交
504
  // TODO: insert sma data blocks into B+Tree
C
Cary Xu 已提交
505
  tsdbDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d",
C
Cary Xu 已提交
506 507
            REPO_ID(pSmaH->pTsdb), pDBFile->path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
            *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen);
508

C
Cary Xu 已提交
509
  if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen) != 0) {
510 511 512
    return TSDB_CODE_FAILED;
  }

C
Cary Xu 已提交
513
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
C
Cary Xu 已提交
514 515 516 517
  uint32_t valueSize = 0;
  void *   data = tsdbGetSmaDataByKey(pDBFile, smaKey, keyLen, &valueSize);
  ASSERT(data != NULL);
  for (uint32_t v = 0; v < valueSize; v += 8) {
C
Cary Xu 已提交
518
    tsdbWarn("vgId:%d insert sma data val[%d] %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v));
C
Cary Xu 已提交
519 520
  }
#endif
C
Cary Xu 已提交
521 522 523
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
524 525 526 527 528 529 530 531
/**
 * @brief Approximate value for week/month/year.
 *
 * @param interval
 * @param intervalUnit
 * @param precision
 * @return int64_t
 */
C
Cary Xu 已提交
532
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision) {
C
Cary Xu 已提交
533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556
  switch (intervalUnit) {
    case TIME_UNIT_YEAR:  // approximate value
      interval *= 365 * 86400 * 1e3;
      break;
    case TIME_UNIT_MONTH:  // approximate value
      interval *= 30 * 86400 * 1e3;
      break;
    case TIME_UNIT_WEEK:  // approximate value
      interval *= 7 * 86400 * 1e3;
      break;
    case TIME_UNIT_DAY:  // the interval for tSma calculation must <= day
      interval *= 86400 * 1e3;
      break;
    case TIME_UNIT_HOUR:
      interval *= 3600 * 1e3;
      break;
    case TIME_UNIT_MINUTE:
      interval *= 60 * 1e3;
      break;
    case TIME_UNIT_SECOND:
      interval *= 1e3;
      break;
    default:
      break;
C
Cary Xu 已提交
557 558
  }

C
Cary Xu 已提交
559 560
  switch (precision) {
    case TSDB_TIME_PRECISION_MILLI:
C
Cary Xu 已提交
561
      if (TIME_UNIT_MICROSECOND == intervalUnit) {  // us
C
Cary Xu 已提交
562
        return interval / 1e3;
C
Cary Xu 已提交
563
      } else if (TIME_UNIT_NANOSECOND == intervalUnit) {  //  nano second
C
Cary Xu 已提交
564 565
        return interval / 1e6;
      } else {
C
Cary Xu 已提交
566 567 568
        return interval;
      }
      break;
C
Cary Xu 已提交
569
    case TSDB_TIME_PRECISION_MICRO:
C
Cary Xu 已提交
570
      if (TIME_UNIT_MICROSECOND == intervalUnit) {  // us
C
Cary Xu 已提交
571
        return interval;
C
Cary Xu 已提交
572
      } else if (TIME_UNIT_NANOSECOND == intervalUnit) {  //  nano second
C
Cary Xu 已提交
573 574
        return interval / 1e3;
      } else {
C
Cary Xu 已提交
575 576 577
        return interval * 1e3;
      }
      break;
C
Cary Xu 已提交
578
    case TSDB_TIME_PRECISION_NANO:
C
Cary Xu 已提交
579
      if (TIME_UNIT_MICROSECOND == intervalUnit) {
C
Cary Xu 已提交
580
        return interval * 1e3;
C
Cary Xu 已提交
581
      } else if (TIME_UNIT_NANOSECOND == intervalUnit) {  // nano second
C
Cary Xu 已提交
582
        return interval;
C
Cary Xu 已提交
583 584
      } else {
        return interval * 1e6;
C
Cary Xu 已提交
585 586
      }
      break;
C
Cary Xu 已提交
587
    default:                                        // ms
C
Cary Xu 已提交
588
      if (TIME_UNIT_MICROSECOND == intervalUnit) {  // us
C
Cary Xu 已提交
589
        return interval / 1e3;
C
Cary Xu 已提交
590
      } else if (TIME_UNIT_NANOSECOND == intervalUnit) {  //  nano second
C
Cary Xu 已提交
591 592 593
        return interval / 1e6;
      } else {
        return interval;
C
Cary Xu 已提交
594 595 596 597 598 599 600 601 602 603 604 605 606 607 608
      }
      break;
  }
  return interval;
}

/**
 * @brief Split the TSma data blocks into expected size and insert into B+Tree.
 *
 * @param pSmaH
 * @param pData
 * @param nOffset The nOffset of blocks since fid changes.
 * @param nBlocks The nBlocks with the same fid since nOffset.
 * @return int32_t
 */
C
Cary Xu 已提交
609
static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData) {
C
Cary Xu 已提交
610 611
  STsdb *pTsdb = pSmaH->pTsdb;

C
Cary Xu 已提交
612 613 614 615 616 617 618 619
  tsdbDebug("tsdbInsertTSmaDataSection: index %" PRIi64 ", skey %" PRIi64, pData->indexUid, pData->skey);

  // TODO: check the data integrity

  int32_t len = 0;
  while (true) {
    if (len >= pData->dataLen) {
      break;
C
Cary Xu 已提交
620
    }
C
Cary Xu 已提交
621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637
    assert(pData->dataLen > 0);
    STSmaTbData *pTbData = (STSmaTbData *)POINTER_SHIFT(pData->data, len);

    int32_t tbLen = 0;
    while (true) {
      if (tbLen >= pTbData->dataLen) {
        break;
      }
      assert(pTbData->dataLen > 0);
      STSmaColData *pColData = (STSmaColData *)POINTER_SHIFT(pTbData->data, tbLen);
      char          smaKey[SMA_KEY_LEN] = {0};
      void *        pSmaKey = &smaKey;
#if 0
      printf("tsdbInsertTSmaDataSection: index %" PRIi64 ", skey %" PRIi64 " table[%" PRIi64 "]col[%" PRIu16 "]\n",
             pData->indexUid, pData->skey, pTbData->tableUid, pColData->colId);
#endif
      tsdbEncodeTSmaKey(pTbData->tableUid, pColData->colId, pData->skey, (void **)&pSmaKey);
638
      if (tsdbInsertTSmaBlocks(pSmaH, smaKey, SMA_KEY_LEN, pColData->data, pColData->blockSize) < 0) {
C
Cary Xu 已提交
639 640
        tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
      }
C
Cary Xu 已提交
641
      tbLen += (sizeof(STSmaColData) + pColData->blockSize);
C
Cary Xu 已提交
642
    }
C
Cary Xu 已提交
643
    len += (sizeof(STSmaTbData) + pTbData->dataLen);
C
Cary Xu 已提交
644
  }
C
Cary Xu 已提交
645

C
Cary Xu 已提交
646 647 648
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
649
/**
 * @brief Bind the write handle to the TSDB and convert the interval of pData to the DB precision.
 *
 * @param pSmaH write handle to fill
 * @param pTsdb
 * @param pData provides interval and intervalUnit
 * @return int32_t always TSDB_CODE_SUCCESS
 */
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) {
  int8_t dbPrecision = REPO_CFG(pTsdb)->precision;

  pSmaH->pTsdb = pTsdb;
  pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, dbPrecision);

  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Close the DB file held by the write handle.
 *
 * @param pSmaH write handle; NULL is a no-op
 */
static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) {
  if (pSmaH == NULL) {
    return;
  }
  tsdbCloseDBF(&pSmaH->dFile);
}

C
Cary Xu 已提交
661
/**
 * @brief Set the path of the tSma DB file ("v<vgId>f<fid>.tsma") in the write handle.
 *
 * @param pSmaH        write handle; its dFile must not have a path or an opened DB yet
 * @param pData        not used
 * @param storageLevel not used
 * @param fid          file id used in the file name
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED (terrno = TSDB_CODE_OUT_OF_MEMORY) if the
 *         path cannot be duplicated
 */
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) {
  STsdb *pTsdb = pSmaH->pTsdb;
  ASSERT(pSmaH->dFile.path == NULL && pSmaH->dFile.pDB == NULL);

  char tSmaFile[TSDB_FILENAME_LEN] = {0};
  snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pTsdb), fid);

  pSmaH->dFile.path = strdup(tSmaFile);
  if (pSmaH->dFile.path == NULL) {
    // Do not report success with a NULL path.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }
  return TSDB_CODE_SUCCESS;
}
C
Cary Xu 已提交
669

C
Cary Xu 已提交
670 671 672 673 674 675 676 677 678 679
/**
 * @brief Get the number of days covered by one SMA file.
 *
 * For SMA_STORAGE_LEVEL_TSDB the result is SMA_STORAGE_TSDB_TIMES times the interval in days, with a
 * lower bound of SMA_STORAGE_TSDB_DAYS. For any other level it is daysPerFile of the TSDB config.
 *
 * @param pTsdb
 * @param interval Interval calculated by DB's precision
 * @param storageLevel
 * @return int32_t days per file
 */
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel) {
  STsdbCfg *pCfg = REPO_CFG(pTsdb);

  if (storageLevel != SMA_STORAGE_LEVEL_TSDB) {
    return pCfg->daysPerFile;
  }

  int32_t scaledDays = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerDay[pCfg->precision]);
  return (scaledDays > SMA_STORAGE_TSDB_DAYS) ? scaledDays : SMA_STORAGE_TSDB_DAYS;
}
C
Cary Xu 已提交
689 690 691 692 693 694 695 696 697 698

/**
 * @brief Insert/Update Time-range-wise SMA data.
 *  - If interval < SMA_STORAGE_SPLIT_HOURS(e.g. 24), save the SMA data as a part of DFileSet to e.g.
 * v3f1900.tsma.${sma_index_name}. The days is the same with that for TS data files.
 *  - If interval >= SMA_STORAGE_SPLIT_HOURS, save the SMA data to e.g. vnode3/tsma/v3f632.tsma.${sma_index_name}. The
 * days is 30 times of the interval, and the minimum days is SMA_STORAGE_TSDB_DAYS(30d).
 *  - The destination file of one data block for some interval is determined by its start TS key.
 *
 * @param pTsdb
C
Cary Xu 已提交
699
 * @param msg
C
Cary Xu 已提交
700 701
 * @return int32_t
 */
C
Cary Xu 已提交
702
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
703
  STsdbCfg *        pCfg = REPO_CFG(pTsdb);
C
Cary Xu 已提交
704
  STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
C
Cary Xu 已提交
705

706 707 708 709 710
  if (!pTsdb->pTSmaEnv) {
    terrno = TSDB_CODE_INVALID_PTR;
    tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
    return terrno;
  }
C
Cary Xu 已提交
711

C
Cary Xu 已提交
712
  if (pData->dataLen <= 0) {
C
Cary Xu 已提交
713 714
    TASSERT(0);
    terrno = TSDB_CODE_INVALID_PARA;
715
    return TSDB_CODE_FAILED;
C
Cary Xu 已提交
716 717
  }

718 719 720 721
  STSmaWriteH tSmaH = {0};

  if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData) != 0) {
    return TSDB_CODE_FAILED;
C
Cary Xu 已提交
722
  }
C
Cary Xu 已提交
723

C
Cary Xu 已提交
724 725
  // Step 1: Judge the storage level and days
  int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit);
C
Cary Xu 已提交
726
  int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel);
C
Cary Xu 已提交
727
  int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
C
Cary Xu 已提交
728

C
Cary Xu 已提交
729 730
  // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
  //         - Set and open the DFile or the B+Tree file
C
Cary Xu 已提交
731 732
  // TODO: tsdbStartTSmaCommit();
  tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid);
C
Cary Xu 已提交
733
  if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) {
734
    tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
C
Cary Xu 已提交
735
             tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
736 737 738
    tsdbDestroyTSmaWriteH(&tSmaH);
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
739

740 741 742 743 744
  if (tsdbInsertTSmaDataSection(&tSmaH, pData) != 0) {
    tsdbWarn("vgId:%d insert tSma data section failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
    tsdbDestroyTSmaWriteH(&tSmaH);
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
745
  // TODO:tsdbEndTSmaCommit();
C
Cary Xu 已提交
746

747
  // Step 3: reset the SSmaStat
748
  tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey);
C
Cary Xu 已提交
749

750
  tsdbDestroyTSmaWriteH(&tSmaH);
C
Cary Xu 已提交
751 752 753
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
754
/**
 * @brief Set the path of the rSma DB file ("v<vgId>f<fid>.rsma") in the write handle.
 *
 * @param pSmaH write handle whose dFile.path is set
 * @param pData not used
 * @param fid   file id used in the file name
 * @return int32_t TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED (terrno = TSDB_CODE_OUT_OF_MEMORY) if the
 *         path cannot be duplicated
 */
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) {
  STsdb *pTsdb = pSmaH->pTsdb;

  char tSmaFile[TSDB_FILENAME_LEN] = {0};
  snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.rsma", REPO_ID(pTsdb), fid);

  pSmaH->dFile.path = strdup(tSmaFile);
  if (pSmaH->dFile.path == NULL) {
    // Do not report success with a NULL path.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }

  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
764
/**
 * @brief Insert/Update rollup SMA data.
 *
 * Follows the same steps as tsdbInsertTSmaDataImpl(): validate the rollup env and the data, pick the file
 * by the start key, open the DB file, insert the data, reset the expired window and release the handle.
 *
 * @param pTsdb
 * @param msg STSmaDataWrapper carrying the SMA data
 * @return int32_t TSDB_CODE_SUCCESS on success; terrno if pRSmaEnv is NULL or the data is empty;
 *         TSDB_CODE_FAILED otherwise
 */
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
  STsdbCfg         *pCfg = REPO_CFG(pTsdb);
  STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
  STSmaWriteH       tSmaH = {0};

  // The env is dereferenced below (dbEnv, stat), so it must exist.
  if (pTsdb->pRSmaEnv == NULL) {
    terrno = TSDB_CODE_INVALID_PTR;
    tsdbWarn("vgId:%d insert rSma data failed since pRSmaEnv is NULL", REPO_ID(pTsdb));
    return terrno;
  }

  if (pData->dataLen <= 0) {
    TASSERT(0);
    terrno = TSDB_CODE_INVALID_PARA;
    return terrno;
  }

  if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData) != 0) {
    return TSDB_CODE_FAILED;
  }

  // Step 1: Judge the storage level
  int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit);
  int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile;

  // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
  //         - Set and open the DFile or the B+Tree file
  int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));

  // Save all the TSma data to one file
  // TODO: tsdbStartTSmaCommit();
  // Use the rSma file setter so the data lands in a ".rsma" file of the rollup env.
  tsdbSetRSmaDataFile(&tSmaH, pData, fid);
  if (tsdbOpenDBF(pTsdb->pRSmaEnv->dbEnv, &tSmaH.dFile) != 0) {
    tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
             tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
    tsdbDestroyTSmaWriteH(&tSmaH);
    return TSDB_CODE_FAILED;
  }

  if (tsdbInsertTSmaDataSection(&tSmaH, pData) != 0) {
    tsdbWarn("vgId:%d insert rSma data section failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
    tsdbDestroyTSmaWriteH(&tSmaH);
    return TSDB_CODE_FAILED;
  }
  // TODO:tsdbEndTSmaCommit();

  // reset the SSmaStat
  tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pRSmaEnv), pData->indexUid, pData->skey);

  // Release the DB file and path held by the write handle.
  tsdbDestroyTSmaWriteH(&tSmaH);
  return TSDB_CODE_SUCCESS;
}

/**
C
Cary Xu 已提交
800
 * @brief
C
Cary Xu 已提交
801 802 803
 *
 * @param pSmaH
 * @param pTsdb
C
Cary Xu 已提交
804 805
 * @param interval
 * @param intervalUnit
C
Cary Xu 已提交
806 807
 * @return int32_t
 */
C
Cary Xu 已提交
808
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit) {
C
Cary Xu 已提交
809
  pSmaH->pTsdb = pTsdb;
C
Cary Xu 已提交
810 811 812
  pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision);
  pSmaH->storageLevel = tsdbGetSmaStorageLevel(interval, intervalUnit);
  pSmaH->days = tsdbGetTSmaDays(pTsdb, pSmaH->interval, pSmaH->storageLevel);
C
Cary Xu 已提交
813 814 815 816 817 818
}

/**
 * @brief Init of tSma FS
 *
 * @param pReadH
C
Cary Xu 已提交
819
 * @param skey
C
Cary Xu 已提交
820 821
 * @return int32_t
 */
C
Cary Xu 已提交
822 823 824 825 826 827 828
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey) {
  int32_t fid = (int32_t)(TSDB_KEY_FID(skey, pSmaH->days, REPO_CFG(pSmaH->pTsdb)->precision));
  char    tSmaFile[TSDB_FILENAME_LEN] = {0};
  snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pSmaH->pTsdb), fid);
  pSmaH->dFile.path = strdup(tSmaFile);
  pSmaH->smaFsIter.iter = 0;
  pSmaH->smaFsIter.fid = fid;
C
Cary Xu 已提交
829 830 831 832 833 834 835 836 837 838 839
}

/**
 * @brief Set and open tSma file if it has key locates in queryWin.
 *
 * @param pReadH
 * @param param
 * @param queryWin
 * @return true
 * @return false
 */
C
Cary Xu 已提交
840
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
C
Cary Xu 已提交
841
  SArray *smaFs = pReadH->pTsdb->fs->cstatus->sf;
C
Cary Xu 已提交
842 843
  int32_t nSmaFs = taosArrayGetSize(smaFs);

C
Cary Xu 已提交
844
  tsdbCloseDBF(&pReadH->dFile);
C
Cary Xu 已提交
845

C
Cary Xu 已提交
846
#if 0
C
Cary Xu 已提交
847 848 849 850
  while (pReadH->smaFsIter.iter < nSmaFs) {
    void *pSmaFile = taosArrayGet(smaFs, pReadH->smaFsIter.iter);
    if (pSmaFile) {  // match(indexName, queryWindow)
      // TODO: select the file by index_name ...
C
Cary Xu 已提交
851
      pReadH->dFile = pSmaFile;
C
Cary Xu 已提交
852 853 854 855 856 857 858 859 860 861
      ++pReadH->smaFsIter.iter;
      break;
    }
    ++pReadH->smaFsIter.iter;
  }

  if (pReadH->pDFile != NULL) {
    tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]");
    return true;
  }
C
Cary Xu 已提交
862
#endif
C
Cary Xu 已提交
863 864 865 866 867

  return false;
}

/**
C
Cary Xu 已提交
868
 * @brief
C
Cary Xu 已提交
869
 *
C
Cary Xu 已提交
870
 * @param pTsdb Return the data between queryWin and fill the pData.
C
Cary Xu 已提交
871
 * @param pData
C
Cary Xu 已提交
872 873 874 875 876 877
 * @param indexUid
 * @param interval
 * @param intervalUnit
 * @param tableUid
 * @param colId
 * @param pQuerySKey
C
Cary Xu 已提交
878 879 880
 * @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM.
 * @return int32_t
 */
C
Cary Xu 已提交
881
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
882
                                   int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey,
C
Cary Xu 已提交
883
                                   int32_t nMaxResult) {
C
Cary Xu 已提交
884 885 886 887 888 889
  if (!pTsdb->pTSmaEnv) {
    terrno = TSDB_CODE_INVALID_PTR;
    tsdbWarn("vgId:%d getTSmaDataImpl failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
    return TSDB_CODE_FAILED;
  }

890 891
  tsdbRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
  SSmaStatItem *pItem = *(SSmaStatItem **)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid));
C
Cary Xu 已提交
892
  if (pItem == NULL) {
C
Cary Xu 已提交
893 894
    // Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if
    // it's NULL.
895
    tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
C
Cary Xu 已提交
896
    terrno = TSDB_CODE_TDB_INVALID_ACTION;
897
    tsdbDebug("vgId:%d getTSmaDataImpl failed since no index %" PRIi64, REPO_ID(pTsdb), indexUid);
C
Cary Xu 已提交
898
    return TSDB_CODE_FAILED;
C
Cary Xu 已提交
899 900
  }

C
Cary Xu 已提交
901 902
#if 0
  int32_t nQueryWin = taosArrayGetSize(pQuerySKey);
C
Cary Xu 已提交
903
  for (int32_t n = 0; n < nQueryWin; ++n) {
C
Cary Xu 已提交
904 905
    TSKEY skey = taosArrayGet(pQuerySKey, n);
    if (taosHashGet(pItem->expiredWindows, &skey, sizeof(TSKEY)) != NULL) {
C
Cary Xu 已提交
906 907 908
      // TODO: mark this window as expired.
    }
  }
C
Cary Xu 已提交
909
#endif
C
Cary Xu 已提交
910

911 912
#if 1
  if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY)) != NULL) {
C
Cary Xu 已提交
913
    // TODO: mark this window as expired.
914 915 916 917 918
    tsdbDebug("vgId:%d skey %" PRIi64 " of window exists in expired window for index %" PRIi64, REPO_ID(pTsdb),
              querySKey, indexUid);
  } else {
    tsdbDebug("vgId:%d skey %" PRIi64 " of window not in expired window for index %" PRIi64, REPO_ID(pTsdb), querySKey,
              indexUid);
C
Cary Xu 已提交
919
  }
920 921
  tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));

C
Cary Xu 已提交
922
#endif
C
Cary Xu 已提交
923
  STSmaReadH tReadH = {0};
C
Cary Xu 已提交
924 925 926
  tsdbInitTSmaReadH(&tReadH, pTsdb, interval, intervalUnit);
  tsdbCloseDBF(&tReadH.dFile);

927
  tsdbInitTSmaFile(&tReadH, querySKey);
C
Cary Xu 已提交
928 929 930 931 932 933 934
  if (tsdbOpenDBF(SMA_ENV_ENV(pTsdb->pTSmaEnv), &tReadH.dFile) != 0) {
    tsdbWarn("vgId:%d open DBF %s failed since %s", REPO_ID(pTsdb), tReadH.dFile.path, tstrerror(terrno));
    return TSDB_CODE_FAILED;
  }

  char  smaKey[SMA_KEY_LEN] = {0};
  void *pSmaKey = &smaKey;
935
  tsdbEncodeTSmaKey(tableUid, colId, querySKey, (void **)&pSmaKey);
C
Cary Xu 已提交
936

C
Cary Xu 已提交
937
  tsdbDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", keyLen %d", REPO_ID(pTsdb),
C
Cary Xu 已提交
938 939
            tReadH.dFile.path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
            *(int64_t *)POINTER_SHIFT(smaKey, 10), SMA_KEY_LEN);
C
Cary Xu 已提交
940

C
Cary Xu 已提交
941 942 943 944 945 946 947 948 949 950
  void *   result = NULL;
  uint32_t valueSize = 0;
  if ((result = tsdbGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize)) == NULL) {
    tsdbWarn("vgId:%d get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64
             " since %s",
             REPO_ID(pTsdb), indexUid, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
             *(int64_t *)POINTER_SHIFT(smaKey, 10), tstrerror(terrno));
    tsdbCloseDBF(&tReadH.dFile);
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
951 952

#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
C
Cary Xu 已提交
953
  for (uint32_t v = 0; v < valueSize; v += 8) {
C
Cary Xu 已提交
954
    tsdbWarn("vgId:%d get sma data v[%d]=%" PRIi64, REPO_ID(pTsdb), v, *(int64_t *)POINTER_SHIFT(result, v));
C
Cary Xu 已提交
955 956
  }
#endif
C
Cary Xu 已提交
957 958
  tfree(result);  // TODO: fill the result to output

C
Cary Xu 已提交
959
#if 0
C
Cary Xu 已提交
960 961 962 963 964 965 966 967 968
  int32_t nResult = 0;
  int64_t lastKey = 0;

  while (true) {
    if (nResult >= nMaxResult) {
      break;
    }

    // set and open the file according to the STSma param
C
Cary Xu 已提交
969
    if (tsdbSetAndOpenTSmaFile(&tReadH, queryWin)) {
C
Cary Xu 已提交
970 971 972 973 974 975 976 977 978 979 980
      char bTree[100] = "\0";
      while (strncmp(bTree, "has more nodes", 100) == 0) {
        if (nResult >= nMaxResult) {
          break;
        }
        // tsdbGetDataFromBTree(bTree, queryWin, lastKey)
        // fill the pData
        ++nResult;
      }
    }
  }
C
Cary Xu 已提交
981
#endif
C
Cary Xu 已提交
982
  // read data from file and fill the result
C
Cary Xu 已提交
983
  tsdbCloseDBF(&tReadH.dFile);
C
Cary Xu 已提交
984 985 986
  return TSDB_CODE_SUCCESS;
}

C
Cary Xu 已提交
987
#if 0
C
Cary Xu 已提交
988 989 990 991 992 993 994 995 996 997
/**
 * @brief Get the start TS key of the last data block of one interval/sliding.
 *
 * @param pTsdb
 * @param param
 * @param result
 * @return int32_t
 *         1) Return 0 and fill the result if the check procedure is normal;
 *         2) Return -1 if error occurs during the check procedure.
 */
C
Cary Xu 已提交
998
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, void *smaIndex, void *result) {
C
Cary Xu 已提交
999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014
  const char *procedure = "";
  if (strncmp(procedure, "get the start TS key of the last data block", 100) != 0) {
    return -1;
  }
  // fill the result
  return TSDB_CODE_SUCCESS;
}

/**
 * @brief Remove the tSma data files related to param between pWin.
 *
 * @param pTsdb
 * @param param
 * @param pWin
 * @return int32_t
 */
C
Cary Xu 已提交
1015
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, void *smaIndex, STimeWindow *pWin) {
C
Cary Xu 已提交
1016 1017 1018 1019
  // for ("tSmaFiles of param-interval-sliding between pWin") {
  //   // remove the tSmaFile
  // }
  return TSDB_CODE_SUCCESS;
C
Cary Xu 已提交
1020
}
C
Cary Xu 已提交
1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063
#endif

/**
 * @brief Insert/Update tSma (Time-range-wise SMA) data from the stream computing engine.
 *
 * Delegates to tsdbInsertTSmaDataImpl and logs a warning when it reports a negative code.
 *
 * @param pTsdb TSDB instance that receives the data.
 * @param msg   Message buffer handed to the implementation.
 * @return int32_t Code returned by the implementation.
 * TODO: Who is responsible for resource allocate and release?
 */
int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) {
  int32_t ret = tsdbInsertTSmaDataImpl(pTsdb, msg);

  if (ret < 0) {
    tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }

  return ret;
}

/**
 * @brief Update the expired SMA window state from msg.
 *
 * Delegates to tsdbUpdateExpiredWindow and logs a warning when it reports a negative code.
 *
 * @param pTsdb   TSDB instance.
 * @param smaType SMA type forwarded to the implementation.
 * @param msg     Message buffer handed to the implementation.
 * @return int32_t Code returned by the implementation.
 */
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
  int32_t ret = tsdbUpdateExpiredWindow(pTsdb, smaType, msg);

  if (ret < 0) {
    tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }

  return ret;
}

/**
 * @brief Insert Time-range-wise Rollup Sma (RSma) data.
 *
 * Delegates to tsdbInsertRSmaDataImpl and logs a warning when it reports a negative code.
 *
 * @param pTsdb TSDB instance that receives the data.
 * @param msg   Message buffer handed to the implementation.
 * @return int32_t Code returned by the implementation.
 */
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
  int32_t ret = tsdbInsertRSmaDataImpl(pTsdb, msg);

  if (ret < 0) {
    tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }

  return ret;
}

int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit,
1064
                        tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult) {
C
Cary Xu 已提交
1065
  int32_t code = TSDB_CODE_SUCCESS;
1066
  if ((code = tsdbGetTSmaDataImpl(pTsdb, pData, indexUid, interval, intervalUnit, tableUid, colId, querySKey,
C
Cary Xu 已提交
1067 1068 1069 1070 1071
                                  nMaxResult)) < 0) {
    tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
  }
  return code;
}