未验证 提交 1c135a69 编写于 作者: C Cary Xu 提交者: GitHub

Merge pull request #10709 from taosdata/feature/TD-11463-3.0

Feature/td 11463 3.0
...@@ -1898,24 +1898,19 @@ typedef enum { ...@@ -1898,24 +1898,19 @@ typedef enum {
} ETDTimeUnit; } ETDTimeUnit;
typedef struct { typedef struct {
uint16_t funcId; int8_t version; // for compatibility(default 0)
uint16_t nColIds; int8_t intervalUnit;
col_id_t* colIds; // sorted colIds int8_t slidingUnit;
} SFuncColIds; char indexName[TSDB_INDEX_NAME_LEN];
char timezone[TD_TIMEZONE_LEN]; // sma data is invalid if timezone change.
typedef struct { uint16_t exprLen;
uint8_t version; // for compatibility uint16_t tagsFilterLen;
uint8_t intervalUnit; int64_t indexUid;
uint8_t slidingUnit; tb_uid_t tableUid; // super/child/common table uid
char indexName[TSDB_INDEX_NAME_LEN]; int64_t interval;
char timezone[TD_TIMEZONE_LEN]; int64_t sliding;
uint16_t nFuncColIds; char* expr; // sma expression
uint16_t tagsFilterLen; char* tagsFilter;
tb_uid_t tableUid; // super/common table uid
int64_t interval;
int64_t sliding;
SFuncColIds* funcColIds; // sorted funcIds
char* tagsFilter;
} STSma; // Time-range-wise SMA } STSma; // Time-range-wise SMA
typedef struct { typedef struct {
...@@ -1944,24 +1939,30 @@ int32_t tSerializeSVDropTSmaReq(void** buf, SVDropTSmaReq* pReq); ...@@ -1944,24 +1939,30 @@ int32_t tSerializeSVDropTSmaReq(void** buf, SVDropTSmaReq* pReq);
void* tDeserializeSVDropTSmaReq(void* buf, SVDropTSmaReq* pReq); void* tDeserializeSVDropTSmaReq(void* buf, SVDropTSmaReq* pReq);
typedef struct { typedef struct {
STimeWindow tsWindow; // [skey, ekey] col_id_t colId;
uint64_t tableUid; // sub/common table uid uint16_t blockSize; // sma data block size
int32_t numOfBlocks; // number of sma blocks for each column, total number is numOfBlocks*numOfColId char data[];
int32_t dataLen; // total data length } STSmaColData;
col_id_t* colIds; // e.g. 2,4,9,10
col_id_t numOfColIds; // e.g. 4 typedef struct {
char data[]; // the sma blocks tb_uid_t tableUid; // super/child/normal table uid
} STSmaData; int32_t dataLen; // not including head
char data[];
// TODO: move to the final location afte schema of STSma/STSmaData defined } STSmaTbData;
static FORCE_INLINE void tdDestroySmaData(STSmaData* pSmaData) {
if (pSmaData) { typedef struct {
if (pSmaData->colIds) { int64_t indexUid;
tfree(pSmaData->colIds); TSKEY skey; // startTS of one interval/sliding
} int64_t interval;
tfree(pSmaData); int32_t dataLen; // not including head
} int8_t intervalUnit;
} char data[];
} STSmaDataWrapper; // sma data for a interval/sliding window
// interval/sliding => window
// => window->table->colId
// => 当一个window下所有的表均计算完成时,流计算告知tsdb清除window的过期标记
// RSma: Rollup SMA // RSma: Rollup SMA
typedef struct { typedef struct {
...@@ -1984,13 +1985,7 @@ typedef struct { ...@@ -1984,13 +1985,7 @@ typedef struct {
static FORCE_INLINE void tdDestroyTSma(STSma* pSma) { static FORCE_INLINE void tdDestroyTSma(STSma* pSma) {
if (pSma) { if (pSma) {
if (pSma->funcColIds != NULL) { tfree(pSma->expr);
for (uint16_t i = 0; i < pSma->nFuncColIds; ++i) {
tfree((pSma->funcColIds + i)->colIds);
}
tfree(pSma->funcColIds);
}
tfree(pSma->tagsFilter); tfree(pSma->tagsFilter);
} }
} }
...@@ -2009,24 +2004,20 @@ static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) { ...@@ -2009,24 +2004,20 @@ static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) {
static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) { static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedU8(buf, pSma->version); tlen += taosEncodeFixedI8(buf, pSma->version);
tlen += taosEncodeFixedU8(buf, pSma->intervalUnit); tlen += taosEncodeFixedI8(buf, pSma->intervalUnit);
tlen += taosEncodeFixedU8(buf, pSma->slidingUnit); tlen += taosEncodeFixedI8(buf, pSma->slidingUnit);
tlen += taosEncodeString(buf, pSma->indexName); tlen += taosEncodeString(buf, pSma->indexName);
tlen += taosEncodeString(buf, pSma->timezone); tlen += taosEncodeString(buf, pSma->timezone);
tlen += taosEncodeFixedU16(buf, pSma->nFuncColIds); tlen += taosEncodeFixedU16(buf, pSma->exprLen);
tlen += taosEncodeFixedU16(buf, pSma->tagsFilterLen); tlen += taosEncodeFixedU16(buf, pSma->tagsFilterLen);
tlen += taosEncodeFixedI64(buf, pSma->indexUid);
tlen += taosEncodeFixedI64(buf, pSma->tableUid); tlen += taosEncodeFixedI64(buf, pSma->tableUid);
tlen += taosEncodeFixedI64(buf, pSma->interval); tlen += taosEncodeFixedI64(buf, pSma->interval);
tlen += taosEncodeFixedI64(buf, pSma->sliding); tlen += taosEncodeFixedI64(buf, pSma->sliding);
for (uint16_t i = 0; i < pSma->nFuncColIds; ++i) { if (pSma->exprLen > 0) {
SFuncColIds* funcColIds = pSma->funcColIds + i; tlen += taosEncodeString(buf, pSma->expr);
tlen += taosEncodeFixedU16(buf, funcColIds->funcId);
tlen += taosEncodeFixedU16(buf, funcColIds->nColIds);
for (uint16_t j = 0; j < funcColIds->nColIds; ++j) {
tlen += taosEncodeFixedU16(buf, *(funcColIds->colIds + j));
}
} }
if (pSma->tagsFilterLen > 0) { if (pSma->tagsFilterLen > 0) {
...@@ -2047,43 +2038,30 @@ static FORCE_INLINE int32_t tEncodeTSmaWrapper(void** buf, const STSmaWrapper* p ...@@ -2047,43 +2038,30 @@ static FORCE_INLINE int32_t tEncodeTSmaWrapper(void** buf, const STSmaWrapper* p
} }
static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) { static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) {
buf = taosDecodeFixedU8(buf, &pSma->version); buf = taosDecodeFixedI8(buf, &pSma->version);
buf = taosDecodeFixedU8(buf, &pSma->intervalUnit); buf = taosDecodeFixedI8(buf, &pSma->intervalUnit);
buf = taosDecodeFixedU8(buf, &pSma->slidingUnit); buf = taosDecodeFixedI8(buf, &pSma->slidingUnit);
buf = taosDecodeStringTo(buf, pSma->indexName); buf = taosDecodeStringTo(buf, pSma->indexName);
buf = taosDecodeStringTo(buf, pSma->timezone); buf = taosDecodeStringTo(buf, pSma->timezone);
buf = taosDecodeFixedU16(buf, &pSma->nFuncColIds); buf = taosDecodeFixedU16(buf, &pSma->exprLen);
buf = taosDecodeFixedU16(buf, &pSma->tagsFilterLen); buf = taosDecodeFixedU16(buf, &pSma->tagsFilterLen);
buf = taosDecodeFixedI64(buf, &pSma->indexUid);
buf = taosDecodeFixedI64(buf, &pSma->tableUid); buf = taosDecodeFixedI64(buf, &pSma->tableUid);
buf = taosDecodeFixedI64(buf, &pSma->interval); buf = taosDecodeFixedI64(buf, &pSma->interval);
buf = taosDecodeFixedI64(buf, &pSma->sliding); buf = taosDecodeFixedI64(buf, &pSma->sliding);
if (pSma->nFuncColIds > 0) {
pSma->funcColIds = (SFuncColIds*)calloc(pSma->nFuncColIds, sizeof(SFuncColIds)); if (pSma->exprLen > 0) {
if (pSma->funcColIds == NULL) { pSma->expr = (char*)calloc(pSma->exprLen, 1);
if (pSma->expr != NULL) {
buf = taosDecodeStringTo(buf, pSma->expr);
} else {
tdDestroyTSma(pSma); tdDestroyTSma(pSma);
return NULL; return NULL;
} }
for (uint16_t i = 0; i < pSma->nFuncColIds; ++i) {
SFuncColIds* funcColIds = pSma->funcColIds + i;
buf = taosDecodeFixedU16(buf, &funcColIds->funcId);
buf = taosDecodeFixedU16(buf, &funcColIds->nColIds);
if (funcColIds->nColIds > 0) {
funcColIds->colIds = (col_id_t*)calloc(funcColIds->nColIds, sizeof(col_id_t));
if (funcColIds->colIds != NULL) {
for (uint16_t j = 0; j < funcColIds->nColIds; ++j) {
buf = taosDecodeFixedU16(buf, funcColIds->colIds + j);
}
} else {
tdDestroyTSma(pSma);
return NULL;
}
} else {
funcColIds->colIds = NULL;
}
}
} else { } else {
pSma->funcColIds = NULL; pSma->expr = NULL;
} }
if (pSma->tagsFilterLen > 0) { if (pSma->tagsFilterLen > 0) {
......
...@@ -58,8 +58,8 @@ STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); ...@@ -58,8 +58,8 @@ STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
STSma * metaGetSmaInfoByName(SMeta *pMeta, const char *indexName); STSma * metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid);
STSmaWrapper * metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid); STSmaWrapper * metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid);
SArray * metaGetSmaTbUids(SMeta *pMeta, bool isDup); SArray * metaGetSmaTbUids(SMeta *pMeta, bool isDup);
SMTbCursor *metaOpenTbCursor(SMeta *pMeta); SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
......
...@@ -89,24 +89,21 @@ int tsdbCommit(STsdb *pTsdb); ...@@ -89,24 +89,21 @@ int tsdbCommit(STsdb *pTsdb);
/** /**
* @brief Insert tSma(Time-range-wise SMA) data from stream computing engine * @brief Insert tSma(Time-range-wise SMA) data from stream computing engine
* *
* @param pTsdb * @param pTsdb
* @param param * @param msg
* @param pData * @return int32_t
* @return int32_t
*/ */
int32_t tsdbInsertTSmaData(STsdb *pTsdb, STSma *param, STSmaData *pData); int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg);
/** /**
* @brief Insert RSma(Time-range-wise Rollup SMA) data. * @brief Insert RSma(Time-range-wise Rollup SMA) data.
* *
* @param pTsdb * @param pTsdb
* @param param * @param msg
* @param pData * @return int32_t
* @return int32_t
*/ */
int32_t tsdbInsertRSmaData(STsdb *pTsdb, SRSma *param, STSmaData *pData); int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
// STsdbCfg // STsdbCfg
int tsdbOptionsInit(STsdbCfg *); int tsdbOptionsInit(STsdbCfg *);
......
...@@ -42,17 +42,14 @@ typedef struct { ...@@ -42,17 +42,14 @@ typedef struct {
typedef struct { typedef struct {
STsdbFSMeta meta; // FS meta STsdbFSMeta meta; // FS meta
SArray * df; // data file array SArray * df; // data file array
SArray * sf; // sma data file array v2(t|r)1900.index_name_1
// SArray * v2t100.index_name
SArray * smaf; // sma data file array v2t1900.index_name
} SFSStatus; } SFSStatus;
/** /**
* @brief Directory structure of .tsma data files. * @brief Directory structure of .tsma data files.
* *
* root@cary /vnode2/tsdb $ tree .tsma/ * /vnode2/tsdb $ tree .sma/
* .tsma/ * .sma/
* ├── v2t100.index_name_1 * ├── v2t100.index_name_1
* ├── v2t101.index_name_1 * ├── v2t101.index_name_1
* ├── v2t102.index_name_1 * ├── v2t102.index_name_1
...@@ -66,7 +63,7 @@ typedef struct { ...@@ -66,7 +63,7 @@ typedef struct {
* 0 directories, 9 files * 0 directories, 9 files
*/ */
typedef struct { typedef struct {
pthread_rwlock_t lock; pthread_rwlock_t lock;
SFSStatus *cstatus; // current status SFSStatus *cstatus; // current status
......
...@@ -335,6 +335,17 @@ typedef struct { ...@@ -335,6 +335,17 @@ typedef struct {
SDFile files[TSDB_FILE_MAX]; SDFile files[TSDB_FILE_MAX];
} SDFileSet; } SDFileSet;
typedef struct {
int fid;
int8_t state;
uint8_t ver;
#if 0
SDFInfo info;
#endif
STfsFile f;
TdFilePtr pFile;
} SSFile; // files split by days with fid
#define TSDB_LATEST_FSET_VER 0 #define TSDB_LATEST_FSET_VER 0
#define TSDB_FSET_FID(s) ((s)->fid) #define TSDB_FSET_FID(s) ((s)->fid)
......
...@@ -19,26 +19,28 @@ ...@@ -19,26 +19,28 @@
typedef struct SSmaStat SSmaStat; typedef struct SSmaStat SSmaStat;
// insert/update interface // insert/update interface
int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData); int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg);
int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData); int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg);
// query interface // query interface
// TODO: This is the basic params, and should wrap the params to a queryHandle. // TODO: This is the basic params, and should wrap the params to a queryHandle.
int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult); int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult);
// management interface // management interface
int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg); int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg);
int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
#if 0
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result); int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result);
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin); int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin);
int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); #endif
// internal func // internal func
static FORCE_INLINE int32_t tsdbEncodeTSmaKey(uint64_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) { static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) {
int32_t len = 0; int32_t len = 0;
len += taosEncodeFixedU64(pData, tableUid); len += taosEncodeFixedI64(pData, tableUid);
len += taosEncodeFixedU16(pData, colId); len += taosEncodeFixedU16(pData, colId);
len += taosEncodeFixedI64(pData, tsKey); len += taosEncodeFixedI64(pData, tsKey);
return len; return len;
......
...@@ -227,21 +227,27 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) { ...@@ -227,21 +227,27 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
} }
int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) { int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
char buf[512] = {0}; // TODO: may overflow // char buf[512] = {0}; // TODO: may overflow
void *pBuf = NULL; void *pBuf = NULL, *qBuf = NULL;
DBT key1 = {0}, value1 = {0}; DBT key1 = {0}, value1 = {0};
{ {
// save sma info // save sma info
pBuf = buf; int32_t len = tEncodeTSma(NULL, pSmaCfg);
pBuf = calloc(len, 1);
if (pBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
key1.data = pSmaCfg->indexName; key1.data = (void *)&pSmaCfg->indexUid;
key1.size = strlen(key1.data); key1.size = sizeof(pSmaCfg->indexUid);
tEncodeTSma(&pBuf, pSmaCfg); qBuf = pBuf;
tEncodeTSma(&qBuf, pSmaCfg);
value1.data = buf; value1.data = pBuf;
value1.size = POINTER_DISTANCE(pBuf, buf); value1.size = POINTER_DISTANCE(qBuf, pBuf);
value1.app_data = pSmaCfg; value1.app_data = pSmaCfg;
} }
...@@ -609,7 +615,7 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) { ...@@ -609,7 +615,7 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
return pTbCfg; return pTbCfg;
} }
STSma *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) { STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
STSma * pCfg = NULL; STSma * pCfg = NULL;
SMetaDB *pDB = pMeta->pDB; SMetaDB *pDB = pMeta->pDB;
DBT key = {0}; DBT key = {0};
...@@ -617,8 +623,8 @@ STSma *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) { ...@@ -617,8 +623,8 @@ STSma *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) {
int ret; int ret;
// Set key/value // Set key/value
key.data = (void *)indexName; key.data = (void *)&indexUid;
key.size = strlen(indexName); key.size = sizeof(indexUid);
// Query // Query
metaDBRLock(pDB); metaDBRLock(pDB);
...@@ -634,7 +640,10 @@ STSma *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) { ...@@ -634,7 +640,10 @@ STSma *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) {
return NULL; return NULL;
} }
tDecodeTSma(value.data, pCfg); if (tDecodeTSma(value.data, pCfg) == NULL) {
tfree(pCfg);
return NULL;
}
return pCfg; return pCfg;
} }
...@@ -871,7 +880,7 @@ const char *metaSmaCursorNext(SMSmaCursor *pCur) { ...@@ -871,7 +880,7 @@ const char *metaSmaCursorNext(SMSmaCursor *pCur) {
} }
} }
STSmaWrapper *metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid) { STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
STSmaWrapper *pSW = NULL; STSmaWrapper *pSW = NULL;
pSW = calloc(sizeof(*pSW), 1); pSW = calloc(sizeof(*pSW), 1);
......
...@@ -19,22 +19,20 @@ ...@@ -19,22 +19,20 @@
#define SMA_STORAGE_SPLIT_HOURS 24 #define SMA_STORAGE_SPLIT_HOURS 24
#define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8 #define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8
#define SMA_STORE_SINGLE_BLOCKS // store SMA data by single block or multiple blocks
#define SMA_STATE_HASH_SLOT 4 #define SMA_STATE_HASH_SLOT 4
#define SMA_STATE_ITEM_HASH_SLOT 32 #define SMA_STATE_ITEM_HASH_SLOT 32
#define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test #define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test
#define SMA_TEST_INDEX_UID 123456 // TODO: just for test
typedef enum { typedef enum {
SMA_STORAGE_LEVEL_TSDB = 0, // store TSma in dir e.g. vnode${N}/tsdb/.tsma SMA_STORAGE_LEVEL_TSDB = 0, // use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2t200.dat
SMA_STORAGE_LEVEL_DFILESET = 1 // store TSma in file e.g. vnode${N}/tsdb/v2f1900.tsma.${sma_index_name} SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/rsma/sma_index_uid/v2r200.dat
} ESmaStorageLevel; } ESmaStorageLevel;
typedef struct { typedef struct {
STsdb * pTsdb; STsdb * pTsdb;
char * pDFile; // TODO: use the real DFile type, not char* char * pDFile; // TODO: use the real DFile type, not char*
int32_t interval; // interval with the precision of DB int32_t interval; // interval with the precision of DB
int32_t blockSize; // size of SMA block item
// TODO // TODO
} STSmaWriteH; } STSmaWriteH;
...@@ -62,6 +60,7 @@ typedef struct { ...@@ -62,6 +60,7 @@ typedef struct {
*/ */
int8_t state; // ETsdbSmaStat int8_t state; // ETsdbSmaStat
SHashObj *expiredWindows; // key: skey of time window, value: N/A SHashObj *expiredWindows; // key: skey of time window, value: N/A
STSma * pSma;
} SSmaStatItem; } SSmaStatItem;
struct SSmaStat { struct SSmaStat {
...@@ -69,20 +68,18 @@ struct SSmaStat { ...@@ -69,20 +68,18 @@ struct SSmaStat {
}; };
// declaration of static functions // declaration of static functions
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData);
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData);
static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit); static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit);
static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaData *pData, int32_t sectionDataLen, int32_t nBlocks); static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData);
static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen); static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen);
static int32_t tsdbTSmaDataSplit(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t days, int32_t nOffset,
int32_t fid, int32_t *nSmaBlocks);
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision); static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision);
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t storageLevel, static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid);
int32_t fid);
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData);
static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin); static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin);
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin); static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin);
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) { static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) {
ASSERT(pSmaStat != NULL); ASSERT(pSmaStat != NULL);
...@@ -133,10 +130,10 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) { ...@@ -133,10 +130,10 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
SSmaStatItem *item = taosHashIterate(pSmaStat->smaStatItems, NULL); SSmaStatItem *item = taosHashIterate(pSmaStat->smaStatItems, NULL);
while (item != NULL) { while (item != NULL) {
tfree(item->pSma);
taosHashCleanup(item->expiredWindows); taosHashCleanup(item->expiredWindows);
item = taosHashIterate(pSmaStat->smaStatItems, item); item = taosHashIterate(pSmaStat->smaStatItems, item);
} }
taosHashCleanup(pSmaStat->smaStatItems); taosHashCleanup(pSmaStat->smaStatItems);
free(pSmaStat); free(pSmaStat);
} }
...@@ -154,9 +151,13 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { ...@@ -154,9 +151,13 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
tsdbInitSmaStat(&pTsdb->pSmaStat); // lazy mode // lazy mode
if (tsdbInitSmaStat(&pTsdb->pSmaStat) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
}
// TODO: decode the msg => start // TODO: decode the msg => start
int64_t indexUid = SMA_TEST_INDEX_UID;
const char * indexName = SMA_TEST_INDEX_NAME; const char * indexName = SMA_TEST_INDEX_NAME;
const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10; const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10;
TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE]; TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE];
...@@ -169,14 +170,24 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { ...@@ -169,14 +170,24 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
SHashObj *pItemsHash = pTsdb->pSmaStat->smaStatItems; SHashObj *pItemsHash = pTsdb->pSmaStat->smaStatItems;
SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, indexName, strlen(indexName)); SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, indexName, strlen(indexName));
if (!pItem) { if (pItem == NULL) {
pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state
if (!pItem) { if (pItem == NULL) {
// Response to stream computing: OOM // Response to stream computing: OOM
// For query, if the indexName not found, the TSDB should tell query module to query raw TS data. // For query, if the indexName not found, the TSDB should tell query module to query raw TS data.
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
// cache smaMeta
STSma *pSma = metaGetSmaInfoByIndex(pTsdb->pMeta, indexUid);
if (pSma == NULL) {
taosHashCleanup(pItem->expiredWindows);
free(pItem);
return TSDB_CODE_FAILED;
}
pItem->pSma = pSma;
// TODO: change indexName to indexUid
if (taosHashPut(pItemsHash, indexName, strnlen(indexName, TSDB_INDEX_NAME_LEN), &pItem, sizeof(pItem)) != 0) { if (taosHashPut(pItemsHash, indexName, strnlen(indexName, TSDB_INDEX_NAME_LEN), &pItem, sizeof(pItem)) != 0) {
// If error occurs during put smaStatItem, free the resources of pItem // If error occurs during put smaStatItem, free the resources of pItem
taosHashCleanup(pItem->expiredWindows); taosHashCleanup(pItem->expiredWindows);
...@@ -195,6 +206,7 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { ...@@ -195,6 +206,7 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
// 2) This would solve the inconsistency to some extent, but not completely, unless we record all expired // 2) This would solve the inconsistency to some extent, but not completely, unless we record all expired
// windows failed to put into hash table. // windows failed to put into hash table.
taosHashCleanup(pItem->expiredWindows); taosHashCleanup(pItem->expiredWindows);
tfree(pItem->pSma);
taosHashRemove(pItemsHash, indexName, sizeof(indexName)); taosHashRemove(pItemsHash, indexName, sizeof(indexName));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -203,19 +215,21 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { ...@@ -203,19 +215,21 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, const char *indexName, void *timeWindow) { static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, int64_t indexUid, TSKEY skey) {
SSmaStatItem *pItem = NULL; SSmaStatItem *pItem = NULL;
if (pTsdb->pSmaStat && pTsdb->pSmaStat->smaStatItems) { if (pTsdb->pSmaStat && pTsdb->pSmaStat->smaStatItems) {
pItem = (SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, indexName, strlen(indexName)); pItem = (SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, &indexUid, sizeof(indexUid));
} }
if (pItem != NULL) { if (pItem != NULL) {
// TODO: reset time windows for the sma data blocks // TODO: reset time window for the sma data blocks
while (true) { if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) {
TSKEY thisWindow = 0; // error handling
taosHashRemove(pItem->expiredWindows, &thisWindow, sizeof(thisWindow));
} }
} else {
// error handling
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -277,7 +291,7 @@ static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit) { ...@@ -277,7 +291,7 @@ static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit) {
*/ */
static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen) { static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen) {
// TODO: insert sma data blocks into B+Tree // TODO: insert sma data blocks into B+Tree
printf("insert sma data blocks into B+Tree: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d\n", tsdbDebug("insert sma data blocks into B+Tree: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d",
*(uint64_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen); *(uint64_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -360,85 +374,60 @@ static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit ...@@ -360,85 +374,60 @@ static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit
* @param nBlocks The nBlocks with the same fid since nOffset. * @param nBlocks The nBlocks with the same fid since nOffset.
* @return int32_t * @return int32_t
*/ */
static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaData *pData, int32_t nOffset, int32_t nBlocks) { static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData) {
STsdb *pTsdb = pSmaH->pTsdb; STsdb *pTsdb = pSmaH->pTsdb;
TASSERT(pData->colIds != NULL); tsdbDebug("tsdbInsertTSmaDataSection: index %" PRIi64 ", skey %" PRIi64, pData->indexUid, pData->skey);
tsdbDebug("tsdbInsertTSmaDataSection: nOffset %d, nBlocks %d", nOffset, nBlocks); // TODO: check the data integrity
printf("tsdbInsertTSmaDataSection: nOffset %d, nBlocks %d\n", nOffset, nBlocks);
void *bTree = pSmaH->pDFile;
int32_t colDataLen = pData->dataLen / pData->numOfColIds;
int32_t sectionDataLen = pSmaH->blockSize * nBlocks; int32_t len = 0;
while (true) {
for (col_id_t i = 0; i < pData->numOfColIds; ++i) { if (len >= pData->dataLen) {
// param: pointer of B+Tree, key, value, dataLen break;
void *bTree = pSmaH->pDFile;
#ifndef SMA_STORE_SINGLE_BLOCKS
// save tSma data blocks as a whole
char smaKey[SMA_KEY_LEN] = {0};
void *pSmaKey = &smaKey;
tsdbEncodeTSmaKey(pData->tableUid, *(pData->colIds + i), pData->tsWindow.skey + nOffset * pSmaH->interval,
(void **)&pSmaKey);
if (tsdbInsertTSmaBlocks(bTree, smaKey, pData->data + i * colDataLen + nOffset * pSmaH->blockSize, sectionDataLen) <
0) {
tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
} }
#else assert(pData->dataLen > 0);
// save tSma data blocks separately STSmaTbData *pTbData = (STSmaTbData *)POINTER_SHIFT(pData->data, len);
for (int32_t n = 0; n < nBlocks; ++n) {
char smaKey[SMA_KEY_LEN] = {0}; int32_t tbLen = 0;
void *pSmaKey = &smaKey; while (true) {
tsdbEncodeTSmaKey(pData->tableUid, *(pData->colIds + i), pData->tsWindow.skey + (nOffset + n) * pSmaH->interval, if (tbLen >= pTbData->dataLen) {
(void **)&pSmaKey); break;
if (tsdbInsertTSmaBlocks(bTree, smaKey, pData->data + i * colDataLen + (nOffset + n) * pSmaH->blockSize, }
pSmaH->blockSize) < 0) { assert(pTbData->dataLen > 0);
STSmaColData *pColData = (STSmaColData *)POINTER_SHIFT(pTbData->data, tbLen);
char smaKey[SMA_KEY_LEN] = {0};
void * pSmaKey = &smaKey;
#if 0
printf("tsdbInsertTSmaDataSection: index %" PRIi64 ", skey %" PRIi64 " table[%" PRIi64 "]col[%" PRIu16 "]\n",
pData->indexUid, pData->skey, pTbData->tableUid, pColData->colId);
#endif
tsdbEncodeTSmaKey(pTbData->tableUid, pColData->colId, pData->skey, (void **)&pSmaKey);
if (tsdbInsertTSmaBlocks(bTree, smaKey, pColData->data, pColData->blockSize) < 0) {
tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
} }
tbLen += (sizeof(STSmaColData) + pColData->blockSize);
} }
#endif len += (sizeof(STSmaTbData) + pTbData->dataLen);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData) { static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) {
pSmaH->pTsdb = pTsdb; pSmaH->pTsdb = pTsdb;
pSmaH->interval = tsdbGetIntervalByPrecision(param->interval, param->intervalUnit, REPO_CFG(pTsdb)->precision); pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision);
// pSmaH->blockSize = param->numOfFuncIds * sizeof(int64_t);
} }
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t storageLevel, static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) {
int32_t fid) {
// TODO // TODO
pSmaH->pDFile = "tSma_interval_file_name"; pSmaH->pDFile = "tSma_interval_file_name";
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} /** }
* @brief Split the sma data blocks by fid.
*
* @param pSmaH
* @param param
* @param pData
* @param nOffset
* @param fid
* @param nSmaBlocks
* @return int32_t
*/
static int32_t tsdbTSmaDataSplit(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t days, int32_t nOffset,
int32_t fid, int32_t *nSmaBlocks) {
STsdbCfg *pCfg = REPO_CFG(pSmaH->pTsdb);
// TODO: use binary search
for (int32_t n = nOffset + 1; n < pData->numOfBlocks; ++n) {
// TODO: The tsWindow.skey should use the precision of DB.
int32_t tFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.skey + pSmaH->interval * n, days, pCfg->precision));
if (tFid > fid) {
*nSmaBlocks = n - nOffset;
break;
}
}
return TSDB_CODE_SUCCESS;
}
/** /**
* @brief Insert/Update Time-range-wise SMA data. * @brief Insert/Update Time-range-wise SMA data.
...@@ -449,124 +438,81 @@ static int32_t tsdbTSmaDataSplit(STSmaWriteH *pSmaH, STSma *param, STSmaData *pD ...@@ -449,124 +438,81 @@ static int32_t tsdbTSmaDataSplit(STSmaWriteH *pSmaH, STSma *param, STSmaData *pD
* - The destination file of one data block for some interval is determined by its start TS key. * - The destination file of one data block for some interval is determined by its start TS key.
* *
* @param pTsdb * @param pTsdb
* @param param * @param msg
* @param pData
* @return int32_t * @return int32_t
*/ */
int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData) { int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
STsdbCfg * pCfg = REPO_CFG(pTsdb); STsdbCfg * pCfg = REPO_CFG(pTsdb);
STSmaData * curData = pData; STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
STSmaWriteH tSmaH = {0}; STSmaWriteH tSmaH = {0};
tsdbInitTSmaWriteH(&tSmaH, pTsdb, param, pData); tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData);
if (pData->numOfBlocks <= 0 || pData->numOfColIds <= 0 || pData->dataLen <= 0) { if (pData->dataLen <= 0) {
TASSERT(0); TASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
// Step 1: Judge the storage level // Step 1: Judge the storage level
int32_t storageLevel = tsdbJudgeStorageLevel(param->interval, param->intervalUnit); int32_t storageLevel = tsdbJudgeStorageLevel(pData->interval, pData->intervalUnit);
int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile; int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile;
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
// - Set and open the DFile or the B+Tree file // - Set and open the DFile or the B+Tree file
int32_t minFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.skey, daysPerFile, pCfg->precision)); int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
int32_t maxFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.ekey, daysPerFile, pCfg->precision));
if (minFid == maxFid) {
// Save all the TSma data to one file
// TODO: tsdbStartTSmaCommit();
tsdbSetTSmaDataFile(&tSmaH, param, pData, storageLevel, minFid);
tsdbInsertTSmaDataSection(&tSmaH, pData, 0, pData->numOfBlocks);
// TODO:tsdbEndTSmaCommit();
} else if (minFid < maxFid) {
// Split the TSma data and save to multiple files. As there is limit for the span, it can't span more than 2 files
// actually.
// TODO: tsdbStartTSmaCommit();
int32_t tFid = minFid;
int32_t nOffset = 0;
int32_t nSmaBlocks = 0;
do {
tsdbTSmaDataSplit(&tSmaH, param, pData, daysPerFile, nOffset, tFid, &nSmaBlocks);
tsdbSetTSmaDataFile(&tSmaH, param, pData, storageLevel, tFid);
if (tsdbInsertTSmaDataSection(&tSmaH, pData, nOffset, nSmaBlocks) < 0) {
return terrno;
}
++tFid;
nOffset += nSmaBlocks;
if (tFid == maxFid) { // Save all the TSma data to one file
tsdbSetTSmaDataFile(&tSmaH, param, pData, storageLevel, tFid); // TODO: tsdbStartTSmaCommit();
tsdbInsertTSmaDataSection(&tSmaH, pData, nOffset, pData->numOfBlocks - nOffset); tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid);
break; tsdbInsertTSmaDataSection(&tSmaH, pData);
} // TODO:tsdbEndTSmaCommit();
} while (true);
// TODO:tsdbEndTSmaCommit();
} else {
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
// reset the SSmaStat // reset the SSmaStat
tsdbResetExpiredWindow(pTsdb, param->indexName, &pData->tsWindow); tsdbResetExpiredWindow(pTsdb, pData->indexUid, pData->skey);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, SRSma *param, STSmaData *pData, int32_t fid) { static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) {
// TODO // TODO
pSmaH->pDFile = "rSma_interval_file_name"; pSmaH->pDFile = "rSma_interval_file_name";
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData) { int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
STsdbCfg * pCfg = REPO_CFG(pTsdb); STsdbCfg * pCfg = REPO_CFG(pTsdb);
STSma * tParam = &param->tsma; STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
STSmaData * curData = pData;
STSmaWriteH tSmaH = {0}; STSmaWriteH tSmaH = {0};
tsdbInitTSmaWriteH(&tSmaH, pTsdb, tParam, pData); tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData);
int32_t nSmaBlocks = pData->numOfBlocks; if (pData->dataLen <= 0) {
int32_t colDataLen = pData->dataLen / nSmaBlocks;
// Step 2.2: Storage of SMA_STORAGE_LEVEL_DFILESET
// TODO: Use the daysPerFile for rSma data, not for TS data.
// TODO: The lifecycle of rSma data should be processed like the TS data files.
int32_t minFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.skey, pCfg->daysPerFile, pCfg->precision));
int32_t maxFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.ekey, pCfg->daysPerFile, pCfg->precision));
if (minFid == maxFid) {
// Save all the TSma data to one file
tsdbSetRSmaDataFile(&tSmaH, param, pData, minFid);
// TODO: tsdbStartTSmaCommit();
tsdbInsertTSmaDataSection(&tSmaH, pData, colDataLen, nSmaBlocks);
// TODO:tsdbEndTSmaCommit();
} else if (minFid < maxFid) {
// Split the TSma data and save to multiple files. As there is limit for the span, it can't span more than 2 files
// actually.
// TODO: tsdbStartTSmaCommit();
int32_t tmpFid = 0;
int32_t step = 0;
for (int32_t n = 0; n < pData->numOfBlocks; ++n) {
}
tsdbInsertTSmaDataSection(&tSmaH, pData, colDataLen, nSmaBlocks);
// TODO:tsdbEndTSmaCommit();
} else {
TASSERT(0); TASSERT(0);
return TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno;
} }
// Step 1: Judge the storage level
int32_t storageLevel = tsdbJudgeStorageLevel(pData->interval, pData->intervalUnit);
int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile;
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
// - Set and open the DFile or the B+Tree file
int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
// Save all the TSma data to one file
// TODO: tsdbStartTSmaCommit();
tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid);
tsdbInsertTSmaDataSection(&tSmaH, pData);
// TODO:tsdbEndTSmaCommit();
// reset the SSmaStat // reset the SSmaStat
tsdbResetExpiredWindow(pTsdb, param->tsma.indexName, &pData->tsWindow); tsdbResetExpiredWindow(pTsdb, pData->indexUid, pData->skey);
// Step 4: finish
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -579,9 +525,9 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData) { ...@@ -579,9 +525,9 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData) {
* @param pData * @param pData
* @return int32_t * @return int32_t
*/ */
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData) { static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) {
pSmaH->pTsdb = pTsdb; pSmaH->pTsdb = pTsdb;
pSmaH->interval = tsdbGetIntervalByPrecision(param->interval, param->intervalUnit, REPO_CFG(pTsdb)->precision); pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision);
// pSmaH->blockSize = param->numOfFuncIds * sizeof(int64_t); // pSmaH->blockSize = param->numOfFuncIds * sizeof(int64_t);
} }
...@@ -593,8 +539,8 @@ static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, ...@@ -593,8 +539,8 @@ static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param,
* @param queryWin * @param queryWin
* @return int32_t * @return int32_t
*/ */
static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin) { static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) {
int32_t storageLevel = tsdbJudgeStorageLevel(param->interval, param->intervalUnit); int32_t storageLevel = 0; //tsdbJudgeStorageLevel(param->interval, param->intervalUnit);
int32_t daysPerFile = int32_t daysPerFile =
storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : REPO_CFG(pReadH->pTsdb)->daysPerFile; storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : REPO_CFG(pReadH->pTsdb)->daysPerFile;
pReadH->storageLevel = storageLevel; pReadH->storageLevel = storageLevel;
...@@ -611,8 +557,8 @@ static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *q ...@@ -611,8 +557,8 @@ static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *q
* @return true * @return true
* @return false * @return false
*/ */
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin) { static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) {
SArray *smaFs = pReadH->pTsdb->fs->cstatus->smaf; SArray *smaFs = pReadH->pTsdb->fs->cstatus->sf;
int32_t nSmaFs = taosArrayGetSize(smaFs); int32_t nSmaFs = taosArrayGetSize(smaFs);
pReadH->pDFile = NULL; pReadH->pDFile = NULL;
...@@ -646,10 +592,9 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow ...@@ -646,10 +592,9 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow
* @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM. * @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM.
* @return int32_t * @return int32_t
*/ */
int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult) { int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult) {
const char *indexName = param->indexName; SSmaStatItem *pItem =
(SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, &pData->indexUid, sizeof(pData->indexUid));
SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, indexName, strlen(indexName));
if (pItem == NULL) { if (pItem == NULL) {
// mark all window as expired and notify query module to query raw TS data. // mark all window as expired and notify query module to query raw TS data.
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -664,9 +609,9 @@ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeW ...@@ -664,9 +609,9 @@ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeW
} }
STSmaReadH tReadH = {0}; STSmaReadH tReadH = {0};
tsdbInitTSmaReadH(&tReadH, pTsdb, param, pData); tsdbInitTSmaReadH(&tReadH, pTsdb, pData);
tsdbInitTSmaFile(&tReadH, param, queryWin); tsdbInitTSmaFile(&tReadH, queryWin);
int32_t nResult = 0; int32_t nResult = 0;
int64_t lastKey = 0; int64_t lastKey = 0;
...@@ -677,7 +622,7 @@ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeW ...@@ -677,7 +622,7 @@ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeW
} }
// set and open the file according to the STSma param // set and open the file according to the STSma param
if (tsdbSetAndOpenTSmaFile(&tReadH, param, queryWin)) { if (tsdbSetAndOpenTSmaFile(&tReadH, queryWin)) {
char bTree[100] = "\0"; char bTree[100] = "\0";
while (strncmp(bTree, "has more nodes", 100) == 0) { while (strncmp(bTree, "has more nodes", 100) == 0) {
if (nResult >= nMaxResult) { if (nResult >= nMaxResult) {
...@@ -694,6 +639,7 @@ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeW ...@@ -694,6 +639,7 @@ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeW
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#if 0
/** /**
* @brief Get the start TS key of the last data block of one interval/sliding. * @brief Get the start TS key of the last data block of one interval/sliding.
* *
...@@ -704,7 +650,7 @@ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeW ...@@ -704,7 +650,7 @@ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeW
* 1) Return 0 and fill the result if the check procedure is normal; * 1) Return 0 and fill the result if the check procedure is normal;
* 2) Return -1 if error occurs during the check procedure. * 2) Return -1 if error occurs during the check procedure.
*/ */
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result) { int32_t tsdbGetTSmaStatus(STsdb *pTsdb, void *smaIndex, void *result) {
const char *procedure = ""; const char *procedure = "";
if (strncmp(procedure, "get the start TS key of the last data block", 100) != 0) { if (strncmp(procedure, "get the start TS key of the last data block", 100) != 0) {
return -1; return -1;
...@@ -721,9 +667,10 @@ int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result) { ...@@ -721,9 +667,10 @@ int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result) {
* @param pWin * @param pWin
* @return int32_t * @return int32_t
*/ */
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin) { int32_t tsdbRemoveTSmaData(STsdb *pTsdb, void *smaIndex, STimeWindow *pWin) {
// for ("tSmaFiles of param-interval-sliding between pWin") { // for ("tSmaFiles of param-interval-sliding between pWin") {
// // remove the tSmaFile // // remove the tSmaFile
// } // }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
\ No newline at end of file #endif
\ No newline at end of file
...@@ -39,13 +39,13 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) { ...@@ -39,13 +39,13 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
* *
* @param pTsdb * @param pTsdb
* @param param * @param param
* @param pData * @param msg
* @return int32_t * @return int32_t
* TODO: Who is responsible for resource allocate and release? * TODO: Who is responsible for resource allocate and release?
*/ */
int32_t tsdbInsertTSmaData(STsdb *pTsdb, STSma *param, STSmaData *pData) { int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbInsertTSmaDataImpl(pTsdb, param, pData)) < 0) { if ((code = tsdbInsertTSmaDataImpl(pTsdb, msg)) < 0) {
tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
} }
return code; return code;
...@@ -56,12 +56,12 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, STSma *param, STSmaData *pData) { ...@@ -56,12 +56,12 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, STSma *param, STSmaData *pData) {
* *
* @param pTsdb * @param pTsdb
* @param param * @param param
* @param pData * @param msg
* @return int32_t * @return int32_t
*/ */
int32_t tsdbInsertRSmaData(STsdb *pTsdb, SRSma *param, STSmaData *pData) { int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbInsertRSmaDataImpl(pTsdb, param, pData)) < 0) { if ((code = tsdbInsertRSmaDataImpl(pTsdb, msg)) < 0) {
tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
} }
return code; return code;
......
...@@ -43,20 +43,8 @@ TEST(testCase, tSmaEncodeDecodeTest) { ...@@ -43,20 +43,8 @@ TEST(testCase, tSmaEncodeDecodeTest) {
tSma.sliding = 0; tSma.sliding = 0;
tstrncpy(tSma.indexName, "sma_index_test", TSDB_INDEX_NAME_LEN); tstrncpy(tSma.indexName, "sma_index_test", TSDB_INDEX_NAME_LEN);
tstrncpy(tSma.timezone, "Asia/Shanghai", TD_TIMEZONE_LEN); tstrncpy(tSma.timezone, "Asia/Shanghai", TD_TIMEZONE_LEN);
tSma.indexUid = 2345678910;
tSma.tableUid = 1234567890; tSma.tableUid = 1234567890;
tSma.nFuncColIds = 5;
tSma.funcColIds = (SFuncColIds *)calloc(tSma.nFuncColIds, sizeof(SFuncColIds));
ASSERT(tSma.funcColIds != NULL);
for (int32_t n = 0; n < tSma.nFuncColIds; ++n) {
SFuncColIds *funcColIds = tSma.funcColIds + n;
funcColIds->funcId = n;
funcColIds->nColIds = 10;
funcColIds->colIds = (col_id_t *)calloc(funcColIds->nColIds, sizeof(col_id_t));
ASSERT(funcColIds->colIds != NULL);
for (int32_t i = 0; i < funcColIds->nColIds; ++i) {
*(funcColIds->colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID);
}
}
STSmaWrapper tSmaWrapper = {.number = 1, .tSma = &tSma}; STSmaWrapper tSmaWrapper = {.number = 1, .tSma = &tSma};
uint32_t bufLen = tEncodeTSmaWrapper(NULL, &tSmaWrapper); uint32_t bufLen = tEncodeTSmaWrapper(NULL, &tSmaWrapper);
...@@ -85,35 +73,31 @@ TEST(testCase, tSmaEncodeDecodeTest) { ...@@ -85,35 +73,31 @@ TEST(testCase, tSmaEncodeDecodeTest) {
EXPECT_EQ(pSma->slidingUnit, qSma->slidingUnit); EXPECT_EQ(pSma->slidingUnit, qSma->slidingUnit);
EXPECT_STRCASEEQ(pSma->indexName, qSma->indexName); EXPECT_STRCASEEQ(pSma->indexName, qSma->indexName);
EXPECT_STRCASEEQ(pSma->timezone, qSma->timezone); EXPECT_STRCASEEQ(pSma->timezone, qSma->timezone);
EXPECT_EQ(pSma->nFuncColIds, qSma->nFuncColIds); EXPECT_EQ(pSma->indexUid, qSma->indexUid);
EXPECT_EQ(pSma->tableUid, qSma->tableUid); EXPECT_EQ(pSma->tableUid, qSma->tableUid);
EXPECT_EQ(pSma->interval, qSma->interval); EXPECT_EQ(pSma->interval, qSma->interval);
EXPECT_EQ(pSma->sliding, qSma->sliding); EXPECT_EQ(pSma->sliding, qSma->sliding);
EXPECT_EQ(pSma->exprLen, qSma->exprLen);
EXPECT_STRCASEEQ(pSma->expr, qSma->expr);
EXPECT_EQ(pSma->tagsFilterLen, qSma->tagsFilterLen); EXPECT_EQ(pSma->tagsFilterLen, qSma->tagsFilterLen);
EXPECT_STRCASEEQ(pSma->tagsFilter, qSma->tagsFilter); EXPECT_STRCASEEQ(pSma->tagsFilter, qSma->tagsFilter);
for (uint32_t j = 0; j < pSma->nFuncColIds; ++j) {
SFuncColIds *pFuncColIds = pSma->funcColIds + j;
SFuncColIds *qFuncColIds = qSma->funcColIds + j;
EXPECT_EQ(pFuncColIds->funcId, qFuncColIds->funcId);
EXPECT_EQ(pFuncColIds->nColIds, qFuncColIds->nColIds);
for (uint32_t k = 0; k < pFuncColIds->nColIds; ++k) {
EXPECT_EQ(*(pFuncColIds->colIds + k), *(qFuncColIds->colIds + k));
}
}
} }
// resource release // resource release
tdDestroyTSma(&tSma); tdDestroyTSma(&tSma);
tdDestroyTSmaWrapper(&dstTSmaWrapper); tdDestroyTSmaWrapper(&dstTSmaWrapper);
} }
#if 1
TEST(testCase, tSma_DB_Put_Get_Del_Test) { TEST(testCase, tSma_DB_Put_Get_Del_Test) {
const char * smaIndexName1 = "sma_index_test_1"; const char * smaIndexName1 = "sma_index_test_1";
const char * smaIndexName2 = "sma_index_test_2"; const char * smaIndexName2 = "sma_index_test_2";
const char * timeZone = "Asia/Shanghai"; const char * timezone = "Asia/Shanghai";
const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;";
const char * tagsFilter = "I'm tags filter"; const char * tagsFilter = "I'm tags filter";
const char * smaTestDir = "./smaTest"; const char * smaTestDir = "./smaTest";
const uint64_t tbUid = 1234567890; const tb_uid_t tbUid = 1234567890;
const int64_t indexUid1 = 2000000001;
const int64_t indexUid2 = 2000000002;
const uint32_t nCntTSma = 2; const uint32_t nCntTSma = 2;
// encode // encode
STSma tSma = {0}; STSma tSma = {0};
...@@ -122,22 +106,15 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { ...@@ -122,22 +106,15 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
tSma.interval = 1; tSma.interval = 1;
tSma.slidingUnit = TD_TIME_UNIT_HOUR; tSma.slidingUnit = TD_TIME_UNIT_HOUR;
tSma.sliding = 0; tSma.sliding = 0;
tSma.indexUid = indexUid1;
tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN); tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN);
tstrncpy(tSma.timezone, timeZone, TD_TIMEZONE_LEN); tstrncpy(tSma.timezone, timezone, TD_TIMEZONE_LEN);
tSma.tableUid = tbUid; tSma.tableUid = tbUid;
tSma.nFuncColIds = 5;
tSma.funcColIds = (SFuncColIds *)calloc(tSma.nFuncColIds, sizeof(SFuncColIds)); tSma.exprLen = strlen(expr);
ASSERT(tSma.funcColIds != NULL); tSma.expr = (char *)calloc(tSma.exprLen + 1, 1);
for (int32_t n = 0; n < tSma.nFuncColIds; ++n) { tstrncpy(tSma.expr, expr, tSma.exprLen + 1);
SFuncColIds *funcColIds = tSma.funcColIds + n;
funcColIds->funcId = n;
funcColIds->nColIds = 10;
funcColIds->colIds = (col_id_t *)calloc(funcColIds->nColIds, sizeof(col_id_t));
ASSERT(funcColIds->colIds != NULL);
for (int32_t i = 0; i < funcColIds->nColIds; ++i) {
*(funcColIds->colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID);
}
}
tSma.tagsFilterLen = strlen(tagsFilter); tSma.tagsFilterLen = strlen(tagsFilter);
tSma.tagsFilter = (char *)calloc(tSma.tagsFilterLen + 1, 1); tSma.tagsFilter = (char *)calloc(tSma.tagsFilterLen + 1, 1);
tstrncpy(tSma.tagsFilter, tagsFilter, tSma.tagsFilterLen + 1); tstrncpy(tSma.tagsFilter, tagsFilter, tSma.tagsFilterLen + 1);
...@@ -151,8 +128,9 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { ...@@ -151,8 +128,9 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
pMeta = metaOpen(smaTestDir, pMetaCfg, NULL); pMeta = metaOpen(smaTestDir, pMetaCfg, NULL);
assert(pMeta != NULL); assert(pMeta != NULL);
// save index 1 // save index 1
metaSaveSmaToDB(pMeta, pSmaCfg); EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0);
pSmaCfg->indexUid = indexUid2;
tstrncpy(pSmaCfg->indexName, smaIndexName2, TSDB_INDEX_NAME_LEN); tstrncpy(pSmaCfg->indexName, smaIndexName2, TSDB_INDEX_NAME_LEN);
pSmaCfg->version = 1; pSmaCfg->version = 1;
pSmaCfg->intervalUnit = TD_TIME_UNIT_HOUR; pSmaCfg->intervalUnit = TD_TIME_UNIT_HOUR;
...@@ -161,24 +139,26 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { ...@@ -161,24 +139,26 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
pSmaCfg->sliding = 5; pSmaCfg->sliding = 5;
// save index 2 // save index 2
metaSaveSmaToDB(pMeta, pSmaCfg); EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0);
// get value by indexName // get value by indexName
STSma *qSmaCfg = NULL; STSma *qSmaCfg = NULL;
qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName1); qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid1);
assert(qSmaCfg != NULL); assert(qSmaCfg != NULL);
printf("name1 = %s\n", qSmaCfg->indexName); printf("name1 = %s\n", qSmaCfg->indexName);
printf("timezone1 = %s\n", qSmaCfg->timezone); printf("timezone1 = %s\n", qSmaCfg->timezone);
printf("expr1 = %s\n", qSmaCfg->expr != NULL ? qSmaCfg->expr : "");
printf("tagsFilter1 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : ""); printf("tagsFilter1 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : "");
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1); EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1);
EXPECT_EQ(qSmaCfg->tableUid, tSma.tableUid); EXPECT_EQ(qSmaCfg->tableUid, tSma.tableUid);
tdDestroyTSma(qSmaCfg); tdDestroyTSma(qSmaCfg);
tfree(qSmaCfg); tfree(qSmaCfg);
qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName2); qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid2);
assert(qSmaCfg != NULL); assert(qSmaCfg != NULL);
printf("name2 = %s\n", qSmaCfg->indexName); printf("name2 = %s\n", qSmaCfg->indexName);
printf("timezone2 = %s\n", qSmaCfg->timezone); printf("timezone2 = %s\n", qSmaCfg->timezone);
printf("expr2 = %s\n", qSmaCfg->expr != NULL ? qSmaCfg->expr : "");
printf("tagsFilter2 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : ""); printf("tagsFilter2 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : "");
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2); EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2);
EXPECT_EQ(qSmaCfg->interval, tSma.interval); EXPECT_EQ(qSmaCfg->interval, tSma.interval);
...@@ -201,17 +181,21 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { ...@@ -201,17 +181,21 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
metaCloseSmaCurosr(pSmaCur); metaCloseSmaCurosr(pSmaCur);
// get wrapper by table uid // get wrapper by table uid
STSmaWrapper *pSW = metaGetSmaInfoByUid(pMeta, tbUid); STSmaWrapper *pSW = metaGetSmaInfoByTable(pMeta, tbUid);
assert(pSW != NULL); assert(pSW != NULL);
EXPECT_EQ(pSW->number, nCntTSma); EXPECT_EQ(pSW->number, nCntTSma);
EXPECT_STRCASEEQ(pSW->tSma->indexName, smaIndexName1); EXPECT_STRCASEEQ(pSW->tSma->indexName, smaIndexName1);
EXPECT_STRCASEEQ(pSW->tSma->timezone, timeZone); EXPECT_STRCASEEQ(pSW->tSma->timezone, timezone);
EXPECT_STRCASEEQ(pSW->tSma->expr, expr);
EXPECT_STRCASEEQ(pSW->tSma->tagsFilter, tagsFilter); EXPECT_STRCASEEQ(pSW->tSma->tagsFilter, tagsFilter);
EXPECT_EQ(pSW->tSma->tableUid, tSma.tableUid); EXPECT_EQ(pSW->tSma->indexUid, indexUid1);
EXPECT_EQ(pSW->tSma->tableUid, tbUid);
EXPECT_STRCASEEQ((pSW->tSma + 1)->indexName, smaIndexName2); EXPECT_STRCASEEQ((pSW->tSma + 1)->indexName, smaIndexName2);
EXPECT_STRCASEEQ((pSW->tSma + 1)->timezone, timeZone); EXPECT_STRCASEEQ((pSW->tSma + 1)->timezone, timezone);
EXPECT_STRCASEEQ((pSW->tSma + 1)->expr, expr);
EXPECT_STRCASEEQ((pSW->tSma + 1)->tagsFilter, tagsFilter); EXPECT_STRCASEEQ((pSW->tSma + 1)->tagsFilter, tagsFilter);
EXPECT_EQ((pSW->tSma + 1)->tableUid, tSma.tableUid); EXPECT_EQ((pSW->tSma + 1)->indexUid, indexUid2);
EXPECT_EQ((pSW->tSma + 1)->tableUid, tbUid);
tdDestroyTSmaWrapper(pSW); tdDestroyTSmaWrapper(pSW);
tfree(pSW); tfree(pSW);
...@@ -233,44 +217,68 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { ...@@ -233,44 +217,68 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
tdDestroyTSma(&tSma); tdDestroyTSma(&tSma);
metaClose(pMeta); metaClose(pMeta);
} }
#endif
#if 0 #if 1
TEST(testCase, tSmaInsertTest) { TEST(testCase, tSmaInsertTest) {
STSma tSma = {0}; const int64_t indexUid = 2000000002;
STSmaData *pSmaData = NULL; STSmaDataWrapper *pSmaData = NULL;
STsdb tsdb = {0}; STsdb tsdb = {0};
STsdbCfg * pCfg = &tsdb.config;
pCfg->daysPerFile = 1;
// init // init
tSma.intervalUnit = TD_TIME_UNIT_DAY; int32_t allocCnt = 0;
tSma.interval = 1; int32_t allocStep = 40960;
tSma.numOfFuncIds = 5; // sum/min/max/avg/last int32_t buffer = 4096;
void * buf = NULL;
int32_t blockSize = tSma.numOfFuncIds * sizeof(int64_t); EXPECT_EQ(tsdbMakeRoom(&buf, allocStep), 0);
int32_t numOfColIds = 3; int32_t bufSize = taosTSizeof(buf);
int32_t numOfBlocks = 10; int32_t numOfTables = 25;
col_id_t numOfCols = 4096;
int32_t dataLen = numOfColIds * numOfBlocks * blockSize; EXPECT_GT(numOfCols, 0);
pSmaData = (STSmaData *)malloc(sizeof(STSmaData) + dataLen); pSmaData = (STSmaDataWrapper *)buf;
ASSERT_EQ(pSmaData != NULL, true); printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData);
pSmaData->tableUid = 3232329230; pSmaData->skey = 1646987196;
pSmaData->numOfColIds = numOfColIds; pSmaData->interval = 10;
pSmaData->numOfBlocks = numOfBlocks; pSmaData->intervalUnit = TD_TIME_UNIT_MINUTE;
pSmaData->dataLen = dataLen; pSmaData->indexUid = indexUid;
pSmaData->tsWindow.skey = 1640000000;
pSmaData->tsWindow.ekey = 1645788649; int32_t len = sizeof(STSmaDataWrapper);
pSmaData->colIds = (col_id_t *)malloc(sizeof(col_id_t) * numOfColIds); for (int32_t t = 0; t < numOfTables; ++t) {
ASSERT_EQ(pSmaData->colIds != NULL, true); STSmaTbData *pTbData = (STSmaTbData *)POINTER_SHIFT(pSmaData, len);
pTbData->tableUid = t;
for (int32_t i = 0; i < numOfColIds; ++i) {
*(pSmaData->colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID); int32_t tableDataLen = sizeof(STSmaTbData);
for (col_id_t c = 0; c < numOfCols; ++c) {
if (bufSize - len - tableDataLen < buffer) {
EXPECT_EQ(tsdbMakeRoom(&buf, bufSize + allocStep), 0);
pSmaData = (STSmaDataWrapper *)buf;
pTbData = (STSmaTbData *)POINTER_SHIFT(pSmaData, len);
bufSize = taosTSizeof(buf);
printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData);
}
STSmaColData *pColData = (STSmaColData *)POINTER_SHIFT(pSmaData, len + tableDataLen);
pColData->colId = c + PRIMARYKEY_TIMESTAMP_COL_ID;
pColData->blockSize = ((c & 1) == 0) ? 8 : 16;
// TODO: fill col data
tableDataLen += (sizeof(STSmaColData) + pColData->blockSize);
}
pTbData->dataLen = (tableDataLen - sizeof(STSmaTbData));
len += tableDataLen;
// printf("bufSize=%d, len=%d, len of table[%d]=%d\n", bufSize, len, t, tableDataLen);
} }
pSmaData->dataLen = (len - sizeof(STSmaDataWrapper));
EXPECT_GE(bufSize, pSmaData->dataLen);
// execute // execute
EXPECT_EQ(tsdbInsertTSmaData(&tsdb, &tSma, pSmaData), TSDB_CODE_SUCCESS); EXPECT_EQ(tsdbInsertTSmaData(&tsdb, (char *)pSmaData), TSDB_CODE_SUCCESS);
// release // release
tdDestroySmaData(pSmaData); taosTZfree(buf);
} }
#endif #endif
......
...@@ -265,7 +265,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { ...@@ -265,7 +265,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
return NULL; return NULL;
} }
#if FILE_WITH_LOCK #if FILE_WITH_LOCK
pthread_rwlock_init(&(pFile->rwlock),NULL); pthread_rwlock_init(&(pFile->rwlock), NULL);
#endif #endif
pFile->fd = fd; pFile->fd = fd;
pFile->fp = fp; pFile->fp = fp;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册