Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
585ae95d
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
585ae95d
编写于
3月 25, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
expired window/tsma msg adaption
上级
78ee5356
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
222 addition
and
177 deletion
+222
-177
include/common/tmsg.h
include/common/tmsg.h
+9
-4
source/common/src/tmsg.c
source/common/src/tmsg.c
+1
-1
source/dnode/vnode/inc/tsdb.h
source/dnode/vnode/inc/tsdb.h
+32
-2
source/dnode/vnode/src/inc/tsdbDef.h
source/dnode/vnode/src/inc/tsdbDef.h
+1
-0
source/dnode/vnode/src/inc/tsdbSma.h
source/dnode/vnode/src/inc/tsdbSma.h
+2
-0
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+165
-114
source/dnode/vnode/src/tsdb/tsdbWrite.c
source/dnode/vnode/src/tsdb/tsdbWrite.c
+1
-45
source/dnode/vnode/test/tsdbSmaTest.cpp
source/dnode/vnode/test/tsdbSmaTest.cpp
+11
-11
未找到文件。
include/common/tmsg.h
浏览文件 @
585ae95d
...
...
@@ -240,12 +240,12 @@ typedef struct {
}
SSubmitBlkIter
;
typedef
struct
{
int32_t
totalLen
;
int32_t
len
;
void
*
pMsg
;
int32_t
totalLen
;
int32_t
len
;
const
void
*
pMsg
;
}
SSubmitMsgIter
;
int32_t
tInitSubmitMsgIter
(
SSubmitReq
*
pMsg
,
SSubmitMsgIter
*
pIter
);
int32_t
tInitSubmitMsgIter
(
const
SSubmitReq
*
pMsg
,
SSubmitMsgIter
*
pIter
);
int32_t
tGetSubmitMsgNext
(
SSubmitMsgIter
*
pIter
,
SSubmitBlk
**
pPBlock
);
int32_t
tInitSubmitBlkIter
(
SSubmitBlk
*
pBlock
,
SSubmitBlkIter
*
pIter
);
STSRow
*
tGetSubmitBlkNext
(
SSubmitBlkIter
*
pIter
);
...
...
@@ -2100,6 +2100,11 @@ static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) {
}
}
static
FORCE_INLINE
void
tdFreeTSmaWrapper
(
STSmaWrapper
*
pSW
)
{
tdDestroyTSmaWrapper
(
pSW
);
tfree
(
pSW
);
}
static
FORCE_INLINE
int32_t
tEncodeTSma
(
void
**
buf
,
const
STSma
*
pSma
)
{
int32_t
tlen
=
0
;
...
...
source/common/src/tmsg.c
浏览文件 @
585ae95d
...
...
@@ -28,7 +28,7 @@
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"
int32_t
tInitSubmitMsgIter
(
SSubmitReq
*
pMsg
,
SSubmitMsgIter
*
pIter
)
{
int32_t
tInitSubmitMsgIter
(
const
SSubmitReq
*
pMsg
,
SSubmitMsgIter
*
pIter
)
{
if
(
pMsg
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP
;
return
-
1
;
...
...
source/dnode/vnode/inc/tsdb.h
浏览文件 @
585ae95d
...
...
@@ -87,6 +87,15 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp);
int
tsdbPrepareCommit
(
STsdb
*
pTsdb
);
int
tsdbCommit
(
STsdb
*
pTsdb
);
/**
* @brief When submit msg received, update the relative expired window synchronously.
*
* @param pTsdb
* @param msg
* @return int32_t
*/
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
const
char
*
msg
);
/**
* @brief Insert tSma(Time-range-wise SMA) data from stream computing engine
*
...
...
@@ -95,11 +104,18 @@ int tsdbCommit(STsdb *pTsdb);
* @return int32_t
*/
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
char
*
msg
);
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
int8_t
smaType
,
char
*
msg
);
/**
* @brief Drop tSma data and local cache.
*
* @param pTsdb
* @param indexUid
* @return int32_t
*/
int32_t
tsdbDropTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
);
/**
* @brief Insert RSma(
Time-range-wise
Rollup SMA) data.
* @brief Insert RSma(Rollup SMA) data.
*
* @param pTsdb
* @param msg
...
...
@@ -108,6 +124,20 @@ int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
int32_t
tsdbInsertRSmaData
(
STsdb
*
pTsdb
,
char
*
msg
);
// TODO: This is the basic params, and should wrap the params to a queryHandle.
/**
* @brief Get tSma(Time-range-wise SMA) data.
*
* @param pTsdb
* @param pData
* @param indexUid
* @param interval
* @param intervalUnit
* @param tableUid
* @param colId
* @param querySKey
* @param nMaxResult
* @return int32_t
*/
int32_t
tsdbGetTSmaData
(
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
,
int64_t
indexUid
,
int64_t
interval
,
int8_t
intervalUnit
,
tb_uid_t
tableUid
,
col_id_t
colId
,
TSKEY
querySKey
,
int32_t
nMaxResult
);
...
...
source/dnode/vnode/src/inc/tsdbDef.h
浏览文件 @
585ae95d
...
...
@@ -63,6 +63,7 @@ struct STsdb {
#define REPO_ID(r) ((r)->vgId)
#define REPO_CFG(r) (&(r)->config)
#define REPO_FS(r) (r)->fs
#define REPO_META(r) (r)->pMeta
#define REPO_TFS(r) (r)->pTfs
#define IS_REPO_LOCKED(r) (r)->repoLocked
#define REPO_SMA_ENV(r, t) ((TSDB_SMA_TYPE_ROLLUP == (t)) ? (r)->pRSmaEnv : (r)->pTSmaEnv)
...
...
source/dnode/vnode/src/inc/tsdbSma.h
浏览文件 @
585ae95d
...
...
@@ -16,6 +16,8 @@
#ifndef _TD_TSDB_SMA_H_
#define _TD_TSDB_SMA_H_
#define TSDB_SMA_TEST // remove after test finished
typedef
struct
SSmaStat
SSmaStat
;
typedef
struct
SSmaEnv
SSmaEnv
;
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
585ae95d
...
...
@@ -38,7 +38,7 @@ typedef enum {
}
ESmaStorageLevel
;
typedef
struct
{
STsdb
*
pTsdb
;
STsdb
*
pTsdb
;
SDBFile
dFile
;
int32_t
interval
;
// interval with the precision of DB
}
STSmaWriteH
;
...
...
@@ -49,7 +49,7 @@ typedef struct {
}
SmaFsIter
;
typedef
struct
{
STsdb
*
pTsdb
;
STsdb
*
pTsdb
;
SDBFile
dFile
;
int32_t
interval
;
// interval with the precision of DB
int32_t
blockSize
;
// size of SMA block item
...
...
@@ -69,7 +69,7 @@ typedef struct {
*/
int8_t
state
;
// ETsdbSmaStat
SHashObj
*
expiredWindows
;
// key: skey of time window, value: N/A
STSma
*
pSma
;
// cache schema
STSma
*
pSma
;
// cache schema
}
SSmaStatItem
;
struct
SSmaStat
{
...
...
@@ -80,9 +80,9 @@ struct SSmaStat {
// declaration of static functions
// expired window
static
int32_t
tsdbUpdateExpiredWindow
(
STsdb
*
pTsdb
,
ETsdbSmaType
smaType
,
char
*
msg
);
static
int32_t
tsdbUpdateExpiredWindow
Impl
(
STsdb
*
pTsdb
,
const
char
*
msg
);
static
int32_t
tsdbInitSmaStat
(
SSmaStat
**
pSmaStat
);
static
void
*
tsdbFreeSmaStatItem
(
SSmaStatItem
*
pSmaStatItem
);
static
void
*
tsdbFreeSmaStatItem
(
SSmaStatItem
*
pSmaStatItem
);
static
int32_t
tsdbDestroySmaState
(
SSmaStat
*
pSmaStat
);
static
SSmaEnv
*
tsdbNewSmaEnv
(
const
STsdb
*
pTsdb
,
const
char
*
path
,
SDiskID
did
);
static
int32_t
tsdbInitSmaEnv
(
STsdb
*
pTsdb
,
const
char
*
path
,
SDiskID
did
,
SSmaEnv
**
pEnv
);
...
...
@@ -124,7 +124,7 @@ static FORCE_INLINE int8_t tsdbSmaStat(SSmaStatItem *pStatItem) {
}
static
FORCE_INLINE
bool
tsdbSmaStatIsOK
(
SSmaStatItem
*
pStatItem
,
int8_t
*
state
)
{
if
(
!
pStatItem
)
{
if
(
!
pStatItem
)
{
return
false
;
}
...
...
@@ -384,49 +384,20 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
return
TSDB_CODE_SUCCESS
;
};
/**
* @brief Update expired window according to msg from stream computing module.
*
* @param pTsdb
* @param smaType ETsdbSmaType
* @param msg
* @return int32_t
*/
int32_t
tsdbUpdateExpiredWindow
(
STsdb
*
pTsdb
,
ETsdbSmaType
smaType
,
char
*
msg
)
{
if
(
!
msg
||
!
pTsdb
->
pMeta
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
TSDB_CODE_FAILED
;
}
// TODO: decode the msg from Stream Computing module => start
int64_t
indexUid
=
SMA_TEST_INDEX_UID
;
const
int32_t
SMA_TEST_EXPIRED_WINDOW_SIZE
=
10
;
TSKEY
expiredWindows
[
SMA_TEST_EXPIRED_WINDOW_SIZE
];
TSKEY
skey1
=
1646987196
*
1e3
;
for
(
int32_t
i
=
0
;
i
<
SMA_TEST_EXPIRED_WINDOW_SIZE
;
++
i
)
{
expiredWindows
[
i
]
=
skey1
+
i
;
}
// TODO: decode the msg <= end
if
(
tsdbCheckAndInitSmaEnv
(
pTsdb
,
smaType
)
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TSDB_CODE_TDB_INIT_FAILED
;
return
TSDB_CODE_FAILED
;
}
SSmaEnv
*
pEnv
=
REPO_SMA_ENV
(
pTsdb
,
smaType
);
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SHashObj
*
pItemsHash
=
SMA_ENV_STAT_ITEMS
(
pEnv
);
TASSERT
(
pEnv
!=
NULL
&&
pStat
!=
NULL
&&
pItemsHash
!=
NULL
);
static
STimeWindow
getActiveTimeWindowX
(
int64_t
ts
,
SInterval
*
pInterval
)
{
STimeWindow
tw
=
{
0
};
tw
.
skey
=
100
;
tw
.
ekey
=
1000
;
return
tw
;
}
tsdbRefSmaStat
(
pTsdb
,
pStat
);
static
int32_t
tsdbSetExpiredWindow
(
STsdb
*
pTsdb
,
SHashObj
*
pItemsHash
,
int64_t
indexUid
,
int64_t
winSKey
)
{
SSmaStatItem
*
pItem
=
taosHashGet
(
pItemsHash
,
&
indexUid
,
sizeof
(
indexUid
));
if
(
pItem
==
NULL
)
{
pItem
=
tsdbNewSmaStatItem
(
TSDB_SMA_STAT_EXPIRED
);
// TODO use the real state
if
(
pItem
==
NULL
)
{
// Response to stream computing: OOM
// For query, if the indexUid not found, the TSDB should tell query module to query raw TS data.
tsdbUnRefSmaStat
(
pTsdb
,
pStat
);
return
TSDB_CODE_FAILED
;
}
...
...
@@ -436,7 +407,6 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) {
terrno
=
TSDB_CODE_TDB_NO_SMA_INDEX_IN_META
;
taosHashCleanup
(
pItem
->
expiredWindows
);
free
(
pItem
);
tsdbUnRefSmaStat
(
pTsdb
,
pStat
);
tsdbWarn
(
"vgId:%d update expired window failed for smaIndex %"
PRIi64
" since %s"
,
REPO_ID
(
pTsdb
),
indexUid
,
tstrerror
(
terrno
));
return
TSDB_CODE_FAILED
;
...
...
@@ -447,34 +417,150 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) {
// If error occurs during put smaStatItem, free the resources of pItem
taosHashCleanup
(
pItem
->
expiredWindows
);
free
(
pItem
);
tsdbUnRefSmaStat
(
pTsdb
,
pStat
);
return
TSDB_CODE_FAILED
;
}
}
int8_t
state
=
TSDB_SMA_STAT_EXPIRED
;
if
(
taosHashPut
(
pItem
->
expiredWindows
,
&
winSKey
,
sizeof
(
TSKEY
),
&
state
,
sizeof
(
state
))
!=
0
)
{
// If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would
// tell query module to query raw TS data.
// N.B.
// 1) It is assumed to be extemely little probability event of fail to taosHashPut.
// 2) This would solve the inconsistency to some extent, but not completely, unless we record all expired
// windows failed to put into hash table.
taosHashCleanup
(
pItem
->
expiredWindows
);
tfree
(
pItem
->
pSma
);
taosHashRemove
(
pItemsHash
,
&
indexUid
,
sizeof
(
indexUid
));
return
TSDB_CODE_FAILED
;
}
tsdbDebug
(
"vgId:%d smaIndex %"
PRIi64
" tsKey %"
PRIi64
" is put to hash"
,
REPO_ID
(
pTsdb
),
indexUid
,
winSKey
);
}
/**
* @brief Update expired window according to msg from stream computing module.
*
* @param pTsdb
* @param msg SSubmitReq
* @return int32_t
*/
int32_t
tsdbUpdateExpiredWindowImpl
(
STsdb
*
pTsdb
,
const
char
*
msg
)
{
const
SSubmitReq
*
pMsg
=
(
const
SSubmitReq
*
)
msg
;
if
(
pMsg
->
length
<=
sizeof
(
SSubmitReq
))
{
terrno
=
TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP
;
return
TSDB_CODE_FAILED
;
}
if
(
!
pTsdb
->
pMeta
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
TSDB_CODE_FAILED
;
}
// TODO: decode the msg from Stream Computing module => start
#ifdef TSDB_SMA_TESTx
int64_t
indexUid
=
SMA_TEST_INDEX_UID
;
const
int32_t
SMA_TEST_EXPIRED_WINDOW_SIZE
=
10
;
TSKEY
expiredWindows
[
SMA_TEST_EXPIRED_WINDOW_SIZE
];
TSKEY
skey1
=
1646987196
*
1e3
;
for
(
int32_t
i
=
0
;
i
<
SMA_TEST_EXPIRED_WINDOW_SIZE
;
++
i
)
{
if
(
taosHashPut
(
pItem
->
expiredWindows
,
expiredWindows
+
i
,
sizeof
(
TSKEY
),
&
state
,
sizeof
(
state
))
!=
0
)
{
// If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would
// tell query module to query raw TS data.
// N.B.
// 1) It is assumed to be extemely little probability event of fail to taosHashPut.
// 2) This would solve the inconsistency to some extent, but not completely, unless we record all expired
// windows failed to put into hash table.
taosHashCleanup
(
pItem
->
expiredWindows
);
tfree
(
pItem
->
pSma
);
taosHashRemove
(
pItemsHash
,
&
indexUid
,
sizeof
(
indexUid
));
tsdbUnRefSmaStat
(
pTsdb
,
pStat
);
return
TSDB_CODE_FAILED
;
expiredWindows
[
i
]
=
skey1
+
i
;
}
#else
#endif
// TODO: decode the msg <= end
if
(
tsdbCheckAndInitSmaEnv
(
pTsdb
,
TSDB_SMA_TYPE_TIME_RANGE
)
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TSDB_CODE_TDB_INIT_FAILED
;
return
TSDB_CODE_FAILED
;
}
#ifndef TSDB_SMA_TEST
TSKEY
expiredWindows
[
SMA_TEST_EXPIRED_WINDOW_SIZE
];
#endif
// Firstly, assume that tSma can only be created on super table/normal table.
// getActiveTimeWindow
SSmaEnv
*
pEnv
=
REPO_SMA_ENV
(
pTsdb
,
TSDB_SMA_TYPE_TIME_RANGE
);
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SHashObj
*
pItemsHash
=
SMA_ENV_STAT_ITEMS
(
pEnv
);
TASSERT
(
pEnv
!=
NULL
&&
pStat
!=
NULL
&&
pItemsHash
!=
NULL
);
SSubmitMsgIter
msgIter
=
{
0
};
SSubmitBlk
*
pBlock
=
NULL
;
SInterval
interval
=
{
0
};
if
(
tInitSubmitMsgIter
(
pMsg
,
&
msgIter
)
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_FAILED
;
}
// basic procedure
// TODO: optimization
tsdbRefSmaStat
(
pTsdb
,
pStat
);
while
(
true
)
{
tGetSubmitMsgNext
(
&
msgIter
,
&
pBlock
);
if
(
pBlock
==
NULL
)
break
;
int64_t
suid
=
htobe64
(
pBlock
->
uid
);
STSmaWrapper
*
pSW
=
NULL
;
STSma
*
pTSma
=
NULL
;
while
(
true
)
{
SSubmitBlkIter
blkIter
=
{
0
};
if
(
tInitSubmitBlkIter
(
pBlock
,
&
blkIter
)
!=
TSDB_CODE_SUCCESS
)
{
tdFreeTSmaWrapper
(
pSW
);
break
;
}
STSRow
*
row
=
tGetSubmitBlkNext
(
&
blkIter
);
if
(
row
==
NULL
)
{
tdFreeTSmaWrapper
(
pSW
);
break
;
}
if
(
pSW
==
NULL
)
{
if
((
pSW
=
metaGetSmaInfoByTable
(
REPO_META
(
pTsdb
),
suid
))
==
NULL
)
{
break
;
}
if
((
pSW
->
number
)
<=
0
||
(
pSW
->
tSma
==
NULL
))
{
tdFreeTSmaWrapper
(
pSW
);
break
;
}
pTSma
=
pSW
->
tSma
;
}
interval
.
interval
=
pTSma
->
interval
;
interval
.
intervalUnit
=
pTSma
->
intervalUnit
;
interval
.
offset
=
pTSma
->
offset
;
interval
.
precision
=
REPO_CFG
(
pTsdb
)
->
precision
;
interval
.
sliding
=
pTSma
->
sliding
;
interval
.
slidingUnit
=
pTSma
->
slidingUnit
;
STimeWindow
tw
=
getActiveTimeWindowX
(
TD_ROW_KEY
(
row
),
&
interval
);
tsdbSetExpiredWindow
(
pTsdb
,
pItemsHash
,
pTSma
->
indexUid
,
TD_ROW_KEY
(
row
));
}
tsdbDebug
(
"vgId:%d smaIndex %"
PRIi64
" tsKey %"
PRIi64
" is put to hash"
,
REPO_ID
(
pTsdb
),
indexUid
,
expiredWindows
[
i
]);
}
tsdbUnRefSmaStat
(
pTsdb
,
pStat
);
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief When sma data received from stream computing, make the relative expired window valid.
*
* @param pTsdb
* @param pStat
* @param indexUid
* @param skey
* @return int32_t
*/
static
int32_t
tsdbResetExpiredWindow
(
STsdb
*
pTsdb
,
SSmaStat
*
pStat
,
int64_t
indexUid
,
TSKEY
skey
)
{
SSmaStatItem
*
pItem
=
NULL
;
...
...
@@ -582,7 +668,7 @@ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t k
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
uint32_t
valueSize
=
0
;
void
*
data
=
tsdbGetSmaDataByKey
(
pDBFile
,
smaKey
,
keyLen
,
&
valueSize
);
void
*
data
=
tsdbGetSmaDataByKey
(
pDBFile
,
smaKey
,
keyLen
,
&
valueSize
);
ASSERT
(
data
!=
NULL
);
for
(
uint32_t
v
=
0
;
v
<
valueSize
;
v
+=
8
)
{
tsdbWarn
(
"vgId:%d insert sma data val[%d] %"
PRIi64
,
REPO_ID
(
pSmaH
->
pTsdb
),
v
,
*
(
int64_t
*
)
POINTER_SHIFT
(
data
,
v
));
...
...
@@ -699,7 +785,7 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p
assert
(
pTbData
->
dataLen
>
0
);
STSmaColData
*
pColData
=
(
STSmaColData
*
)
POINTER_SHIFT
(
pTbData
->
data
,
tbLen
);
char
smaKey
[
SMA_KEY_LEN
]
=
{
0
};
void
*
pSmaKey
=
&
smaKey
;
void
*
pSmaKey
=
&
smaKey
;
#if 0
printf("tsdbInsertTSmaDataSection: index %" PRIi64 ", skey %" PRIi64 " table[%" PRIi64 "]col[%" PRIu16 "]\n",
pData->indexUid, pData->skey, pTbData->tableUid, pColData->colId);
...
...
@@ -773,11 +859,10 @@ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLe
* @return int32_t
*/
static
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
char
*
msg
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
STSmaDataWrapper
*
pData
=
(
STSmaDataWrapper
*
)
msg
;
SSmaEnv
*
pEnv
=
atomic_load_ptr
(
&
pTsdb
->
pTSmaEnv
);
int64_t
indexUid
=
SMA_TEST_INDEX_UID
;
SSmaEnv
*
pEnv
=
atomic_load_ptr
(
&
pTsdb
->
pTSmaEnv
);
int64_t
indexUid
=
SMA_TEST_INDEX_UID
;
if
(
pEnv
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
...
...
@@ -797,7 +882,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
return
TSDB_CODE_FAILED
;
}
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pTsdb
->
pTSmaEnv
);
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pTsdb
->
pTSmaEnv
);
SSmaStatItem
*
pItem
=
NULL
;
tsdbRefSmaStat
(
pTsdb
,
pStat
);
...
...
@@ -812,8 +897,8 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
return
TSDB_CODE_FAILED
;
}
char
rPath
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
aPath
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
rPath
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
aPath
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
rPath
,
TSDB_FILENAME_LEN
,
"%s%s%"
PRIi64
,
SMA_ENV_PATH
(
pEnv
),
TD_DIRSEP
,
indexUid
);
tfsAbsoluteName
(
REPO_TFS
(
pTsdb
),
SMA_ENV_DID
(
pEnv
),
rPath
,
aPath
);
if
(
!
taosCheckExistFile
(
aPath
))
{
...
...
@@ -859,9 +944,9 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
/**
* @brief Drop tSma data and local cache
* - insert/query reference
* @param pTsdb
* @param msg
* @return int32_t
* @param pTsdb
* @param msg
* @return int32_t
*/
static
int32_t
tsdbDropTSmaDataImpl
(
STsdb
*
pTsdb
,
int64_t
indexUid
)
{
SSmaEnv
*
pEnv
=
atomic_load_ptr
(
&
pTsdb
->
pTSmaEnv
);
...
...
@@ -902,8 +987,7 @@ static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) {
}
}
// clear sma data files
// TODO:
// TODO:
}
static
int32_t
tsdbSetRSmaDataFile
(
STSmaWriteH
*
pSmaH
,
STSmaDataWrapper
*
pData
,
int32_t
fid
)
{
...
...
@@ -917,9 +1001,9 @@ static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData,
}
static
int32_t
tsdbInsertRSmaDataImpl
(
STsdb
*
pTsdb
,
char
*
msg
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
STSmaDataWrapper
*
pData
=
(
STSmaDataWrapper
*
)
msg
;
SSmaEnv
*
pEnv
=
atomic_load_ptr
(
&
pTsdb
->
pRSmaEnv
);
SSmaEnv
*
pEnv
=
atomic_load_ptr
(
&
pTsdb
->
pRSmaEnv
);
if
(
pEnv
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
...
...
@@ -1137,7 +1221,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
tReadH
.
dFile
.
path
,
*
(
tb_uid_t
*
)
smaKey
,
*
(
uint16_t
*
)
POINTER_SHIFT
(
smaKey
,
8
),
*
(
int64_t
*
)
POINTER_SHIFT
(
smaKey
,
10
),
SMA_KEY_LEN
);
void
*
result
=
NULL
;
void
*
result
=
NULL
;
uint32_t
valueSize
=
0
;
if
((
result
=
tsdbGetSmaDataByKey
(
&
tReadH
.
dFile
,
smaKey
,
SMA_KEY_LEN
,
&
valueSize
))
==
NULL
)
{
tsdbWarn
(
"vgId:%d get sma data failed from smaIndex %"
PRIi64
", smaKey %"
PRIx64
"-%"
PRIu16
"-%"
PRIx64
...
...
@@ -1219,15 +1303,8 @@ int32_t tsdbRemoveTSmaData(STsdb *pTsdb, void *smaIndex, STimeWindow *pWin) {
}
#endif
/**
* @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine
*
* @param pTsdb
* @param param
* @param msg
* @return int32_t
* TODO: Who is responsible for resource allocate and release?
*/
// TODO: Who is responsible for resource allocate and release?
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
char
*
msg
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbInsertTSmaDataImpl
(
pTsdb
,
msg
))
<
0
)
{
...
...
@@ -1236,21 +1313,14 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) {
return
code
;
}
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
int8_t
smaType
,
char
*
msg
)
{
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
const
char
*
msg
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbUpdateExpiredWindow
(
pTsdb
,
smaType
,
msg
))
<
0
)
{
if
((
code
=
tsdbUpdateExpiredWindow
Impl
(
pTsdb
,
msg
))
<
0
)
{
tsdbWarn
(
"vgId:%d update expired sma window failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
return
code
;
}
/**
* @brief Insert Time-range-wise Rollup Sma(RSma) data
*
* @param pTsdb
* @param msg
* @return int32_t
*/
int32_t
tsdbInsertRSmaData
(
STsdb
*
pTsdb
,
char
*
msg
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbInsertRSmaDataImpl
(
pTsdb
,
msg
))
<
0
)
{
...
...
@@ -1259,20 +1329,7 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
return
code
;
}
/**
* @brief Get tSma data
*
* @param pTsdb
* @param pData
* @param indexUid
* @param interval
* @param intervalUnit
* @param tableUid
* @param colId
* @param querySKey
* @param nMaxResult
* @return int32_t
*/
int32_t
tsdbGetTSmaData
(
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
,
int64_t
indexUid
,
int64_t
interval
,
int8_t
intervalUnit
,
tb_uid_t
tableUid
,
col_id_t
colId
,
TSKEY
querySKey
,
int32_t
nMaxResult
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -1283,13 +1340,7 @@ int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid,
return
code
;
}
/**
* @brief Drop tSma Data and caches
*
* @param pTsdb
* @param msg
* @return int32_t
*/
int32_t
tsdbDropTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbDropTSmaDataImpl
(
pTsdb
,
indexUid
))
<
0
)
{
...
...
source/dnode/vnode/src/tsdb/tsdbWrite.c
浏览文件 @
585ae95d
...
...
@@ -32,48 +32,4 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
}
}
return
tsdbMemTableInsert
(
pTsdb
,
pTsdb
->
mem
,
pMsg
,
NULL
);
}
#if 0
/**
* @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine
*
* @param pTsdb
* @param param
* @param msg
* @return int32_t
* TODO: Who is responsible for resource allocate and release?
*/
int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbInsertTSmaDataImpl(pTsdb, msg)) < 0) {
tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbUpdateExpiredWindow(pTsdb, smaType, msg)) < 0) {
tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}
/**
* @brief Insert Time-range-wise Rollup Sma(RSma) data
*
* @param pTsdb
* @param param
* @param msg
* @return int32_t
*/
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbInsertRSmaDataImpl(pTsdb, msg)) < 0) {
tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}
#endif
\ No newline at end of file
}
\ No newline at end of file
source/dnode/vnode/test/tsdbSmaTest.cpp
浏览文件 @
585ae95d
...
...
@@ -98,7 +98,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
tSma
.
slidingUnit
=
TIME_UNIT_HOUR
;
tSma
.
sliding
=
0
;
tstrncpy
(
tSma
.
indexName
,
"sma_index_test"
,
TSDB_INDEX_NAME_LEN
);
t
strncpy
(
tSma
.
timezone
,
"Asia/Shanghai"
,
TD_TIMEZONE_LEN
)
;
t
Sma
.
timezoneInt
=
8
;
tSma
.
indexUid
=
2345678910
;
tSma
.
tableUid
=
1234567890
;
...
...
@@ -128,7 +128,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
ASSERT_EQ
(
pSma
->
intervalUnit
,
qSma
->
intervalUnit
);
ASSERT_EQ
(
pSma
->
slidingUnit
,
qSma
->
slidingUnit
);
ASSERT_STRCASEEQ
(
pSma
->
indexName
,
qSma
->
indexName
);
ASSERT_
STRCASEEQ
(
pSma
->
timezone
,
qSma
->
timezone
);
ASSERT_
EQ
(
pSma
->
timezoneInt
,
qSma
->
timezoneInt
);
ASSERT_EQ
(
pSma
->
indexUid
,
qSma
->
indexUid
);
ASSERT_EQ
(
pSma
->
tableUid
,
qSma
->
tableUid
);
ASSERT_EQ
(
pSma
->
interval
,
qSma
->
interval
);
...
...
@@ -150,7 +150,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
TEST
(
testCase
,
tSma_metaDB_Put_Get_Del_Test
)
{
const
char
*
smaIndexName1
=
"sma_index_test_1"
;
const
char
*
smaIndexName2
=
"sma_index_test_2"
;
const
char
*
timezone
=
"Asia/Shanghai"
;
int8_t
timezone
=
8
;
const
char
*
expr
=
"select count(a,b, top 20), from table interval 1d, sliding 1h;"
;
const
char
*
tagsFilter
=
"I'm tags filter"
;
const
char
*
smaTestDir
=
"./smaTest"
;
...
...
@@ -167,7 +167,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
tSma
.
sliding
=
0
;
tSma
.
indexUid
=
indexUid1
;
tstrncpy
(
tSma
.
indexName
,
smaIndexName1
,
TSDB_INDEX_NAME_LEN
);
t
strncpy
(
tSma
.
timezone
,
timezone
,
TD_TIMEZONE_LEN
)
;
t
Sma
.
timezoneInt
=
8
;
tSma
.
tableUid
=
tbUid
;
tSma
.
exprLen
=
strlen
(
expr
);
...
...
@@ -207,7 +207,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
qSmaCfg
=
metaGetSmaInfoByIndex
(
pMeta
,
indexUid1
);
assert
(
qSmaCfg
!=
NULL
);
printf
(
"name1 = %s
\n
"
,
qSmaCfg
->
indexName
);
printf
(
"timezone1 = %
s
\n
"
,
qSmaCfg
->
timezone
);
printf
(
"timezone1 = %
"
PRIi8
"
\n
"
,
qSmaCfg
->
timezoneInt
);
printf
(
"expr1 = %s
\n
"
,
qSmaCfg
->
expr
!=
NULL
?
qSmaCfg
->
expr
:
""
);
printf
(
"tagsFilter1 = %s
\n
"
,
qSmaCfg
->
tagsFilter
!=
NULL
?
qSmaCfg
->
tagsFilter
:
""
);
ASSERT_STRCASEEQ
(
qSmaCfg
->
indexName
,
smaIndexName1
);
...
...
@@ -218,7 +218,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
qSmaCfg
=
metaGetSmaInfoByIndex
(
pMeta
,
indexUid2
);
assert
(
qSmaCfg
!=
NULL
);
printf
(
"name2 = %s
\n
"
,
qSmaCfg
->
indexName
);
printf
(
"timezone2 = %
s
\n
"
,
qSmaCfg
->
timezone
);
printf
(
"timezone2 = %
"
PRIi8
"
\n
"
,
qSmaCfg
->
timezoneInt
);
printf
(
"expr2 = %s
\n
"
,
qSmaCfg
->
expr
!=
NULL
?
qSmaCfg
->
expr
:
""
);
printf
(
"tagsFilter2 = %s
\n
"
,
qSmaCfg
->
tagsFilter
!=
NULL
?
qSmaCfg
->
tagsFilter
:
""
);
ASSERT_STRCASEEQ
(
qSmaCfg
->
indexName
,
smaIndexName2
);
...
...
@@ -246,13 +246,13 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
assert
(
pSW
!=
NULL
);
ASSERT_EQ
(
pSW
->
number
,
nCntTSma
);
ASSERT_STRCASEEQ
(
pSW
->
tSma
->
indexName
,
smaIndexName1
);
ASSERT_
STRCASEEQ
(
pSW
->
tSma
->
timezone
,
timezone
);
ASSERT_
EQ
(
pSW
->
tSma
->
timezoneInt
,
timezone
);
ASSERT_STRCASEEQ
(
pSW
->
tSma
->
expr
,
expr
);
ASSERT_STRCASEEQ
(
pSW
->
tSma
->
tagsFilter
,
tagsFilter
);
ASSERT_EQ
(
pSW
->
tSma
->
indexUid
,
indexUid1
);
ASSERT_EQ
(
pSW
->
tSma
->
tableUid
,
tbUid
);
ASSERT_STRCASEEQ
((
pSW
->
tSma
+
1
)
->
indexName
,
smaIndexName2
);
ASSERT_
STRCASEEQ
((
pSW
->
tSma
+
1
)
->
timezone
,
timezone
);
ASSERT_
EQ
((
pSW
->
tSma
+
1
)
->
timezoneInt
,
timezone
);
ASSERT_STRCASEEQ
((
pSW
->
tSma
+
1
)
->
expr
,
expr
);
ASSERT_STRCASEEQ
((
pSW
->
tSma
+
1
)
->
tagsFilter
,
tagsFilter
);
ASSERT_EQ
((
pSW
->
tSma
+
1
)
->
indexUid
,
indexUid2
);
...
...
@@ -284,7 +284,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
TEST
(
testCase
,
tSma_Data_Insert_Query_Test
)
{
// step 1: prepare meta
const
char
*
smaIndexName1
=
"sma_index_test_1"
;
const
char
*
timezone
=
"Asia/Shanghai"
;
const
int8_t
timezone
=
8
;
const
char
*
expr
=
"select count(a,b, top 20), from table interval 1d, sliding 1h;"
;
const
char
*
tagsFilter
=
"where tags.location='Beijing' and tags.district='ChaoYang'"
;
const
char
*
smaTestDir
=
"./smaTest"
;
...
...
@@ -305,7 +305,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
tSma
.
sliding
=
0
;
tSma
.
indexUid
=
indexUid1
;
tstrncpy
(
tSma
.
indexName
,
smaIndexName1
,
TSDB_INDEX_NAME_LEN
);
t
strncpy
(
tSma
.
timezone
,
timezone
,
TD_TIMEZONE_LEN
)
;
t
Sma
.
timezoneInt
=
timezone
;
tSma
.
tableUid
=
tbUid
;
tSma
.
exprLen
=
strlen
(
expr
);
...
...
@@ -369,7 +369,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
char
*
msg
=
(
char
*
)
calloc
(
1
,
100
);
ASSERT_NE
(
msg
,
nullptr
);
ASSERT_EQ
(
tsdbUpdateSmaWindow
(
pTsdb
,
TSDB_SMA_TYPE_TIME_RANGE
,
msg
),
0
);
ASSERT_EQ
(
tsdbUpdateSmaWindow
(
pTsdb
,
msg
),
0
);
// init
int32_t
allocCnt
=
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录