提交 abbd818f 编写于 作者: C Cary Xu

import SmaEnv/SmaStat to facilitate save data

上级 d2c59701
...@@ -353,6 +353,7 @@ int32_t* taosGetErrno(); ...@@ -353,6 +353,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614) #define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614)
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) #define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615)
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616) #define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616)
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0617)
// query // query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) #define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
......
...@@ -25,8 +25,16 @@ extern "C" { ...@@ -25,8 +25,16 @@ extern "C" {
typedef struct SDBFile SDBFile; typedef struct SDBFile SDBFile;
typedef DB_ENV* TDBEnv; typedef DB_ENV* TDBEnv;
struct SDBFile {
DB* pDB;
char* path;
};
int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile* pDBF); int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile* pDBF);
void tsdbCloseDBF(SDBFile* pDBF); void tsdbCloseDBF(SDBFile* pDBF);
int32_t tsdbOpenBDBEnv(DB_ENV** ppEnv, const char* path);
void tsdbCloseBDBEnv(DB_ENV* pEnv);
int32_t tsdbSaveSmaToDB(SDBFile* pDBF, void* key, uint32_t keySize, void* data, uint32_t dataSize);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "ttime.h" #include "ttime.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsdbDBDef.h"
#include "tsdbCommit.h" #include "tsdbCommit.h"
#include "tsdbFS.h" #include "tsdbFS.h"
#include "tsdbFile.h" #include "tsdbFile.h"
...@@ -37,6 +38,7 @@ ...@@ -37,6 +38,7 @@
#include "tsdbReadImpl.h" #include "tsdbReadImpl.h"
#include "tsdbSma.h" #include "tsdbSma.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
...@@ -56,7 +58,6 @@ struct STsdb { ...@@ -56,7 +58,6 @@ struct STsdb {
STfs * pTfs; STfs * pTfs;
SSmaEnv * pTSmaEnv; SSmaEnv * pTSmaEnv;
SSmaEnv * pRSmaEnv; SSmaEnv * pRSmaEnv;
// SSmaStat * pSmaStat;
}; };
#define REPO_ID(r) ((r)->vgId) #define REPO_ID(r) ((r)->vgId)
......
...@@ -19,9 +19,9 @@ ...@@ -19,9 +19,9 @@
typedef struct SSmaStat SSmaStat; typedef struct SSmaStat SSmaStat;
typedef struct SSmaEnv SSmaEnv; typedef struct SSmaEnv SSmaEnv;
struct SSmaEnv { struct SSmaEnv {
pthread_rwlock_t lock; pthread_rwlock_t lock;
TDBEnv dbEnv;
char * path; char * path;
SSmaStat * pStat; SSmaStat * pStat;
}; };
......
...@@ -231,30 +231,31 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) { ...@@ -231,30 +231,31 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
void *pBuf = NULL, *qBuf = NULL; void *pBuf = NULL, *qBuf = NULL;
DBT key1 = {0}, value1 = {0}; DBT key1 = {0}, value1 = {0};
{ // save sma info
// save sma info int32_t len = tEncodeTSma(NULL, pSmaCfg);
int32_t len = tEncodeTSma(NULL, pSmaCfg); pBuf = calloc(len, 1);
pBuf = calloc(len, 1); if (pBuf == NULL) {
if (pBuf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY; return -1;
return -1; }
}
key1.data = (void *)&pSmaCfg->indexUid; key1.data = (void *)&pSmaCfg->indexUid;
key1.size = sizeof(pSmaCfg->indexUid); key1.size = sizeof(pSmaCfg->indexUid);
qBuf = pBuf; qBuf = pBuf;
tEncodeTSma(&qBuf, pSmaCfg); tEncodeTSma(&qBuf, pSmaCfg);
value1.data = pBuf; value1.data = pBuf;
value1.size = POINTER_DISTANCE(qBuf, pBuf); value1.size = POINTER_DISTANCE(qBuf, pBuf);
value1.app_data = pSmaCfg; value1.app_data = pSmaCfg;
}
metaDBWLock(pMeta->pDB); metaDBWLock(pMeta->pDB);
pMeta->pDB->pSmaDB->put(pMeta->pDB->pSmaDB, NULL, &key1, &value1, 0); pMeta->pDB->pSmaDB->put(pMeta->pDB->pSmaDB, NULL, &key1, &value1, 0);
metaDBULock(pMeta->pDB); metaDBULock(pMeta->pDB);
// release
tfree(pBuf);
return 0; return 0;
} }
......
...@@ -16,30 +16,29 @@ ...@@ -16,30 +16,29 @@
#define ALLOW_FORBID_FUNC #define ALLOW_FORBID_FUNC
#include "db.h" #include "db.h"
#include "taoserror.h"
#include "tcoding.h" #include "tcoding.h"
#include "thash.h" #include "thash.h"
#include "tsdbDBDef.h" #include "tsdbDBDef.h"
#include "tsdbLog.h"
#define IMPL_WITH_LOCK 1 #define IMPL_WITH_LOCK 1
struct SDBFile { static int tsdbOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup);
DB * pDB; static void tsdbCloseBDBDb(DB *pDB);
char *path;
};
static int tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path); #define BDB_PERR(info, code) fprintf(stderr, "%s:%d " info " reason: %s\n", __FILE__, __LINE__, db_strerror(code))
static void tsdbCloseBDBEnv(DB_ENV *pEnv);
static int tsdbOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup);
static void tsdbCloseBDBDb(DB *pDB);
#define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code))
int tsdbOpenDBF(TDBEnv pEnv, SDBFile *pDBF) { int tsdbOpenDBF(TDBEnv pEnv, SDBFile *pDBF) {
// TDBEnv is shared by a group of SDBFile // TDBEnv is shared by a group of SDBFile
ASSERT(pEnv != NULL); if (!pEnv) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
// Open DBF // Open DBF
if (tsdbOpenBDBDb(&(pDBF->pDB), pEnv, pDBF->path, false) < 0) { if (tsdbOpenBDBDb(&(pDBF->pDB), pEnv, pDBF->path, false) < 0) {
terrno = TSDB_CODE_TDB_INIT_FAILED;
tsdbCloseBDBDb(pDBF->pDB); tsdbCloseBDBDb(pDBF->pDB);
return -1; return -1;
} }
...@@ -61,7 +60,7 @@ void tsdbCloseDBF(SDBFile *pDBF) { ...@@ -61,7 +60,7 @@ void tsdbCloseDBF(SDBFile *pDBF) {
} }
} }
static int tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) { int32_t tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) {
int ret = 0; int ret = 0;
DB_ENV *pEnv = NULL; DB_ENV *pEnv = NULL;
...@@ -75,7 +74,8 @@ static int tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) { ...@@ -75,7 +74,8 @@ static int tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) {
ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL, 0); ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL, 0);
if (ret != 0) { if (ret != 0) {
BDB_PERR("Failed to open tsdb env", ret); // BDB_PERR("Failed to open tsdb env", ret);
tsdbWarn("Failed to open tsdb env for path %s since %d", path ? path : "NULL", ret);
return -1; return -1;
} }
...@@ -84,7 +84,7 @@ static int tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) { ...@@ -84,7 +84,7 @@ static int tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) {
return 0; return 0;
} }
static void tsdbCloseBDBEnv(DB_ENV *pEnv) { void tsdbCloseBDBEnv(DB_ENV *pEnv) {
if (pEnv) { if (pEnv) {
pEnv->close(pEnv, 0); pEnv->close(pEnv, 0);
} }
...@@ -123,4 +123,26 @@ static void tsdbCloseBDBDb(DB *pDB) { ...@@ -123,4 +123,26 @@ static void tsdbCloseBDBDb(DB *pDB) {
if (pDB) { if (pDB) {
pDB->close(pDB, 0); pDB->close(pDB, 0);
} }
}
int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *key, uint32_t keySize, void *data, uint32_t dataSize) {
int ret;
DBT key1 = {0}, value1 = {0};
key1.data = key;
key1.size = keySize;
value1.data = data;
value1.size = dataSize;
// TODO: lock
ret = pDBF->pDB->put(pDBF->pDB, NULL, &key1, &value1, 0);
if (ret) {
BDB_PERR("Failed to put data to DBF", ret);
// TODO: unlock
return -1;
}
// TODO: unlock
return 0;
} }
\ No newline at end of file
...@@ -24,16 +24,16 @@ ...@@ -24,16 +24,16 @@
#define SMA_STATE_ITEM_HASH_SLOT 32 #define SMA_STATE_ITEM_HASH_SLOT 32
#define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test #define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test
#define SMA_TEST_INDEX_UID 123456 // TODO: just for test #define SMA_TEST_INDEX_UID 2000000001 // TODO: just for test
typedef enum { typedef enum {
SMA_STORAGE_LEVEL_TSDB = 0, // use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2t200.dat SMA_STORAGE_LEVEL_TSDB = 0, // use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2t200.dat
SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/rsma/sma_index_uid/v2r200.dat SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/rsma/sma_index_uid/v2r200.dat
} ESmaStorageLevel; } ESmaStorageLevel;
typedef struct { typedef struct {
STsdb * pTsdb; STsdb * pTsdb;
char * pDFile; // TODO: use the real DFile type, not char* SDBFile *pDFile;
int32_t interval; // interval with the precision of DB int32_t interval; // interval with the precision of DB
// TODO // TODO
} STSmaWriteH; } STSmaWriteH;
...@@ -74,10 +74,11 @@ static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); ...@@ -74,10 +74,11 @@ static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path); static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path);
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv); static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv);
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData);
static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH);
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData);
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit); static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit);
static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData); static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData);
static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen); static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen);
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision); static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision);
static int32_t tsdbGetTSmaDays(STSmaWriteH *pSmaH, int32_t storageLevel); static int32_t tsdbGetTSmaDays(STSmaWriteH *pSmaH, int32_t storageLevel);
...@@ -114,6 +115,11 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) { ...@@ -114,6 +115,11 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) {
return NULL; return NULL;
} }
if (tsdbOpenBDBEnv(&pEnv->dbEnv, pEnv->path) != TSDB_CODE_SUCCESS) {
tsdbFreeSmaEnv(pEnv);
return NULL;
}
return pEnv; return pEnv;
} }
...@@ -158,6 +164,7 @@ void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv) { ...@@ -158,6 +164,7 @@ void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv) {
tfree(pSmaEnv->pStat); tfree(pSmaEnv->pStat);
tfree(pSmaEnv->path); tfree(pSmaEnv->path);
pthread_rwlock_destroy(&(pSmaEnv->lock)); pthread_rwlock_destroy(&(pSmaEnv->lock));
tsdbCloseBDBEnv(pSmaEnv->dbEnv);
} }
} }
...@@ -213,9 +220,9 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) { ...@@ -213,9 +220,9 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) {
/** /**
* @brief Release resources allocated for its member fields, not including itself. * @brief Release resources allocated for its member fields, not including itself.
* *
* @param pSmaStat * @param pSmaStat
* @return int32_t * @return int32_t
*/ */
int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) { int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
if (pSmaStat) { if (pSmaStat) {
...@@ -232,11 +239,11 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) { ...@@ -232,11 +239,11 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
/** /**
* @brief Update expired window according to msg from stream computing module. * @brief Update expired window according to msg from stream computing module.
* *
* @param pTsdb * @param pTsdb
* @param smaType ETsdbSmaType * @param smaType ETsdbSmaType
* @param msg * @param msg
* @return int32_t * @return int32_t
*/ */
int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
STsdbCfg *pCfg = REPO_CFG(pTsdb); STsdbCfg *pCfg = REPO_CFG(pTsdb);
...@@ -247,21 +254,21 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { ...@@ -247,21 +254,21 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
char smaPath[TSDB_FILENAME_LEN] = "/proj/.sma/";
if (tsdbInitSmaEnv(pTsdb, smaPath, &pEnv) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
}
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
pEnv = pTsdb->pTSmaEnv; pTsdb->pTSmaEnv = pEnv;
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) { } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
pEnv = pTsdb->pRSmaEnv; pTsdb->pRSmaEnv = pEnv;
} else { } else {
ASSERT(0); ASSERT(0);
} }
char smaPath[TSDB_FILENAME_LEN] = "/proj/.sma/";
if (tsdbInitSmaEnv(pTsdb, smaPath, &pEnv) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
}
// TODO: decode the msg => start // TODO: decode the msg => start
int64_t indexUid = SMA_TEST_INDEX_UID; int64_t indexUid = SMA_TEST_INDEX_UID;
// const char * indexName = SMA_TEST_INDEX_NAME; // const char * indexName = SMA_TEST_INDEX_NAME;
const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10; const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10;
TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE]; TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE];
...@@ -285,8 +292,11 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { ...@@ -285,8 +292,11 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
// cache smaMeta // cache smaMeta
STSma *pSma = metaGetSmaInfoByIndex(pTsdb->pMeta, indexUid); STSma *pSma = metaGetSmaInfoByIndex(pTsdb->pMeta, indexUid);
if (pSma == NULL) { if (pSma == NULL) {
terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
taosHashCleanup(pItem->expiredWindows); taosHashCleanup(pItem->expiredWindows);
free(pItem); free(pItem);
tsdbWarn("vgId:%d update expired window failed for smaIndex %" PRIi64 " since %s", REPO_ID(pTsdb), indexUid,
tstrerror(terrno));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
pItem->pSma = pSma; pItem->pSma = pSma;
...@@ -299,6 +309,11 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { ...@@ -299,6 +309,11 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
} }
#if 0
SSmaStatItem *pItem1 = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
int size1 = taosHashGetSize(pItem1->expiredWindows);
tsdbWarn("vgId:%d smaIndex %" PRIi64 " size is %d before hashPut", REPO_ID(pTsdb), indexUid, size1);
#endif
int8_t state = TSDB_SMA_STAT_EXPIRED; int8_t state = TSDB_SMA_STAT_EXPIRED;
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) { for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
...@@ -316,6 +331,12 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { ...@@ -316,6 +331,12 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
} }
} }
#if 0
SSmaStatItem *pItem2 = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
int size2 = taosHashGetSize(pItem1->expiredWindows);
tsdbWarn("vgId:%d smaIndex %" PRIi64 " size is %d after hashPut", REPO_ID(pTsdb), indexUid, size2);
#endif
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -326,7 +347,7 @@ static int32_t tsdbResetExpiredWindow(SSmaStat *pStat, int64_t indexUid, TSKEY s ...@@ -326,7 +347,7 @@ static int32_t tsdbResetExpiredWindow(SSmaStat *pStat, int64_t indexUid, TSKEY s
if (pStat && pStat->smaStatItems) { if (pStat && pStat->smaStatItems) {
pItem = (SSmaStatItem *)taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); pItem = (SSmaStatItem *)taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid));
} }
#if 0
if (pItem != NULL) { if (pItem != NULL) {
// TODO: reset time window for the sma data blocks // TODO: reset time window for the sma data blocks
if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) { if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) {
...@@ -336,6 +357,7 @@ static int32_t tsdbResetExpiredWindow(SSmaStat *pStat, int64_t indexUid, TSKEY s ...@@ -336,6 +357,7 @@ static int32_t tsdbResetExpiredWindow(SSmaStat *pStat, int64_t indexUid, TSKEY s
} else { } else {
// error handling // error handling
} }
#endif
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -394,10 +416,17 @@ static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) { ...@@ -394,10 +416,17 @@ static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
* @param dataLen * @param dataLen
* @return int32_t * @return int32_t
*/ */
static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen) { static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen) {
SDBFile *pDBFile = pSmaH->pDFile;
// TODO: insert sma data blocks into B+Tree // TODO: insert sma data blocks into B+Tree
tsdbDebug("insert sma data blocks into B+Tree: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d", tsdbDebug("insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d", pDBFile->path,
*(uint64_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen); *(uint64_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen);
if(tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen) != 0){
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -486,8 +515,6 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p ...@@ -486,8 +515,6 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p
// TODO: check the data integrity // TODO: check the data integrity
void *bTree = pSmaH->pDFile;
int32_t len = 0; int32_t len = 0;
while (true) { while (true) {
if (len >= pData->dataLen) { if (len >= pData->dataLen) {
...@@ -510,7 +537,7 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p ...@@ -510,7 +537,7 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p
pData->indexUid, pData->skey, pTbData->tableUid, pColData->colId); pData->indexUid, pData->skey, pTbData->tableUid, pColData->colId);
#endif #endif
tsdbEncodeTSmaKey(pTbData->tableUid, pColData->colId, pData->skey, (void **)&pSmaKey); tsdbEncodeTSmaKey(pTbData->tableUid, pColData->colId, pData->skey, (void **)&pSmaKey);
if (tsdbInsertTSmaBlocks(bTree, smaKey, pColData->data, pColData->blockSize) < 0) { if (tsdbInsertTSmaBlocks(pSmaH, smaKey, SMA_KEY_LEN, pColData->data, pColData->blockSize) < 0) {
tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
} }
tbLen += (sizeof(STSmaColData) + pColData->blockSize); tbLen += (sizeof(STSmaColData) + pColData->blockSize);
...@@ -524,13 +551,28 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p ...@@ -524,13 +551,28 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) { static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) {
pSmaH->pTsdb = pTsdb; pSmaH->pTsdb = pTsdb;
pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision); pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision);
pSmaH->pDFile = (SDBFile *)calloc(1, sizeof(SDBFile *));
if (!pSmaH->pDFile) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) {
if (pSmaH) {
if (pSmaH->pDFile) {
tsdbCloseDBF(pSmaH->pDFile);
}
}
} }
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) { static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) {
STsdb *pTsdb = pSmaH->pTsdb; STsdb *pTsdb = pSmaH->pTsdb;
ASSERT(pSmaH->pDFile->path == NULL && pSmaH->pDFile->pDB == NULL);
pSmaH->pDFile = "tSma_interval_file_name"; char tSmaFile[TSDB_FILENAME_LEN] = {0};
snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pTsdb), fid);
pSmaH->pDFile->path = strdup(tSmaFile);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -559,21 +601,25 @@ static int32_t tsdbGetTSmaDays(STSmaWriteH *pSmaH, int32_t storageLevel) { ...@@ -559,21 +601,25 @@ static int32_t tsdbGetTSmaDays(STSmaWriteH *pSmaH, int32_t storageLevel) {
* @return int32_t * @return int32_t
*/ */
int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
STsdbCfg * pCfg = REPO_CFG(pTsdb); STsdbCfg * pCfg = REPO_CFG(pTsdb);
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
STSmaWriteH tSmaH = {0};
tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData); if (!pTsdb->pTSmaEnv) {
terrno = TSDB_CODE_INVALID_PTR;
tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
return terrno;
}
if (pData->dataLen <= 0) { if (pData->dataLen <= 0) {
TASSERT(0); TASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return TSDB_CODE_FAILED;
} }
if (!pTsdb->pTSmaEnv) { STSmaWriteH tSmaH = {0};
terrno = TSDB_CODE_INVALID_PTR;
return terrno; if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData) != 0) {
return TSDB_CODE_FAILED;
} }
// Step 1: Judge the storage level and days // Step 1: Judge the storage level and days
...@@ -585,27 +631,41 @@ int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { ...@@ -585,27 +631,41 @@ int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
// - Set and open the DFile or the B+Tree file // - Set and open the DFile or the B+Tree file
// TODO: tsdbStartTSmaCommit(); // TODO: tsdbStartTSmaCommit();
tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid); tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid);
if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, tSmaH.pDFile) != 0) {
tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
tSmaH.pDFile->path ? tSmaH.pDFile->path : "path is NULL", tstrerror(terrno));
tsdbDestroyTSmaWriteH(&tSmaH);
return TSDB_CODE_FAILED;
}
tsdbInsertTSmaDataSection(&tSmaH, pData); if (tsdbInsertTSmaDataSection(&tSmaH, pData) != 0) {
tsdbWarn("vgId:%d insert tSma data section failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
tsdbDestroyTSmaWriteH(&tSmaH);
return TSDB_CODE_FAILED;
}
// TODO:tsdbEndTSmaCommit(); // TODO:tsdbEndTSmaCommit();
// reset the SSmaStat // Step 3: reset the SSmaStat
tsdbResetExpiredWindow(SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey); tsdbResetExpiredWindow(SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey);
tsdbDestroyTSmaWriteH(&tSmaH);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) { static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) {
// TODO STsdb *pTsdb = pSmaH->pTsdb;
pSmaH->pDFile = "rSma_interval_file_name";
char tSmaFile[TSDB_FILENAME_LEN] = {0};
snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.rsma", REPO_ID(pTsdb), fid);
pSmaH->pDFile->path = strdup(tSmaFile);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
STsdbCfg * pCfg = REPO_CFG(pTsdb); STsdbCfg * pCfg = REPO_CFG(pTsdb);
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
STSmaWriteH tSmaH = {0}; STSmaWriteH tSmaH = {0};
tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData); tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData);
...@@ -627,6 +687,7 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { ...@@ -627,6 +687,7 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
// Save all the TSma data to one file // Save all the TSma data to one file
// TODO: tsdbStartTSmaCommit(); // TODO: tsdbStartTSmaCommit();
tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid); tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid);
tsdbInsertTSmaDataSection(&tSmaH, pData); tsdbInsertTSmaDataSection(&tSmaH, pData);
// TODO:tsdbEndTSmaCommit(); // TODO:tsdbEndTSmaCommit();
......
...@@ -54,7 +54,7 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) { ...@@ -54,7 +54,7 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) {
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) { int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbUpdateExpiredWindow(pTsdb, smaType, msg)) < 0) { if ((code = tsdbUpdateExpiredWindow(pTsdb, smaType, msg)) < 0) {
tsdbWarn("vgId:%d update expired window failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
} }
return code; return code;
} }
......
...@@ -223,14 +223,12 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { ...@@ -223,14 +223,12 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
TEST(testCase, tSmaInsertTest) { TEST(testCase, tSmaInsertTest) {
// prepare meta // prepare meta
const char * smaIndexName1 = "sma_index_test_1"; const char * smaIndexName1 = "sma_index_test_1";
const char * smaIndexName2 = "sma_index_test_2";
const char * timezone = "Asia/Shanghai"; const char * timezone = "Asia/Shanghai";
const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;"; const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;";
const char * tagsFilter = "I'm tags filter"; const char * tagsFilter = "I'm tags filter";
const char * smaTestDir = "./smaTest"; const char * smaTestDir = "./smaTest";
const tb_uid_t tbUid = 1234567890; const tb_uid_t tbUid = 1234567890;
const int64_t indexUid1 = 2000000001; const int64_t indexUid1 = 2000000001;
const int64_t indexUid2 = 2000000002;
const uint32_t nCntTSma = 2; const uint32_t nCntTSma = 2;
// encode // encode
STSma tSma = {0}; STSma tSma = {0};
...@@ -263,15 +261,20 @@ TEST(testCase, tSmaInsertTest) { ...@@ -263,15 +261,20 @@ TEST(testCase, tSmaInsertTest) {
// save index 1 // save index 1
EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0); EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0);
// insert data // insert data
const int64_t indexUid = 2000000002;
STSmaDataWrapper *pSmaData = NULL; STSmaDataWrapper *pSmaData = NULL;
STsdb tsdb = {0}; STsdb tsdb = {0};
STsdbCfg * pCfg = &tsdb.config; STsdbCfg * pCfg = &tsdb.config;
pCfg->daysPerFile = 1;
tsdb.pMeta = pMeta; tsdb.pMeta = pMeta;
tsdb.vgId = 2;
tsdb.config.daysPerFile = 10; // default days is 10
tsdb.config.keep1 = 30;
tsdb.config.keep2 = 90;
tsdb.config.keep = 365;
tsdb.config.precision = TSDB_TIME_PRECISION_MILLI;
tsdb.config.update = TD_ROW_OVERWRITE_UPDATE;
tsdb.config.compression = TWO_STAGE_COMP;
char *msg = (char *)calloc(100, 1); char *msg = (char *)calloc(100, 1);
EXPECT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0); EXPECT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0);
...@@ -283,16 +286,16 @@ TEST(testCase, tSmaInsertTest) { ...@@ -283,16 +286,16 @@ TEST(testCase, tSmaInsertTest) {
void * buf = NULL; void * buf = NULL;
EXPECT_EQ(tsdbMakeRoom(&buf, allocStep), 0); EXPECT_EQ(tsdbMakeRoom(&buf, allocStep), 0);
int32_t bufSize = taosTSizeof(buf); int32_t bufSize = taosTSizeof(buf);
int32_t numOfTables = 25; int32_t numOfTables = 5;
col_id_t numOfCols = 4096; col_id_t numOfCols = 10;
EXPECT_GT(numOfCols, 0); EXPECT_GT(numOfCols, 0);
pSmaData = (STSmaDataWrapper *)buf; pSmaData = (STSmaDataWrapper *)buf;
printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData); printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData);
pSmaData->skey = 1646987196; pSmaData->skey = 1646987196000;
pSmaData->interval = 10; pSmaData->interval = 10;
pSmaData->intervalUnit = TD_TIME_UNIT_MINUTE; pSmaData->intervalUnit = TD_TIME_UNIT_MINUTE;
pSmaData->indexUid = indexUid; pSmaData->indexUid = indexUid1;
int32_t len = sizeof(STSmaDataWrapper); int32_t len = sizeof(STSmaDataWrapper);
for (int32_t t = 0; t < numOfTables; ++t) { for (int32_t t = 0; t < numOfTables; ++t) {
......
...@@ -349,6 +349,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "No available disk") ...@@ -349,6 +349,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "No available disk")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta")
// query // query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册