Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e10f471b
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e10f471b
编写于
3月 16, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor
上级
831a8522
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
184 addition
and
147 deletion
+184
-147
include/common/tmsg.h
include/common/tmsg.h
+5
-20
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/dnode/vnode/src/inc/tsdbDef.h
source/dnode/vnode/src/inc/tsdbDef.h
+5
-4
source/dnode/vnode/src/tsdb/tsdbBDBImpl.c
source/dnode/vnode/src/tsdb/tsdbBDBImpl.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+107
-63
source/dnode/vnode/test/tsdbSmaTest.cpp
source/dnode/vnode/test/tsdbSmaTest.cpp
+63
-58
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
e10f471b
...
...
@@ -1896,33 +1896,18 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW)
}
return
buf
;
}
typedef
enum
{
TD_TIME_UNIT_UNKNOWN
=
-
1
,
TD_TIME_UNIT_YEAR
=
0
,
TD_TIME_UNIT_SEASON
=
1
,
TD_TIME_UNIT_MONTH
=
2
,
TD_TIME_UNIT_WEEK
=
3
,
TD_TIME_UNIT_DAY
=
4
,
TD_TIME_UNIT_HOUR
=
5
,
TD_TIME_UNIT_MINUTE
=
6
,
TD_TIME_UNIT_SEC
=
7
,
TD_TIME_UNIT_MILLISEC
=
8
,
TD_TIME_UNIT_MICROSEC
=
9
,
TD_TIME_UNIT_NANOSEC
=
10
}
ETDTimeUnit
;
typedef
struct
{
int8_t
version
;
// for compatibility(default 0)
int8_t
intervalUnit
;
int8_t
slidingUnit
;
int8_t
intervalUnit
;
// MACRO: TIME_UNIT_XXX
int8_t
slidingUnit
;
// MACRO: TIME_UNIT_XXX
char
indexName
[
TSDB_INDEX_NAME_LEN
];
char
timezone
[
TD_TIMEZONE_LEN
];
// sma data
is invalid if timezone change
.
char
timezone
[
TD_TIMEZONE_LEN
];
// sma data
expired if timezone changes
.
int32_t
exprLen
;
int32_t
tagsFilterLen
;
int64_t
indexUid
;
tb_uid_t
tableUid
;
// super/child/common table uid
int64_t
interval
;
int64_t
offset
;
int64_t
offset
;
// use unit by precision of DB
int64_t
sliding
;
char
*
expr
;
// sma expression
char
*
tagsFilter
;
...
...
@@ -1967,7 +1952,7 @@ typedef struct {
typedef
struct
{
int64_t
indexUid
;
TSKEY
skey
;
// start
TS of one interval/sliding
TSKEY
skey
;
// start
Key of one interval/sliding window
int64_t
interval
;
int32_t
dataLen
;
// not including head
int8_t
intervalUnit
;
...
...
include/util/taoserror.h
浏览文件 @
e10f471b
...
...
@@ -354,6 +354,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615)
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616)
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0617)
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0618)
// query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
...
...
source/dnode/vnode/src/inc/tsdbDef.h
浏览文件 @
e10f471b
...
...
@@ -60,10 +60,11 @@ struct STsdb {
SSmaEnv
*
pRSmaEnv
;
};
#define REPO_ID(r) ((r)->vgId)
#define REPO_CFG(r) (&(r)->config)
#define REPO_FS(r) (r)->fs
#define IS_REPO_LOCKED(r) (r)->repoLocked
#define REPO_ID(r) ((r)->vgId)
#define REPO_CFG(r) (&(r)->config)
#define REPO_FS(r) (r)->fs
#define IS_REPO_LOCKED(r) (r)->repoLocked
#define REPO_SMA_ENV(r, t) ((TSDB_SMA_TYPE_ROLLUP == (t)) ? (r)->pRSmaEnv : (r)->pTSmaEnv)
int
tsdbLockRepo
(
STsdb
*
pTsdb
);
int
tsdbUnlockRepo
(
STsdb
*
pTsdb
);
...
...
source/dnode/vnode/src/tsdb/tsdbBDBImpl.c
浏览文件 @
e10f471b
...
...
@@ -68,8 +68,8 @@ int32_t tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) {
ret
=
pEnv
->
open
(
pEnv
,
path
,
DB_CREATE
|
DB_INIT_CDB
|
DB_INIT_MPOOL
,
0
);
if
(
ret
!=
0
)
{
// BDB_PERR("Failed to open tsdb env", ret)
;
tsdbWarn
(
"Failed to open tsdb env for path %s since
%d
"
,
path
?
path
:
"NULL"
,
ret
);
terrno
=
TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR
;
tsdbWarn
(
"Failed to open tsdb env for path %s since
ret %d != 0
"
,
path
?
path
:
"NULL"
,
ret
);
return
-
1
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
e10f471b
...
...
@@ -17,7 +17,7 @@
#undef SMA_PRINT_DEBUG_LOG
#define SMA_STORAGE_TSDB_DAYS 30
#define SMA_STORAGE_TSDB_TIMES
3
0
#define SMA_STORAGE_TSDB_TIMES
1
0
#define SMA_STORAGE_SPLIT_HOURS 24
#define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8
...
...
@@ -93,6 +93,11 @@ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLe
static
int32_t
tsdbSetTSmaDataFile
(
STSmaWriteH
*
pSmaH
,
STSmaDataWrapper
*
pData
,
int32_t
storageLevel
,
int32_t
fid
);
static
int32_t
tsdbInitTSmaFile
(
STSmaReadH
*
pSmaH
,
TSKEY
skey
);
static
bool
tsdbSetAndOpenTSmaFile
(
STSmaReadH
*
pReadH
,
TSKEY
*
queryKey
);
static
void
tsdbGetSmaDir
(
int32_t
repoid
,
int8_t
smaType
,
char
dirName
[]);
static
void
tsdbGetSmaDir
(
int32_t
repoid
,
int8_t
smaType
,
char
dirName
[])
{
snprintf
(
dirName
,
TSDB_FILENAME_LEN
,
"vnode/vnode%d/tsdb/data"
,
repoid
);
}
static
SSmaEnv
*
tsdbNewSmaEnv
(
const
STsdb
*
pTsdb
,
const
char
*
path
)
{
SSmaEnv
*
pEnv
=
NULL
;
...
...
@@ -136,7 +141,7 @@ static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) {
return
TSDB_CODE_FAILED
;
}
if
(
pEnv
&&
*
pEnv
)
{
if
(
*
pEnv
)
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -144,7 +149,7 @@ static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) {
return
TSDB_CODE_FAILED
;
}
if
(
*
pEnv
==
NULL
)
{
if
(
*
pEnv
==
NULL
)
{
// 2nd phase check
if
((
*
pEnv
=
tsdbNewSmaEnv
(
pTsdb
,
path
))
==
NULL
)
{
tsdbUnlockRepo
(
pTsdb
);
return
TSDB_CODE_FAILED
;
...
...
@@ -152,7 +157,7 @@ static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) {
}
if
(
tsdbUnlockRepo
(
pTsdb
)
!=
0
)
{
tsdbFreeSmaEnv
(
*
pEnv
);
*
pEnv
=
tsdbFreeSmaEnv
(
*
pEnv
);
return
TSDB_CODE_FAILED
;
}
...
...
@@ -244,6 +249,39 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
}
}
static
int32_t
tsdbCheckAndInitSmaEnv
(
STsdb
*
pTsdb
,
int8_t
smaType
)
{
switch
(
smaType
)
{
case
TSDB_SMA_TYPE_TIME_RANGE
:
if
(
pTsdb
->
pTSmaEnv
)
{
return
TSDB_CODE_SUCCESS
;
}
break
;
case
TSDB_SMA_TYPE_ROLLUP
:
if
(
pTsdb
->
pRSmaEnv
)
{
return
TSDB_CODE_SUCCESS
;
}
break
;
default:
terrno
=
TSDB_CODE_INVALID_PARA
;
return
TSDB_CODE_FAILED
;
}
// SDiskID did = {0};
SSmaEnv
*
pEnv
=
NULL
;
char
smaPath
[
TSDB_FILENAME_LEN
]
=
"/proj/.sma/"
;
if
(
tsdbInitSmaEnv
(
pTsdb
,
smaPath
,
&
pEnv
)
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_FAILED
;
}
if
(
smaType
==
TSDB_SMA_TYPE_TIME_RANGE
)
{
pTsdb
->
pTSmaEnv
=
pEnv
;
}
else
{
pTsdb
->
pRSmaEnv
=
pEnv
;
}
return
TSDB_CODE_SUCCESS
;
};
/**
* @brief Update expired window according to msg from stream computing module.
*
...
...
@@ -253,26 +291,17 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
* @return int32_t
*/
int32_t
tsdbUpdateExpiredWindow
(
STsdb
*
pTsdb
,
int8_t
smaType
,
char
*
msg
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
SSmaEnv
*
pEnv
=
NULL
;
if
(
!
msg
||
!
pTsdb
->
pMeta
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
TSDB_CODE_FAILED
;
}
char
smaPath
[
TSDB_FILENAME_LEN
]
=
"/proj/.sma/"
;
if
(
tsdbInitSmaEnv
(
pTsdb
,
smaPath
,
&
pEnv
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
tsdbCheckAndInitSmaEnv
(
pTsdb
,
smaType
)
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TSDB_CODE_TDB_INIT_FAILED
;
return
TSDB_CODE_FAILED
;
}
if
(
smaType
==
TSDB_SMA_TYPE_TIME_RANGE
)
{
pTsdb
->
pTSmaEnv
=
pEnv
;
}
else
if
(
smaType
==
TSDB_SMA_TYPE_ROLLUP
)
{
pTsdb
->
pRSmaEnv
=
pEnv
;
}
else
{
ASSERT
(
0
);
}
SSmaEnv
*
pEnv
=
REPO_SMA_ENV
(
pTsdb
,
smaType
);
// TODO: decode the msg => start
int64_t
indexUid
=
SMA_TEST_INDEX_UID
;
...
...
@@ -308,7 +337,6 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
}
pItem
->
pSma
=
pSma
;
// TODO: change indexName to indexUid
if
(
taosHashPut
(
pItemsHash
,
&
indexUid
,
sizeof
(
indexUid
),
&
pItem
,
sizeof
(
pItem
))
!=
0
)
{
// If error occurs during put smaStatItem, free the resources of pItem
taosHashCleanup
(
pItem
->
expiredWindows
);
...
...
@@ -378,32 +406,32 @@ static int32_t tsdbResetExpiredWindow(SSmaStat *pStat, int64_t indexUid, TSKEY s
static
int32_t
tsdbGetSmaStorageLevel
(
int64_t
interval
,
int8_t
intervalUnit
)
{
// TODO: configurable for SMA_STORAGE_SPLIT_HOURS?
switch
(
intervalUnit
)
{
case
T
D_T
IME_UNIT_HOUR
:
case
TIME_UNIT_HOUR
:
if
(
interval
<
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
break
;
case
T
D_T
IME_UNIT_MINUTE
:
case
TIME_UNIT_MINUTE
:
if
(
interval
<
60
*
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
break
;
case
T
D_TIME_UNIT_SEC
:
case
T
IME_UNIT_SECOND
:
if
(
interval
<
3600
*
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
break
;
case
T
D_TIME_UNIT_MILLISEC
:
case
T
IME_UNIT_MILLISECOND
:
if
(
interval
<
3600
*
1e3
*
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
break
;
case
T
D_TIME_UNIT_MICROSEC
:
case
T
IME_UNIT_MICROSECOND
:
if
(
interval
<
3600
*
1e6
*
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
break
;
case
T
D_TIME_UNIT_NANOSEC
:
case
T
IME_UNIT_NANOSECOND
:
if
(
interval
<
3600
*
1e9
*
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
...
...
@@ -429,8 +457,8 @@ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t k
// TODO: insert sma data blocks into B+Tree
tsdbDebug
(
"vgId:%d insert sma data blocks into %s: smaKey %"
PRIx64
"-%"
PRIu16
"-%"
PRIx64
", dataLen %d"
,
REPO_ID
(
pSmaH
->
pTsdb
),
pDBFile
->
path
,
*
(
tb_uid_t
*
)
smaKey
,
*
(
uint16_t
*
)
POINTER_SHIFT
(
smaKey
,
8
),
*
(
int64_t
*
)
POINTER_SHIFT
(
smaKey
,
10
),
dataLen
);
REPO_ID
(
pSmaH
->
pTsdb
),
pDBFile
->
path
,
*
(
tb_uid_t
*
)
smaKey
,
*
(
uint16_t
*
)
POINTER_SHIFT
(
smaKey
,
8
),
*
(
int64_t
*
)
POINTER_SHIFT
(
smaKey
,
10
),
dataLen
);
if
(
tsdbSaveSmaToDB
(
pDBFile
,
smaKey
,
keyLen
,
pData
,
dataLen
)
!=
0
)
{
return
TSDB_CODE_FAILED
;
...
...
@@ -447,66 +475,73 @@ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t k
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief Approximate value for week/month/year.
*
* @param interval
* @param intervalUnit
* @param precision
* @return int64_t
*/
static
int64_t
tsdbGetIntervalByPrecision
(
int64_t
interval
,
uint8_t
intervalUnit
,
int8_t
precision
)
{
if
(
intervalUnit
<
TD_TIME_UNIT_MILLISEC
)
{
switch
(
intervalUnit
)
{
case
TD_TIME_UNIT_YEAR
:
case
TD_TIME_UNIT_SEASON
:
case
TD_TIME_UNIT_MONTH
:
case
TD_TIME_UNIT_WEEK
:
// illegal time unit
tsdbError
(
"invalid interval unit: %d
\n
"
,
intervalUnit
);
TASSERT
(
0
);
break
;
case
TD_TIME_UNIT_DAY
:
// the interval for tSma calculation must <= day
interval
*=
86400
*
1e3
;
break
;
case
TD_TIME_UNIT_HOUR
:
interval
*=
3600
*
1e3
;
break
;
case
TD_TIME_UNIT_MINUTE
:
interval
*=
60
*
1e3
;
break
;
case
TD_TIME_UNIT_SEC
:
interval
*=
1e3
;
break
;
default:
break
;
}
switch
(
intervalUnit
)
{
case
TIME_UNIT_YEAR
:
// approximate value
interval
*=
365
*
86400
*
1e3
;
break
;
case
TIME_UNIT_MONTH
:
// approximate value
interval
*=
30
*
86400
*
1e3
;
break
;
case
TIME_UNIT_WEEK
:
// approximate value
interval
*=
7
*
86400
*
1e3
;
break
;
case
TIME_UNIT_DAY
:
// the interval for tSma calculation must <= day
interval
*=
86400
*
1e3
;
break
;
case
TIME_UNIT_HOUR
:
interval
*=
3600
*
1e3
;
break
;
case
TIME_UNIT_MINUTE
:
interval
*=
60
*
1e3
;
break
;
case
TIME_UNIT_SECOND
:
interval
*=
1e3
;
break
;
default:
break
;
}
switch
(
precision
)
{
case
TSDB_TIME_PRECISION_MILLI
:
if
(
T
D_TIME_UNIT_MICROSEC
==
intervalUnit
)
{
// us
if
(
T
IME_UNIT_MICROSECOND
==
intervalUnit
)
{
// us
return
interval
/
1e3
;
}
else
if
(
T
D_TIME_UNIT_NANOSEC
==
intervalUnit
)
{
// nano second
}
else
if
(
T
IME_UNIT_NANOSECOND
==
intervalUnit
)
{
// nano second
return
interval
/
1e6
;
}
else
{
return
interval
;
}
break
;
case
TSDB_TIME_PRECISION_MICRO
:
if
(
T
D_TIME_UNIT_MICROSEC
==
intervalUnit
)
{
// us
if
(
T
IME_UNIT_MICROSECOND
==
intervalUnit
)
{
// us
return
interval
;
}
else
if
(
T
D_TIME_UNIT_NANOSEC
==
intervalUnit
)
{
// nano second
}
else
if
(
T
IME_UNIT_NANOSECOND
==
intervalUnit
)
{
// nano second
return
interval
/
1e3
;
}
else
{
return
interval
*
1e3
;
}
break
;
case
TSDB_TIME_PRECISION_NANO
:
if
(
T
D_TIME_UNIT_MICROSEC
==
intervalUnit
)
{
if
(
T
IME_UNIT_MICROSECOND
==
intervalUnit
)
{
return
interval
*
1e3
;
}
else
if
(
T
D_TIME_UNIT_NANOSEC
==
intervalUnit
)
{
// nano second
}
else
if
(
T
IME_UNIT_NANOSECOND
==
intervalUnit
)
{
// nano second
return
interval
;
}
else
{
return
interval
*
1e6
;
}
break
;
default:
// ms
if
(
T
D_TIME_UNIT_MICROSEC
==
intervalUnit
)
{
// us
if
(
T
IME_UNIT_MICROSECOND
==
intervalUnit
)
{
// us
return
interval
/
1e3
;
}
else
if
(
T
D_TIME_UNIT_NANOSEC
==
intervalUnit
)
{
// nano second
}
else
if
(
T
IME_UNIT_NANOSECOND
==
intervalUnit
)
{
// nano second
return
interval
/
1e6
;
}
else
{
return
interval
;
...
...
@@ -800,10 +835,19 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
static
int32_t
tsdbGetTSmaDataImpl
(
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
,
int64_t
indexUid
,
int64_t
interval
,
int8_t
intervalUnit
,
tb_uid_t
tableUid
,
col_id_t
colId
,
TSKEY
querySkey
,
int32_t
nMaxResult
)
{
if
(
!
pTsdb
->
pTSmaEnv
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
tsdbWarn
(
"vgId:%d getTSmaDataImpl failed since pTSmaEnv is NULL"
,
REPO_ID
(
pTsdb
));
return
TSDB_CODE_FAILED
;
}
SSmaStatItem
*
pItem
=
(
SSmaStatItem
*
)
taosHashGet
(
SMA_ENV_STAT_ITEMS
(
pTsdb
->
pTSmaEnv
),
&
indexUid
,
sizeof
(
indexUid
));
if
(
pItem
==
NULL
)
{
// mark all window as expired and notify query module to query raw TS data.
return
TSDB_CODE_SUCCESS
;
// Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if
// it's NULL.
terrno
=
TSDB_CODE_TDB_INVALID_ACTION
;
tsdbWarn
(
"vgId:%d getTSmaDataImpl failed since no index %"
PRIi64
" in local cache"
,
REPO_ID
(
pTsdb
),
indexUid
);
return
TSDB_CODE_FAILED
;
}
#if 0
...
...
@@ -815,6 +859,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
}
}
#endif
#if 0
if (taosHashGet(pItem->expiredWindows, &querySkey, sizeof(TSKEY)) != NULL) {
// TODO: mark this window as expired.
...
...
@@ -835,8 +880,8 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
tsdbEncodeTSmaKey
(
tableUid
,
colId
,
querySkey
,
(
void
**
)
&
pSmaKey
);
tsdbDebug
(
"vgId:%d get sma data from %s: smaKey %"
PRIx64
"-%"
PRIu16
"-%"
PRIx64
", keyLen %d"
,
REPO_ID
(
pTsdb
),
tReadH
.
dFile
.
path
,
*
(
tb_uid_t
*
)
smaKey
,
*
(
uint16_t
*
)
POINTER_SHIFT
(
smaKey
,
8
),
*
(
int64_t
*
)
POINTER_SHIFT
(
smaKey
,
10
),
SMA_KEY_LEN
);
tReadH
.
dFile
.
path
,
*
(
tb_uid_t
*
)
smaKey
,
*
(
uint16_t
*
)
POINTER_SHIFT
(
smaKey
,
8
),
*
(
int64_t
*
)
POINTER_SHIFT
(
smaKey
,
10
),
SMA_KEY_LEN
);
void
*
result
=
NULL
;
uint32_t
valueSize
=
0
;
...
...
@@ -947,7 +992,6 @@ int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
* @brief Insert Time-range-wise Rollup Sma(RSma) data
*
* @param pTsdb
* @param param
* @param msg
* @return int32_t
*/
...
...
source/dnode/vnode/test/tsdbSmaTest.cpp
浏览文件 @
e10f471b
...
...
@@ -37,9 +37,9 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
// encode
STSma
tSma
=
{
0
};
tSma
.
version
=
0
;
tSma
.
intervalUnit
=
T
D_T
IME_UNIT_DAY
;
tSma
.
intervalUnit
=
TIME_UNIT_DAY
;
tSma
.
interval
=
1
;
tSma
.
slidingUnit
=
T
D_T
IME_UNIT_HOUR
;
tSma
.
slidingUnit
=
TIME_UNIT_HOUR
;
tSma
.
sliding
=
0
;
tstrncpy
(
tSma
.
indexName
,
"sma_index_test"
,
TSDB_INDEX_NAME_LEN
);
tstrncpy
(
tSma
.
timezone
,
"Asia/Shanghai"
,
TD_TIMEZONE_LEN
);
...
...
@@ -50,37 +50,37 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
uint32_t
bufLen
=
tEncodeTSmaWrapper
(
NULL
,
&
tSmaWrapper
);
void
*
buf
=
calloc
(
bufLen
,
1
);
assert
(
buf
!=
NULL
);
ASSERT_NE
(
buf
,
nullptr
);
STSmaWrapper
*
pSW
=
(
STSmaWrapper
*
)
buf
;
uint32_t
len
=
tEncodeTSmaWrapper
(
&
buf
,
&
tSmaWrapper
);
EXPEC
T_EQ
(
len
,
bufLen
);
ASSER
T_EQ
(
len
,
bufLen
);
// decode
STSmaWrapper
dstTSmaWrapper
=
{
0
};
void
*
result
=
tDecodeTSmaWrapper
(
pSW
,
&
dstTSmaWrapper
);
assert
(
result
!=
NULL
);
ASSERT_NE
(
result
,
nullptr
);
EXPEC
T_EQ
(
tSmaWrapper
.
number
,
dstTSmaWrapper
.
number
);
ASSER
T_EQ
(
tSmaWrapper
.
number
,
dstTSmaWrapper
.
number
);
for
(
int
i
=
0
;
i
<
tSmaWrapper
.
number
;
++
i
)
{
STSma
*
pSma
=
tSmaWrapper
.
tSma
+
i
;
STSma
*
qSma
=
dstTSmaWrapper
.
tSma
+
i
;
EXPEC
T_EQ
(
pSma
->
version
,
qSma
->
version
);
EXPEC
T_EQ
(
pSma
->
intervalUnit
,
qSma
->
intervalUnit
);
EXPEC
T_EQ
(
pSma
->
slidingUnit
,
qSma
->
slidingUnit
);
EXPEC
T_STRCASEEQ
(
pSma
->
indexName
,
qSma
->
indexName
);
EXPEC
T_STRCASEEQ
(
pSma
->
timezone
,
qSma
->
timezone
);
EXPEC
T_EQ
(
pSma
->
indexUid
,
qSma
->
indexUid
);
EXPEC
T_EQ
(
pSma
->
tableUid
,
qSma
->
tableUid
);
EXPEC
T_EQ
(
pSma
->
interval
,
qSma
->
interval
);
EXPEC
T_EQ
(
pSma
->
sliding
,
qSma
->
sliding
);
EXPEC
T_EQ
(
pSma
->
exprLen
,
qSma
->
exprLen
);
EXPEC
T_STRCASEEQ
(
pSma
->
expr
,
qSma
->
expr
);
EXPEC
T_EQ
(
pSma
->
tagsFilterLen
,
qSma
->
tagsFilterLen
);
EXPEC
T_STRCASEEQ
(
pSma
->
tagsFilter
,
qSma
->
tagsFilter
);
ASSER
T_EQ
(
pSma
->
version
,
qSma
->
version
);
ASSER
T_EQ
(
pSma
->
intervalUnit
,
qSma
->
intervalUnit
);
ASSER
T_EQ
(
pSma
->
slidingUnit
,
qSma
->
slidingUnit
);
ASSER
T_STRCASEEQ
(
pSma
->
indexName
,
qSma
->
indexName
);
ASSER
T_STRCASEEQ
(
pSma
->
timezone
,
qSma
->
timezone
);
ASSER
T_EQ
(
pSma
->
indexUid
,
qSma
->
indexUid
);
ASSER
T_EQ
(
pSma
->
tableUid
,
qSma
->
tableUid
);
ASSER
T_EQ
(
pSma
->
interval
,
qSma
->
interval
);
ASSER
T_EQ
(
pSma
->
sliding
,
qSma
->
sliding
);
ASSER
T_EQ
(
pSma
->
exprLen
,
qSma
->
exprLen
);
ASSER
T_STRCASEEQ
(
pSma
->
expr
,
qSma
->
expr
);
ASSER
T_EQ
(
pSma
->
tagsFilterLen
,
qSma
->
tagsFilterLen
);
ASSER
T_STRCASEEQ
(
pSma
->
tagsFilter
,
qSma
->
tagsFilter
);
}
// resource release
...
...
@@ -103,9 +103,9 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
// encode
STSma
tSma
=
{
0
};
tSma
.
version
=
0
;
tSma
.
intervalUnit
=
T
D_T
IME_UNIT_DAY
;
tSma
.
intervalUnit
=
TIME_UNIT_DAY
;
tSma
.
interval
=
1
;
tSma
.
slidingUnit
=
T
D_T
IME_UNIT_HOUR
;
tSma
.
slidingUnit
=
TIME_UNIT_HOUR
;
tSma
.
sliding
=
0
;
tSma
.
indexUid
=
indexUid1
;
tstrncpy
(
tSma
.
indexName
,
smaIndexName1
,
TSDB_INDEX_NAME_LEN
);
...
...
@@ -114,10 +114,12 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
tSma
.
exprLen
=
strlen
(
expr
);
tSma
.
expr
=
(
char
*
)
calloc
(
tSma
.
exprLen
+
1
,
1
);
ASSERT_NE
(
tSma
.
expr
,
nullptr
);
tstrncpy
(
tSma
.
expr
,
expr
,
tSma
.
exprLen
+
1
);
tSma
.
tagsFilterLen
=
strlen
(
tagsFilter
);
tSma
.
tagsFilter
=
(
char
*
)
calloc
(
tSma
.
tagsFilterLen
+
1
,
1
);
ASSERT_NE
(
tSma
.
tagsFilter
,
nullptr
);
tstrncpy
(
tSma
.
tagsFilter
,
tagsFilter
,
tSma
.
tagsFilterLen
+
1
);
SMeta
*
pMeta
=
NULL
;
...
...
@@ -129,18 +131,18 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
pMeta
=
metaOpen
(
smaTestDir
,
pMetaCfg
,
NULL
);
assert
(
pMeta
!=
NULL
);
// save index 1
EXPEC
T_EQ
(
metaSaveSmaToDB
(
pMeta
,
pSmaCfg
),
0
);
ASSER
T_EQ
(
metaSaveSmaToDB
(
pMeta
,
pSmaCfg
),
0
);
pSmaCfg
->
indexUid
=
indexUid2
;
tstrncpy
(
pSmaCfg
->
indexName
,
smaIndexName2
,
TSDB_INDEX_NAME_LEN
);
pSmaCfg
->
version
=
1
;
pSmaCfg
->
intervalUnit
=
T
D_T
IME_UNIT_HOUR
;
pSmaCfg
->
intervalUnit
=
TIME_UNIT_HOUR
;
pSmaCfg
->
interval
=
1
;
pSmaCfg
->
slidingUnit
=
T
D_T
IME_UNIT_MINUTE
;
pSmaCfg
->
slidingUnit
=
TIME_UNIT_MINUTE
;
pSmaCfg
->
sliding
=
5
;
// save index 2
EXPEC
T_EQ
(
metaSaveSmaToDB
(
pMeta
,
pSmaCfg
),
0
);
ASSER
T_EQ
(
metaSaveSmaToDB
(
pMeta
,
pSmaCfg
),
0
);
// get value by indexName
STSma
*
qSmaCfg
=
NULL
;
...
...
@@ -150,8 +152,8 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
printf
(
"timezone1 = %s
\n
"
,
qSmaCfg
->
timezone
);
printf
(
"expr1 = %s
\n
"
,
qSmaCfg
->
expr
!=
NULL
?
qSmaCfg
->
expr
:
""
);
printf
(
"tagsFilter1 = %s
\n
"
,
qSmaCfg
->
tagsFilter
!=
NULL
?
qSmaCfg
->
tagsFilter
:
""
);
EXPEC
T_STRCASEEQ
(
qSmaCfg
->
indexName
,
smaIndexName1
);
EXPEC
T_EQ
(
qSmaCfg
->
tableUid
,
tSma
.
tableUid
);
ASSER
T_STRCASEEQ
(
qSmaCfg
->
indexName
,
smaIndexName1
);
ASSER
T_EQ
(
qSmaCfg
->
tableUid
,
tSma
.
tableUid
);
tdDestroyTSma
(
qSmaCfg
);
tfree
(
qSmaCfg
);
...
...
@@ -161,8 +163,8 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
printf
(
"timezone2 = %s
\n
"
,
qSmaCfg
->
timezone
);
printf
(
"expr2 = %s
\n
"
,
qSmaCfg
->
expr
!=
NULL
?
qSmaCfg
->
expr
:
""
);
printf
(
"tagsFilter2 = %s
\n
"
,
qSmaCfg
->
tagsFilter
!=
NULL
?
qSmaCfg
->
tagsFilter
:
""
);
EXPEC
T_STRCASEEQ
(
qSmaCfg
->
indexName
,
smaIndexName2
);
EXPEC
T_EQ
(
qSmaCfg
->
interval
,
tSma
.
interval
);
ASSER
T_STRCASEEQ
(
qSmaCfg
->
indexName
,
smaIndexName2
);
ASSER
T_EQ
(
qSmaCfg
->
interval
,
tSma
.
interval
);
tdDestroyTSma
(
qSmaCfg
);
tfree
(
qSmaCfg
);
...
...
@@ -178,25 +180,25 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
printf
(
"indexName = %s
\n
"
,
indexName
);
++
indexCnt
;
}
EXPEC
T_EQ
(
indexCnt
,
nCntTSma
);
ASSER
T_EQ
(
indexCnt
,
nCntTSma
);
metaCloseSmaCurosr
(
pSmaCur
);
// get wrapper by table uid
STSmaWrapper
*
pSW
=
metaGetSmaInfoByTable
(
pMeta
,
tbUid
);
assert
(
pSW
!=
NULL
);
EXPEC
T_EQ
(
pSW
->
number
,
nCntTSma
);
EXPEC
T_STRCASEEQ
(
pSW
->
tSma
->
indexName
,
smaIndexName1
);
EXPEC
T_STRCASEEQ
(
pSW
->
tSma
->
timezone
,
timezone
);
EXPEC
T_STRCASEEQ
(
pSW
->
tSma
->
expr
,
expr
);
EXPEC
T_STRCASEEQ
(
pSW
->
tSma
->
tagsFilter
,
tagsFilter
);
EXPEC
T_EQ
(
pSW
->
tSma
->
indexUid
,
indexUid1
);
EXPEC
T_EQ
(
pSW
->
tSma
->
tableUid
,
tbUid
);
EXPEC
T_STRCASEEQ
((
pSW
->
tSma
+
1
)
->
indexName
,
smaIndexName2
);
EXPEC
T_STRCASEEQ
((
pSW
->
tSma
+
1
)
->
timezone
,
timezone
);
EXPEC
T_STRCASEEQ
((
pSW
->
tSma
+
1
)
->
expr
,
expr
);
EXPEC
T_STRCASEEQ
((
pSW
->
tSma
+
1
)
->
tagsFilter
,
tagsFilter
);
EXPEC
T_EQ
((
pSW
->
tSma
+
1
)
->
indexUid
,
indexUid2
);
EXPEC
T_EQ
((
pSW
->
tSma
+
1
)
->
tableUid
,
tbUid
);
ASSER
T_EQ
(
pSW
->
number
,
nCntTSma
);
ASSER
T_STRCASEEQ
(
pSW
->
tSma
->
indexName
,
smaIndexName1
);
ASSER
T_STRCASEEQ
(
pSW
->
tSma
->
timezone
,
timezone
);
ASSER
T_STRCASEEQ
(
pSW
->
tSma
->
expr
,
expr
);
ASSER
T_STRCASEEQ
(
pSW
->
tSma
->
tagsFilter
,
tagsFilter
);
ASSER
T_EQ
(
pSW
->
tSma
->
indexUid
,
indexUid1
);
ASSER
T_EQ
(
pSW
->
tSma
->
tableUid
,
tbUid
);
ASSER
T_STRCASEEQ
((
pSW
->
tSma
+
1
)
->
indexName
,
smaIndexName2
);
ASSER
T_STRCASEEQ
((
pSW
->
tSma
+
1
)
->
timezone
,
timezone
);
ASSER
T_STRCASEEQ
((
pSW
->
tSma
+
1
)
->
expr
,
expr
);
ASSER
T_STRCASEEQ
((
pSW
->
tSma
+
1
)
->
tagsFilter
,
tagsFilter
);
ASSER
T_EQ
((
pSW
->
tSma
+
1
)
->
indexUid
,
indexUid2
);
ASSER
T_EQ
((
pSW
->
tSma
+
1
)
->
tableUid
,
tbUid
);
tdDestroyTSmaWrapper
(
pSW
);
tfree
(
pSW
);
...
...
@@ -208,7 +210,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
printf
(
"metaGetSmaTbUids: uid[%"
PRIu32
"] = %"
PRIi64
"
\n
"
,
i
,
*
(
tb_uid_t
*
)
taosArrayGet
(
pUids
,
i
));
// printf("metaGetSmaTbUids: index[%" PRIu32 "] = %s", i, (char *)taosArrayGet(pUids, i));
}
EXPEC
T_EQ
(
taosArrayGetSize
(
pUids
),
1
);
ASSER
T_EQ
(
taosArrayGetSize
(
pUids
),
1
);
taosArrayDestroy
(
pUids
);
// resource release
...
...
@@ -231,7 +233,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
const
tb_uid_t
tbUid
=
1234567890
;
const
int64_t
indexUid1
=
2000000001
;
const
int64_t
interval1
=
1
;
const
int8_t
intervalUnit1
=
T
D_T
IME_UNIT_DAY
;
const
int8_t
intervalUnit1
=
TIME_UNIT_DAY
;
const
uint32_t
nCntTSma
=
2
;
TSKEY
skey1
=
1646987196
;
const
int64_t
testSmaData1
=
100
;
...
...
@@ -239,9 +241,9 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
// encode
STSma
tSma
=
{
0
};
tSma
.
version
=
0
;
tSma
.
intervalUnit
=
T
D_T
IME_UNIT_DAY
;
tSma
.
intervalUnit
=
TIME_UNIT_DAY
;
tSma
.
interval
=
1
;
tSma
.
slidingUnit
=
T
D_T
IME_UNIT_HOUR
;
tSma
.
slidingUnit
=
TIME_UNIT_HOUR
;
tSma
.
sliding
=
0
;
tSma
.
indexUid
=
indexUid1
;
tstrncpy
(
tSma
.
indexName
,
smaIndexName1
,
TSDB_INDEX_NAME_LEN
);
...
...
@@ -250,10 +252,12 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
tSma
.
exprLen
=
strlen
(
expr
);
tSma
.
expr
=
(
char
*
)
calloc
(
tSma
.
exprLen
+
1
,
1
);
ASSERT_NE
(
tSma
.
expr
,
nullptr
);
tstrncpy
(
tSma
.
expr
,
expr
,
tSma
.
exprLen
+
1
);
tSma
.
tagsFilterLen
=
strlen
(
tagsFilter
);
tSma
.
tagsFilter
=
(
char
*
)
calloc
(
tSma
.
tagsFilterLen
+
1
,
1
);
ASSERT_NE
(
tSma
.
tagsFilter
,
nullptr
);
tstrncpy
(
tSma
.
tagsFilter
,
tagsFilter
,
tSma
.
tagsFilterLen
+
1
);
SMeta
*
pMeta
=
NULL
;
...
...
@@ -265,7 +269,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
pMeta
=
metaOpen
(
smaTestDir
,
pMetaCfg
,
NULL
);
assert
(
pMeta
!=
NULL
);
// save index 1
EXPEC
T_EQ
(
metaSaveSmaToDB
(
pMeta
,
pSmaCfg
),
0
);
ASSER
T_EQ
(
metaSaveSmaToDB
(
pMeta
,
pSmaCfg
),
0
);
// step 2: insert data
STSmaDataWrapper
*
pSmaData
=
NULL
;
...
...
@@ -298,18 +302,19 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
}
char
*
msg
=
(
char
*
)
calloc
(
100
,
1
);
EXPECT_EQ
(
tsdbUpdateSmaWindow
(
&
tsdb
,
TSDB_SMA_TYPE_TIME_RANGE
,
msg
),
0
);
assert
(
msg
!=
NULL
);
ASSERT_EQ
(
tsdbUpdateSmaWindow
(
&
tsdb
,
TSDB_SMA_TYPE_TIME_RANGE
,
msg
),
0
);
// init
int32_t
allocCnt
=
0
;
int32_t
allocStep
=
40960
;
int32_t
buffer
=
4096
;
int32_t
allocStep
=
16384
;
int32_t
buffer
=
1024
;
void
*
buf
=
NULL
;
EXPEC
T_EQ
(
tsdbMakeRoom
(
&
buf
,
allocStep
),
0
);
ASSER
T_EQ
(
tsdbMakeRoom
(
&
buf
,
allocStep
),
0
);
int32_t
bufSize
=
taosTSizeof
(
buf
);
int32_t
numOfTables
=
10
;
col_id_t
numOfCols
=
4096
;
EXPEC
T_GT
(
numOfCols
,
0
);
ASSER
T_GT
(
numOfCols
,
0
);
pSmaData
=
(
STSmaDataWrapper
*
)
buf
;
printf
(
">> allocate [%d] time to %d and addr is %p
\n
"
,
++
allocCnt
,
bufSize
,
pSmaData
);
...
...
@@ -326,7 +331,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
int32_t
tableDataLen
=
sizeof
(
STSmaTbData
);
for
(
col_id_t
c
=
0
;
c
<
numOfCols
;
++
c
)
{
if
(
bufSize
-
len
-
tableDataLen
<
buffer
)
{
EXPEC
T_EQ
(
tsdbMakeRoom
(
&
buf
,
bufSize
+
allocStep
),
0
);
ASSER
T_EQ
(
tsdbMakeRoom
(
&
buf
,
bufSize
+
allocStep
),
0
);
pSmaData
=
(
STSmaDataWrapper
*
)
buf
;
pTbData
=
(
STSmaTbData
*
)
POINTER_SHIFT
(
pSmaData
,
len
);
bufSize
=
taosTSizeof
(
buf
);
...
...
@@ -353,22 +358,22 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
}
pSmaData
->
dataLen
=
(
len
-
sizeof
(
STSmaDataWrapper
));
EXPEC
T_GE
(
bufSize
,
pSmaData
->
dataLen
);
ASSER
T_GE
(
bufSize
,
pSmaData
->
dataLen
);
// execute
EXPEC
T_EQ
(
tsdbInsertTSmaData
(
&
tsdb
,
(
char
*
)
pSmaData
),
TSDB_CODE_SUCCESS
);
ASSER
T_EQ
(
tsdbInsertTSmaData
(
&
tsdb
,
(
char
*
)
pSmaData
),
TSDB_CODE_SUCCESS
);
// step 3: query
uint32_t
checkDataCnt
=
0
;
for
(
int32_t
t
=
0
;
t
<
numOfTables
;
++
t
)
{
for
(
col_id_t
c
=
0
;
c
<
numOfCols
;
++
c
)
{
EXPEC
T_EQ
(
tsdbGetTSmaData
(
&
tsdb
,
NULL
,
indexUid1
,
interval1
,
intervalUnit1
,
tbUid
+
t
,
ASSER
T_EQ
(
tsdbGetTSmaData
(
&
tsdb
,
NULL
,
indexUid1
,
interval1
,
intervalUnit1
,
tbUid
+
t
,
c
+
PRIMARYKEY_TIMESTAMP_COL_ID
,
skey1
,
1
),
TSDB_CODE_SUCCESS
);
++
checkDataCnt
;
}
}
printf
(
"%s:%d The sma data check count for insert and query is %"
PRIu32
"
\n
"
,
__FILE__
,
__LINE__
,
checkDataCnt
);
// release data
...
...
source/util/src/terror.c
浏览文件 @
e10f471b
...
...
@@ -350,6 +350,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message")
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_IVLD_TAG_VAL
,
"TSDB invalid tag value"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_NO_CACHE_LAST_ROW
,
"TSDB no cache last row data"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_NO_SMA_INDEX_IN_META
,
"No sma index in meta"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR
,
"TDB env open error"
)
// query
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_INVALID_QHANDLE
,
"Invalid handle"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录