Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
05d6c309
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
05d6c309
编写于
4月 13, 2022
作者:
C
Cary Xu
提交者:
GitHub
4月 13, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11453 from taosdata/feature/TD-11463-3.0
feat: store time-range-wise sma data by tdb
上级
11e772d4
7cf63f09
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
831 addition
and
79 deletion
+831
-79
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+2
-1
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+3
-3
source/dnode/vnode/src/inc/tsdbSma.h
source/dnode/vnode/src/inc/tsdbSma.h
+68
-0
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+2
-0
source/dnode/vnode/src/meta/metaBDBImpl.c
source/dnode/vnode/src/meta/metaBDBImpl.c
+11
-8
source/dnode/vnode/src/meta/metaTDBImpl.c
source/dnode/vnode/src/meta/metaTDBImpl.c
+338
-18
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+3
-3
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+193
-25
source/dnode/vnode/src/tsdb/tsdbTDBImpl.c
source/dnode/vnode/src/tsdb/tsdbTDBImpl.c
+137
-0
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+4
-4
source/dnode/vnode/test/tsdbSmaTest.cpp
source/dnode/vnode/test/tsdbSmaTest.cpp
+6
-5
source/libs/tdb/src/db/tdbBtree.c
source/libs/tdb/src/db/tdbBtree.c
+13
-12
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+3
-0
tests/script/tsim/sma/tsmaCreateInsertData.sim
tests/script/tsim/sma/tsmaCreateInsertData.sim
+48
-0
未找到文件。
source/dnode/vnode/CMakeLists.txt
浏览文件 @
05d6c309
...
...
@@ -30,6 +30,7 @@ target_sources(
# tsdb
# "src/tsdb/tsdbBDBImpl.c"
"src/tsdb/tsdbTDBImpl.c"
"src/tsdb/tsdbCommit.c"
"src/tsdb/tsdbCompact.c"
"src/tsdb/tsdbFile.c"
...
...
@@ -40,7 +41,7 @@ target_sources(
"src/tsdb/tsdbRead.c"
"src/tsdb/tsdbReadImpl.c"
"src/tsdb/tsdbScan.c"
#
"src/tsdb/tsdbSma.c"
"src/tsdb/tsdbSma.c"
"src/tsdb/tsdbWrite.c"
# tq
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
05d6c309
...
...
@@ -357,7 +357,7 @@ STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
STbCfg
*
metaGetTbInfoByName
(
SMeta
*
pMeta
,
char
*
tbname
,
tb_uid_t
*
uid
);
SSchemaWrapper
*
metaGetTableSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
bool
isinline
);
STSchema
*
metaGetTbTSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
);
STSma
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
);
void
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
,
bool
isDecode
);
STSmaWrapper
*
metaGetSmaInfoByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SArray
*
metaGetSmaTbUids
(
SMeta
*
pMeta
,
bool
isDup
);
int
metaGetTbNum
(
SMeta
*
pMeta
);
...
...
@@ -369,8 +369,8 @@ void metaCloseCtbCurosr(SMCtbCursor *pCtbCur);
tb_uid_t
metaCtbCursorNext
(
SMCtbCursor
*
pCtbCur
);
SMSmaCursor
*
metaOpenSmaCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseSmaCur
os
r
(
SMSmaCursor
*
pSmaCur
);
const
char
*
metaSmaCursorNext
(
SMSmaCursor
*
pSmaCur
);
void
metaCloseSmaCur
so
r
(
SMSmaCursor
*
pSmaCur
);
int64_t
metaSmaCursorNext
(
SMSmaCursor
*
pSmaCur
);
// Options
void
metaOptionsInit
(
SMetaCfg
*
pMetaCfg
);
...
...
source/dnode/vnode/src/inc/tsdbSma.h
0 → 100644
浏览文件 @
05d6c309
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_TSDB_SMA_H_
#define _TD_VNODE_TSDB_SMA_H_
#include "tdbInt.h"
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
SSmaKey
SSmaKey
;
struct
SSmaKey
{
TSKEY
skey
;
int64_t
groupId
;
};
typedef
struct
SDBFile
SDBFile
;
struct
SDBFile
{
int32_t
fid
;
TDB
*
pDB
;
char
*
path
;
};
int32_t
tsdbOpenDBEnv
(
TENV
**
ppEnv
,
const
char
*
path
);
int32_t
tsdbCloseDBEnv
(
TENV
*
pEnv
);
int32_t
tsdbOpenDBF
(
TENV
*
pEnv
,
SDBFile
*
pDBF
);
int32_t
tsdbCloseDBF
(
SDBFile
*
pDBF
);
int32_t
tsdbSaveSmaToDB
(
SDBFile
*
pDBF
,
void
*
pKey
,
int32_t
keyLen
,
void
*
pVal
,
int32_t
valLen
,
TXN
*
txn
);
void
*
tsdbGetSmaDataByKey
(
SDBFile
*
pDBF
,
const
void
*
pKey
,
int32_t
keyLen
,
int32_t
*
valLen
);
void
tsdbDestroySmaEnv
(
SSmaEnv
*
pSmaEnv
);
void
*
tsdbFreeSmaEnv
(
SSmaEnv
*
pSmaEnv
);
#if 0
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result);
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin);
#endif
// internal func
static
FORCE_INLINE
int32_t
tsdbEncodeTSmaKey
(
int64_t
groupId
,
TSKEY
tsKey
,
void
**
pData
)
{
int32_t
len
=
0
;
len
+=
taosEncodeFixedI64
(
pData
,
tsKey
);
len
+=
taosEncodeFixedI64
(
pData
,
groupId
);
return
len
;
}
#ifdef __cplusplus
}
#endif
#endif
/*_TD_VNODE_TSDB_SMA_H_*/
\ No newline at end of file
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
05d6c309
...
...
@@ -220,6 +220,8 @@ void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
#include "tq.h"
#include "tsdbSma.h"
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/meta/metaBDBImpl.c
浏览文件 @
05d6c309
...
...
@@ -667,7 +667,7 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
return
pTbCfg
;
}
STSma
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
)
{
void
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
,
bool
isDecode
)
{
STSma
*
pCfg
=
NULL
;
SMetaDB
*
pDB
=
pMeta
->
pDB
;
DBT
key
=
{
0
};
...
...
@@ -920,7 +920,7 @@ SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
return
pCur
;
}
void
metaCloseSmaCur
os
r
(
SMSmaCursor
*
pCur
)
{
void
metaCloseSmaCur
so
r
(
SMSmaCursor
*
pCur
)
{
if
(
pCur
)
{
if
(
pCur
->
pCur
)
{
pCur
->
pCur
->
close
(
pCur
->
pCur
);
...
...
@@ -930,7 +930,8 @@ void metaCloseSmaCurosr(SMSmaCursor *pCur) {
}
}
const
char
*
metaSmaCursorNext
(
SMSmaCursor
*
pCur
)
{
int64_t
metaSmaCursorNext
(
SMSmaCursor
*
pCur
)
{
#if 0
DBT skey = {0};
DBT pkey = {0};
DBT pval = {0};
...
...
@@ -946,6 +947,8 @@ const char *metaSmaCursorNext(SMSmaCursor *pCur) {
} else {
return NULL;
}
#endif
return
0
;
}
STSmaWrapper
*
metaGetSmaInfoByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
)
{
...
...
@@ -972,7 +975,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
++
pSW
->
number
;
STSma
*
tptr
=
(
STSma
*
)
taosMemoryRealloc
(
pSW
->
tSma
,
pSW
->
number
*
sizeof
(
STSma
));
if
(
tptr
==
NULL
)
{
metaCloseSmaCur
os
r
(
pCur
);
metaCloseSmaCur
so
r
(
pCur
);
tdDestroyTSmaWrapper
(
pSW
);
taosMemoryFreeClear
(
pSW
);
return
NULL
;
...
...
@@ -980,7 +983,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
pSW
->
tSma
=
tptr
;
pBuf
=
pval
.
data
;
if
(
tDecodeTSma
(
pBuf
,
pSW
->
tSma
+
pSW
->
number
-
1
)
==
NULL
)
{
metaCloseSmaCur
os
r
(
pCur
);
metaCloseSmaCur
so
r
(
pCur
);
tdDestroyTSmaWrapper
(
pSW
);
taosMemoryFreeClear
(
pSW
);
return
NULL
;
...
...
@@ -990,8 +993,8 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
break
;
}
metaCloseSmaCur
os
r
(
pCur
);
metaCloseSmaCur
so
r
(
pCur
);
return
pSW
;
}
...
...
@@ -1004,7 +1007,7 @@ SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) {
int
ret
;
// TODO: lock?
ret
=
pDB
->
p
Ctb
Idx
->
cursor
(
pDB
->
pSmaIdx
,
NULL
,
&
pCur
,
0
);
ret
=
pDB
->
p
Sma
Idx
->
cursor
(
pDB
->
pSmaIdx
,
NULL
,
&
pCur
,
0
);
if
(
ret
!=
0
)
{
return
NULL
;
}
...
...
source/dnode/vnode/src/meta/metaTDBImpl.c
浏览文件 @
05d6c309
...
...
@@ -22,6 +22,8 @@ typedef struct SPoolMem {
struct
SPoolMem
*
next
;
}
SPoolMem
;
#define META_TDB_SMA_TEST
static
SPoolMem
*
openPool
();
static
void
clearPool
(
SPoolMem
*
pPool
);
static
void
closePool
(
SPoolMem
*
pPool
);
...
...
@@ -38,6 +40,10 @@ struct SMetaDB {
TDB
*
pNtbIdx
;
TDB
*
pCtbIdx
;
SPoolMem
*
pPool
;
#ifdef META_TDB_SMA_TEST
TDB
*
pSmaDB
;
TDB
*
pSmaIdx
;
#endif
};
typedef
struct
__attribute__
((
__packed__
))
{
...
...
@@ -55,6 +61,11 @@ typedef struct {
tb_uid_t
uid
;
}
SCtbIdxKey
;
typedef
struct
{
tb_uid_t
uid
;
int64_t
smaUid
;
}
SSmaIdxKey
;
static
int
metaEncodeTbInfo
(
void
**
buf
,
STbCfg
*
pTbCfg
);
static
void
*
metaDecodeTbInfo
(
void
*
buf
,
STbCfg
*
pTbCfg
);
static
int
metaEncodeSchema
(
void
**
buf
,
SSchemaWrapper
*
pSW
);
...
...
@@ -115,6 +126,17 @@ static inline int metaCtbIdxCmpr(const void *arg1, int len1, const void *arg2, i
return
metaUidCmpr
(
&
pKey1
->
uid
,
sizeof
(
tb_uid_t
),
&
pKey2
->
uid
,
sizeof
(
tb_uid_t
));
}
static
inline
int
metaSmaIdxCmpr
(
const
void
*
arg1
,
int
len1
,
const
void
*
arg2
,
int
len2
)
{
int
c
;
SSmaIdxKey
*
pKey1
=
(
SSmaIdxKey
*
)
arg1
;
SSmaIdxKey
*
pKey2
=
(
SSmaIdxKey
*
)
arg2
;
c
=
metaUidCmpr
(
arg1
,
sizeof
(
tb_uid_t
),
arg2
,
sizeof
(
tb_uid_t
));
if
(
c
)
return
c
;
return
metaUidCmpr
(
&
pKey1
->
smaUid
,
sizeof
(
int64_t
),
&
pKey2
->
smaUid
,
sizeof
(
int64_t
));
}
int
metaOpenDB
(
SMeta
*
pMeta
)
{
SMetaDB
*
pMetaDb
;
int
ret
;
...
...
@@ -143,6 +165,15 @@ int metaOpenDB(SMeta *pMeta) {
return
-
1
;
}
#ifdef META_TDB_SMA_TEST
ret
=
tdbDbOpen
(
"sma.db"
,
sizeof
(
int64_t
),
TDB_VARIANT_LEN
,
metaUidCmpr
,
pMetaDb
->
pEnv
,
&
(
pMetaDb
->
pSmaDB
));
if
(
ret
<
0
)
{
// TODO
ASSERT
(
0
);
return
-
1
;
}
#endif
// open schema DB
ret
=
tdbDbOpen
(
"schema.db"
,
sizeof
(
SSchemaDbKey
),
TDB_VARIANT_LEN
,
metaSchemaKeyCmpr
,
pMetaDb
->
pEnv
,
&
(
pMetaDb
->
pSchemaDB
));
...
...
@@ -180,6 +211,15 @@ int metaOpenDB(SMeta *pMeta) {
return
-
1
;
}
#ifdef META_TDB_SMA_TEST
ret
=
tdbDbOpen
(
"sma.idx"
,
sizeof
(
SSmaIdxKey
),
0
,
metaSmaIdxCmpr
,
pMetaDb
->
pEnv
,
&
(
pMetaDb
->
pSmaIdx
));
if
(
ret
<
0
)
{
// TODO
ASSERT
(
0
);
return
-
1
;
}
#endif
pMetaDb
->
pPool
=
openPool
();
tdbTxnOpen
(
&
pMetaDb
->
txn
,
0
,
poolMalloc
,
poolFree
,
pMetaDb
->
pPool
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
);
tdbBegin
(
pMetaDb
->
pEnv
,
NULL
);
...
...
@@ -193,10 +233,16 @@ void metaCloseDB(SMeta *pMeta) {
tdbCommit
(
pMeta
->
pDB
->
pEnv
,
&
pMeta
->
pDB
->
txn
);
tdbTxnClose
(
&
pMeta
->
pDB
->
txn
);
clearPool
(
pMeta
->
pDB
->
pPool
);
#ifdef META_TDB_SMA_TEST
tdbDbClose
(
pMeta
->
pDB
->
pSmaIdx
);
#endif
tdbDbClose
(
pMeta
->
pDB
->
pCtbIdx
);
tdbDbClose
(
pMeta
->
pDB
->
pNtbIdx
);
tdbDbClose
(
pMeta
->
pDB
->
pStbIdx
);
tdbDbClose
(
pMeta
->
pDB
->
pNameIdx
);
#ifdef META_TDB_SMA_TEST
tdbDbClose
(
pMeta
->
pDB
->
pSmaDB
);
#endif
tdbDbClose
(
pMeta
->
pDB
->
pSchemaDB
);
tdbDbClose
(
pMeta
->
pDB
->
pTbDB
);
taosMemoryFree
(
pMeta
->
pDB
);
...
...
@@ -491,7 +537,6 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) {
taosMemoryFree
(
tbCfg
.
name
);
taosMemoryFree
(
tbCfg
.
stbCfg
.
pTagSchema
);
continue
;
;
}
else
if
(
tbCfg
.
type
==
META_CHILD_TABLE
)
{
kvRowFree
(
tbCfg
.
ctbCfg
.
pTag
);
}
...
...
@@ -566,51 +611,326 @@ int metaGetTbNum(SMeta *pMeta) {
return
0
;
}
struct
SMSmaCursor
{
TDBC
*
pCur
;
tb_uid_t
uid
;
void
*
pKey
;
void
*
pVal
;
int
kLen
;
int
vLen
;
};
STSmaWrapper
*
metaGetSmaInfoByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
)
{
// TODO
ASSERT
(
0
);
return
NULL
;
// ASSERT(0);
// return NULL;
#ifdef META_TDB_SMA_TEST
STSmaWrapper
*
pSW
=
NULL
;
pSW
=
taosMemoryCalloc
(
1
,
sizeof
(
*
pSW
));
if
(
pSW
==
NULL
)
{
return
NULL
;
}
SMSmaCursor
*
pCur
=
metaOpenSmaCursor
(
pMeta
,
uid
);
if
(
pCur
==
NULL
)
{
taosMemoryFree
(
pSW
);
return
NULL
;
}
void
*
pBuf
=
NULL
;
SSmaIdxKey
*
pSmaIdxKey
=
NULL
;
while
(
true
)
{
// TODO: lock during iterate?
if
(
tdbDbNext
(
pCur
->
pCur
,
&
pCur
->
pKey
,
&
pCur
->
kLen
,
NULL
,
&
pCur
->
vLen
)
==
0
)
{
pSmaIdxKey
=
pCur
->
pKey
;
ASSERT
(
pSmaIdxKey
!=
NULL
);
void
*
pSmaVal
=
metaGetSmaInfoByIndex
(
pMeta
,
pSmaIdxKey
->
smaUid
,
false
);
if
(
pSmaVal
==
NULL
)
{
tsdbWarn
(
"no tsma exists for indexUid: %"
PRIi64
,
pSmaIdxKey
->
smaUid
);
continue
;
}
++
pSW
->
number
;
STSma
*
tptr
=
(
STSma
*
)
taosMemoryRealloc
(
pSW
->
tSma
,
pSW
->
number
*
sizeof
(
STSma
));
if
(
tptr
==
NULL
)
{
TDB_FREE
(
pSmaVal
);
metaCloseSmaCursor
(
pCur
);
tdDestroyTSmaWrapper
(
pSW
);
taosMemoryFreeClear
(
pSW
);
return
NULL
;
}
pSW
->
tSma
=
tptr
;
pBuf
=
pSmaVal
;
if
(
tDecodeTSma
(
pBuf
,
pSW
->
tSma
+
pSW
->
number
-
1
)
==
NULL
)
{
TDB_FREE
(
pSmaVal
);
metaCloseSmaCursor
(
pCur
);
tdDestroyTSmaWrapper
(
pSW
);
taosMemoryFreeClear
(
pSW
);
return
NULL
;
}
TDB_FREE
(
pSmaVal
);
continue
;
}
break
;
}
metaCloseSmaCursor
(
pCur
);
return
pSW
;
#endif
}
int
metaRemoveSmaFromDb
(
SMeta
*
pMeta
,
int64_t
indexUid
)
{
// TODO
ASSERT
(
0
);
#ifndef META_TDB_SMA_TEST
DBT
key
=
{
0
};
key
.
data
=
(
void
*
)
indexName
;
key
.
size
=
strlen
(
indexName
);
metaDBWLock
(
pMeta
->
pDB
);
// TODO: No guarantee of consistence.
// Use transaction or DB->sync() for some guarantee.
pMeta
->
pDB
->
pSmaDB
->
del
(
pMeta
->
pDB
->
pSmaDB
,
NULL
,
&
key
,
0
);
metaDBULock
(
pMeta
->
pDB
);
#endif
return
0
;
}
int
metaSaveSmaToDB
(
SMeta
*
pMeta
,
STSma
*
pSmaCfg
)
{
// TODO
ASSERT
(
0
);
// ASSERT(0);
#ifdef META_TDB_SMA_TEST
int32_t
ret
=
0
;
SMetaDB
*
pMetaDb
=
pMeta
->
pDB
;
void
*
pBuf
=
NULL
,
*
qBuf
=
NULL
;
void
*
key
=
{
0
},
*
val
=
{
0
};
// save sma info
int32_t
len
=
tEncodeTSma
(
NULL
,
pSmaCfg
);
pBuf
=
taosMemoryCalloc
(
1
,
len
);
if
(
pBuf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
key
=
(
void
*
)
&
pSmaCfg
->
indexUid
;
qBuf
=
pBuf
;
tEncodeTSma
(
&
qBuf
,
pSmaCfg
);
val
=
pBuf
;
int32_t
kLen
=
sizeof
(
pSmaCfg
->
indexUid
);
int32_t
vLen
=
POINTER_DISTANCE
(
qBuf
,
pBuf
);
ret
=
tdbDbInsert
(
pMeta
->
pDB
->
pSmaDB
,
key
,
kLen
,
val
,
vLen
,
&
pMetaDb
->
txn
);
if
(
ret
<
0
)
{
taosMemoryFreeClear
(
pBuf
);
return
-
1
;
}
// add sma idx
SSmaIdxKey
smaIdxKey
;
smaIdxKey
.
uid
=
pSmaCfg
->
tableUid
;
smaIdxKey
.
smaUid
=
pSmaCfg
->
indexUid
;
key
=
&
smaIdxKey
;
kLen
=
sizeof
(
smaIdxKey
);
val
=
NULL
;
vLen
=
0
;
ret
=
tdbDbInsert
(
pMeta
->
pDB
->
pSmaIdx
,
key
,
kLen
,
val
,
vLen
,
&
pMetaDb
->
txn
);
if
(
ret
<
0
)
{
taosMemoryFreeClear
(
pBuf
);
return
-
1
;
}
// release
taosMemoryFreeClear
(
pBuf
);
if
(
pMeta
->
pDB
->
pPool
->
size
>
0
)
{
metaCommit
(
pMeta
);
}
#endif
return
0
;
}
STSma
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
)
{
void
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
,
bool
isDecode
)
{
// TODO
ASSERT
(
0
);
return
NULL
;
// ASSERT(0);
// return NULL;
#ifdef META_TDB_SMA_TEST
SMetaDB
*
pDB
=
pMeta
->
pDB
;
void
*
pKey
=
NULL
;
void
*
pVal
=
NULL
;
int
kLen
=
0
;
int
vLen
=
0
;
int
ret
=
-
1
;
// Set key
pKey
=
(
void
*
)
&
indexUid
;
kLen
=
sizeof
(
indexUid
);
// Query
ret
=
tdbDbGet
(
pDB
->
pSmaDB
,
pKey
,
kLen
,
&
pVal
,
&
vLen
);
if
(
ret
!=
0
||
!
pVal
)
{
return
NULL
;
}
if
(
!
isDecode
)
{
// return raw value
return
pVal
;
}
// Decode
STSma
*
pCfg
=
(
STSma
*
)
taosMemoryCalloc
(
1
,
sizeof
(
STSma
));
if
(
pCfg
==
NULL
)
{
taosMemoryFree
(
pVal
);
return
NULL
;
}
void
*
pBuf
=
pVal
;
if
(
tDecodeTSma
(
pBuf
,
pCfg
)
==
NULL
)
{
tdDestroyTSma
(
pCfg
);
taosMemoryFree
(
pCfg
);
TDB_FREE
(
pVal
);
return
NULL
;
}
TDB_FREE
(
pVal
);
return
pCfg
;
#endif
}
const
char
*
metaSmaCursorNext
(
SMSmaCursor
*
pCur
)
{
/**
* @brief
*
* @param pMeta
* @param uid 0 means iterate all uids.
* @return SMSmaCursor*
*/
SMSmaCursor
*
metaOpenSmaCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
)
{
// TODO
ASSERT
(
0
);
return
NULL
;
// ASSERT(0);
// return NULL;
#ifdef META_TDB_SMA_TEST
SMSmaCursor
*
pCur
=
NULL
;
SMetaDB
*
pDB
=
pMeta
->
pDB
;
int
ret
;
pCur
=
(
SMSmaCursor
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pCur
));
if
(
pCur
==
NULL
)
{
return
NULL
;
}
pCur
->
uid
=
uid
;
ret
=
tdbDbcOpen
(
pDB
->
pSmaIdx
,
&
(
pCur
->
pCur
));
if
((
ret
!=
0
)
||
(
pCur
->
pCur
==
NULL
))
{
taosMemoryFree
(
pCur
);
return
NULL
;
}
if
(
uid
!=
0
)
{
// TODO: move to the specific uid
}
return
pCur
;
#endif
}
void
metaCloseSmaCurosr
(
SMSmaCursor
*
pCur
)
{
/**
* @brief
*
* @param pCur
* @return int64_t smaIndexUid
*/
int64_t
metaSmaCursorNext
(
SMSmaCursor
*
pCur
)
{
// TODO
ASSERT
(
0
);
// ASSERT(0);
// return NULL;
#ifdef META_TDB_SMA_TEST
int
ret
;
void
*
pBuf
;
SSmaIdxKey
*
smaIdxKey
;
ret
=
tdbDbNext
(
pCur
->
pCur
,
&
pCur
->
pKey
,
&
pCur
->
kLen
,
&
pCur
->
pVal
,
&
pCur
->
vLen
);
if
(
ret
<
0
)
{
return
0
;
}
smaIdxKey
=
pCur
->
pKey
;
return
smaIdxKey
->
smaUid
;
#endif
}
SArray
*
metaGetSmaTbUids
(
SMeta
*
pMeta
,
bool
isDup
)
{
void
metaCloseSmaCursor
(
SMSmaCursor
*
pCur
)
{
// TODO
// ASSERT(0); // comment this line to pass CI
return
NULL
;
// ASSERT(0);
#ifdef META_TDB_SMA_TEST
if
(
pCur
)
{
if
(
pCur
->
pCur
)
{
tdbDbcClose
(
pCur
->
pCur
);
}
taosMemoryFree
(
pCur
);
}
#endif
}
S
MSmaCursor
*
metaOpenSmaCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
)
{
S
Array
*
metaGetSmaTbUids
(
SMeta
*
pMeta
,
bool
isDup
)
{
// TODO
ASSERT
(
0
);
return
NULL
;
// ASSERT(0); // comment this line to pass CI
// return NULL:
#ifdef META_TDB_SMA_TEST
SArray
*
pUids
=
NULL
;
SMetaDB
*
pDB
=
pMeta
->
pDB
;
void
*
pKey
;
// TODO: lock?
SMSmaCursor
*
pCur
=
metaOpenSmaCursor
(
pMeta
,
0
);
if
(
pCur
==
NULL
)
{
return
NULL
;
}
// TODO: lock?
SSmaIdxKey
*
pSmaIdxKey
=
NULL
;
tb_uid_t
uid
=
0
;
while
(
true
)
{
// TODO: lock during iterate?
if
(
tdbDbNext
(
pCur
->
pCur
,
&
pCur
->
pKey
,
&
pCur
->
kLen
,
NULL
,
&
pCur
->
vLen
)
==
0
)
{
ASSERT
(
pSmaIdxKey
!=
NULL
);
pSmaIdxKey
=
pCur
->
pKey
;
if
(
pSmaIdxKey
->
uid
==
0
||
pSmaIdxKey
->
uid
==
uid
)
{
continue
;
}
uid
=
pSmaIdxKey
->
uid
;
if
(
!
pUids
)
{
pUids
=
taosArrayInit
(
16
,
sizeof
(
tb_uid_t
));
if
(
!
pUids
)
{
metaCloseSmaCursor
(
pCur
);
return
NULL
;
}
}
taosArrayPush
(
pUids
,
&
uid
);
continue
;
}
break
;
}
metaCloseSmaCursor
(
pCur
);
return
pUids
;
#endif
}
static
int
metaEncodeSchema
(
void
**
buf
,
SSchemaWrapper
*
pSW
)
{
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
05d6c309
...
...
@@ -82,9 +82,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi
memcpy
(
data
,
msg
,
msgLen
);
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
//
if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg) != 0) {
//
return -1;
//
}
if
(
tsdbUpdateSmaWindow
(
pTq
->
pVnode
->
pTsdb
,
msg
)
!=
0
)
{
return
-
1
;
}
}
SRpcMsg
req
=
{
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
05d6c309
...
...
@@ -38,6 +38,29 @@ typedef enum {
SMA_STORAGE_LEVEL_DFILESET
=
1
// use days of TS data e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f1906.tsma
}
ESmaStorageLevel
;
typedef
struct
SPoolMem
{
int64_t
size
;
struct
SPoolMem
*
prev
;
struct
SPoolMem
*
next
;
}
SPoolMem
;
struct
SSmaEnv
{
TdThreadRwlock
lock
;
TXN
txn
;
SPoolMem
*
pPool
;
SDiskID
did
;
TENV
*
dbEnv
;
// TODO: If it's better to put it in smaIndex level?
char
*
path
;
// relative path
SSmaStat
*
pStat
;
};
#define SMA_ENV_LOCK(env) ((env)->lock)
#define SMA_ENV_DID(env) ((env)->did)
#define SMA_ENV_ENV(env) ((env)->dbEnv)
#define SMA_ENV_PATH(env) ((env)->path)
#define SMA_ENV_STAT(env) ((env)->pStat)
#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
typedef
struct
{
STsdb
*
pTsdb
;
SDBFile
dFile
;
...
...
@@ -104,7 +127,8 @@ static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH);
static
int32_t
tsdbInitTSmaReadH
(
STSmaReadH
*
pSmaH
,
STsdb
*
pTsdb
,
int64_t
interval
,
int8_t
intervalUnit
);
static
int32_t
tsdbGetSmaStorageLevel
(
int64_t
interval
,
int8_t
intervalUnit
);
static
int32_t
tsdbSetRSmaDataFile
(
STSmaWriteH
*
pSmaH
,
int32_t
fid
);
static
int32_t
tsdbInsertTSmaBlocks
(
STSmaWriteH
*
pSmaH
,
void
*
smaKey
,
uint32_t
keyLen
,
void
*
pData
,
uint32_t
dataLen
);
static
int32_t
tsdbInsertTSmaBlocks
(
STSmaWriteH
*
pSmaH
,
void
*
smaKey
,
int32_t
keyLen
,
void
*
pData
,
int32_t
dataLen
,
TXN
*
txn
);
static
int64_t
tsdbGetIntervalByPrecision
(
int64_t
interval
,
uint8_t
intervalUnit
,
int8_t
precision
,
bool
adjusted
);
static
int32_t
tsdbGetTSmaDays
(
STsdb
*
pTsdb
,
int64_t
interval
,
int32_t
storageLevel
);
static
int32_t
tsdbSetTSmaDataFile
(
STSmaWriteH
*
pSmaH
,
int64_t
indexUid
,
int32_t
fid
);
...
...
@@ -117,9 +141,121 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg);
// mgmt interface
static
int32_t
tsdbDropTSmaDataImpl
(
STsdb
*
pTsdb
,
int64_t
indexUid
);
// Pool Memory
static
SPoolMem
*
openPool
();
static
void
clearPool
(
SPoolMem
*
pPool
);
static
void
closePool
(
SPoolMem
*
pPool
);
static
void
*
poolMalloc
(
void
*
arg
,
size_t
size
);
static
void
poolFree
(
void
*
arg
,
void
*
ptr
);
static
int
tsdbSmaBeginCommit
(
SSmaEnv
*
pEnv
);
static
int
tsdbSmaEndCommit
(
SSmaEnv
*
pEnv
);
// implementation
static
FORCE_INLINE
int16_t
tsdbTSmaAdd
(
STsdb
*
pTsdb
,
int16_t
n
)
{
return
atomic_add_fetch_16
(
&
REPO_TSMA_NUM
(
pTsdb
),
n
);
}
static
FORCE_INLINE
int16_t
tsdbTSmaSub
(
STsdb
*
pTsdb
,
int16_t
n
)
{
return
atomic_sub_fetch_16
(
&
REPO_TSMA_NUM
(
pTsdb
),
n
);
}
static
FORCE_INLINE
int16_t
tsdbTSmaAdd
(
STsdb
*
pTsdb
,
int16_t
n
)
{
return
atomic_add_fetch_16
(
&
REPO_TSMA_NUM
(
pTsdb
),
n
);
}
static
FORCE_INLINE
int16_t
tsdbTSmaSub
(
STsdb
*
pTsdb
,
int16_t
n
)
{
return
atomic_sub_fetch_16
(
&
REPO_TSMA_NUM
(
pTsdb
),
n
);
}
static
FORCE_INLINE
int32_t
tsdbRLockSma
(
SSmaEnv
*
pEnv
)
{
int
code
=
taosThreadRwlockRdlock
(
&
(
pEnv
->
lock
));
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
return
0
;
}
static
FORCE_INLINE
int32_t
tsdbWLockSma
(
SSmaEnv
*
pEnv
)
{
int
code
=
taosThreadRwlockWrlock
(
&
(
pEnv
->
lock
));
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
return
0
;
}
static
FORCE_INLINE
int32_t
tsdbUnLockSma
(
SSmaEnv
*
pEnv
)
{
int
code
=
taosThreadRwlockUnlock
(
&
(
pEnv
->
lock
));
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
return
0
;
}
static
SPoolMem
*
openPool
()
{
SPoolMem
*
pPool
=
(
SPoolMem
*
)
tdbOsMalloc
(
sizeof
(
*
pPool
));
pPool
->
prev
=
pPool
->
next
=
pPool
;
pPool
->
size
=
0
;
return
pPool
;
}
static
void
clearPool
(
SPoolMem
*
pPool
)
{
if
(
!
pPool
)
return
;
SPoolMem
*
pMem
;
do
{
pMem
=
pPool
->
next
;
if
(
pMem
==
pPool
)
break
;
pMem
->
next
->
prev
=
pMem
->
prev
;
pMem
->
prev
->
next
=
pMem
->
next
;
pPool
->
size
-=
pMem
->
size
;
tdbOsFree
(
pMem
);
}
while
(
1
);
assert
(
pPool
->
size
==
0
);
}
static
void
closePool
(
SPoolMem
*
pPool
)
{
if
(
pPool
)
{
clearPool
(
pPool
);
tdbOsFree
(
pPool
);
}
}
static
void
*
poolMalloc
(
void
*
arg
,
size_t
size
)
{
void
*
ptr
=
NULL
;
SPoolMem
*
pPool
=
(
SPoolMem
*
)
arg
;
SPoolMem
*
pMem
;
pMem
=
(
SPoolMem
*
)
tdbOsMalloc
(
sizeof
(
*
pMem
)
+
size
);
if
(
pMem
==
NULL
)
{
assert
(
0
);
}
pMem
->
size
=
sizeof
(
*
pMem
)
+
size
;
pMem
->
next
=
pPool
->
next
;
pMem
->
prev
=
pPool
;
pPool
->
next
->
prev
=
pMem
;
pPool
->
next
=
pMem
;
pPool
->
size
+=
pMem
->
size
;
ptr
=
(
void
*
)(
&
pMem
[
1
]);
return
ptr
;
}
static
void
poolFree
(
void
*
arg
,
void
*
ptr
)
{
SPoolMem
*
pPool
=
(
SPoolMem
*
)
arg
;
SPoolMem
*
pMem
;
pMem
=
&
(((
SPoolMem
*
)
ptr
)[
-
1
]);
pMem
->
next
->
prev
=
pMem
->
prev
;
pMem
->
prev
->
next
=
pMem
->
next
;
pPool
->
size
-=
pMem
->
size
;
tdbOsFree
(
pMem
);
}
int32_t
tsdbInitSma
(
STsdb
*
pTsdb
)
{
// tSma
...
...
@@ -213,7 +349,12 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did)
char
aname
[
TSDB_FILENAME_LEN
]
=
{
0
};
tfsAbsoluteName
(
pTsdb
->
pTfs
,
did
,
path
,
aname
);
if
(
tsdbOpenBDBEnv
(
&
pEnv
->
dbEnv
,
aname
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
tsdbOpenDBEnv
(
&
pEnv
->
dbEnv
,
aname
)
!=
TSDB_CODE_SUCCESS
)
{
tsdbFreeSmaEnv
(
pEnv
);
return
NULL
;
}
if
((
pEnv
->
pPool
=
openPool
())
==
NULL
)
{
tsdbFreeSmaEnv
(
pEnv
);
return
NULL
;
}
...
...
@@ -248,7 +389,8 @@ void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv) {
taosMemoryFreeClear
(
pSmaEnv
->
pStat
);
taosMemoryFreeClear
(
pSmaEnv
->
path
);
taosThreadRwlockDestroy
(
&
(
pSmaEnv
->
lock
));
tsdbCloseBDBEnv
(
pSmaEnv
->
dbEnv
);
tsdbCloseDBEnv
(
pSmaEnv
->
dbEnv
);
closePool
(
pSmaEnv
->
pPool
);
}
}
...
...
@@ -414,7 +556,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t
}
// cache smaMeta
STSma
*
pSma
=
metaGetSmaInfoByIndex
(
pTsdb
->
pMeta
,
indexUid
);
STSma
*
pSma
=
metaGetSmaInfoByIndex
(
pTsdb
->
pMeta
,
indexUid
,
true
);
if
(
pSma
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_NO_SMA_INDEX_IN_META
;
taosHashCleanup
(
pItem
->
expiredWindows
);
...
...
@@ -498,10 +640,6 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) {
return
TSDB_CODE_FAILED
;
}
#ifndef TSDB_SMA_TEST
TSKEY
expiredWindows
[
SMA_TEST_EXPIRED_WINDOW_SIZE
];
#endif
// Firstly, assume that tSma can only be created on super table/normal table.
// getActiveTimeWindow
...
...
@@ -563,6 +701,10 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) {
TSKEY
winSKey
=
taosTimeTruncate
(
TD_ROW_KEY
(
row
),
&
interval
,
interval
.
precision
);
tsdbSetExpiredWindow
(
pTsdb
,
pItemsHash
,
pTSma
->
indexUid
,
winSKey
);
// TODO: release only when suid changes.
tdDestroyTSmaWrapper
(
pSW
);
taosMemoryFreeClear
(
pSW
);
}
}
...
...
@@ -676,10 +818,12 @@ static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
* @param dataLen
* @return int32_t
*/
static
int32_t
tsdbInsertTSmaBlocks
(
STSmaWriteH
*
pSmaH
,
void
*
smaKey
,
uint32_t
keyLen
,
void
*
pData
,
uint32_t
dataLen
)
{
static
int32_t
tsdbInsertTSmaBlocks
(
STSmaWriteH
*
pSmaH
,
void
*
smaKey
,
int32_t
keyLen
,
void
*
pData
,
int32_t
dataLen
,
TXN
*
txn
)
{
SDBFile
*
pDBFile
=
&
pSmaH
->
dFile
;
// TODO: insert sma data blocks into B+Tree(TDB)
if
(
tsdbSaveSmaToDB
(
pDBFile
,
smaKey
,
keyLen
,
pData
,
dataLen
)
!=
0
)
{
if
(
tsdbSaveSmaToDB
(
pDBFile
,
smaKey
,
keyLen
,
pData
,
dataLen
,
txn
)
!=
0
)
{
tsdbWarn
(
"vgId:%d insert sma data blocks into %s: smaKey %"
PRIx64
"-%"
PRIx64
", dataLen %"
PRIu32
" fail"
,
REPO_ID
(
pSmaH
->
pTsdb
),
pDBFile
->
path
,
*
(
int64_t
*
)
smaKey
,
*
(
int64_t
*
)
POINTER_SHIFT
(
smaKey
,
8
),
dataLen
);
return
TSDB_CODE_FAILED
;
...
...
@@ -826,6 +970,30 @@ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLe
return
daysPerFile
;
}
static
int
tsdbSmaBeginCommit
(
SSmaEnv
*
pEnv
)
{
TXN
*
pTxn
=
&
pEnv
->
txn
;
// start a new txn
tdbTxnOpen
(
pTxn
,
0
,
poolMalloc
,
poolFree
,
pEnv
->
pPool
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
);
if
(
tdbBegin
(
pEnv
->
dbEnv
,
pTxn
)
!=
0
)
{
tsdbWarn
(
"tsdbSma tdb restart txn fail"
);
return
-
1
;
}
return
0
;
}
static
int
tsdbSmaEndCommit
(
SSmaEnv
*
pEnv
)
{
TXN
*
pTxn
=
&
pEnv
->
txn
;
// Commit current txn
if
(
tdbCommit
(
pEnv
->
dbEnv
,
pTxn
)
!=
0
)
{
tsdbWarn
(
"tsdbSma tdb commit fail"
);
return
-
1
;
}
tdbTxnClose
(
pTxn
);
clearPool
(
pEnv
->
pPool
);
return
0
;
}
/**
* @brief Insert/Update Time-range-wise SMA data.
* - If interval < SMA_STORAGE_SPLIT_HOURS(e.g. 24), save the SMA data as a part of DFileSet to e.g.
...
...
@@ -911,14 +1079,10 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
int64_t
groupId
=
pDataBlock
->
info
.
groupId
;
for
(
int32_t
j
=
0
;
j
<
rows
;
++
j
)
{
printf
(
"|"
);
TSKEY
skey
=
1649295200000
;
// TSKEY_INITIAL_VAL; //
the start key of TS window by interval
TSKEY
skey
=
TSKEY_INITIAL_VAL
;
//
the start key of TS window by interval
void
*
pSmaKey
=
&
smaKey
;
bool
isStartKey
=
false
;
{
// just for debugging
isStartKey
=
true
;
tsdbEncodeTSmaKey
(
groupId
,
skey
,
&
pSmaKey
);
}
int32_t
tlen
=
0
;
// reset the len
pDataBuf
=
&
dataBuf
;
// reset the buf
for
(
int32_t
k
=
0
;
k
<
colNum
;
++
k
)
{
...
...
@@ -929,7 +1093,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
if
(
!
isStartKey
)
{
isStartKey
=
true
;
skey
=
*
(
TSKEY
*
)
var
;
printf
(
"=
=> skey =
%"
PRIi64
" groupId = %"
PRIi64
"|"
,
skey
,
groupId
);
printf
(
"=
skey
%"
PRIi64
" groupId = %"
PRIi64
"|"
,
skey
,
groupId
);
tsdbEncodeTSmaKey
(
groupId
,
skey
,
&
pSmaKey
);
}
else
{
printf
(
" %"
PRIi64
" |"
,
*
(
int64_t
*
)
var
);
...
...
@@ -1010,6 +1174,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
// TODO: tsdbStartTSmaCommit();
if
(
fid
!=
tSmaH
.
dFile
.
fid
)
{
if
(
tSmaH
.
dFile
.
fid
!=
TSDB_IVLD_FID
)
{
tsdbSmaEndCommit
(
pEnv
);
tsdbCloseDBF
(
&
tSmaH
.
dFile
);
}
tsdbSetTSmaDataFile
(
&
tSmaH
,
indexUid
,
fid
);
...
...
@@ -1020,12 +1185,14 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
tsdbUnRefSmaStat
(
pTsdb
,
pStat
);
return
TSDB_CODE_FAILED
;
}
tsdbSmaBeginCommit
(
pEnv
);
}
if
(
tsdbInsertTSmaBlocks
(
&
tSmaH
,
&
smaKey
,
SMA_KEY_LEN
,
dataBuf
,
tlen
)
!=
0
)
{
if
(
tsdbInsertTSmaBlocks
(
&
tSmaH
,
&
smaKey
,
SMA_KEY_LEN
,
dataBuf
,
tlen
,
&
pEnv
->
txn
)
!=
0
)
{
tsdbWarn
(
"vgId:%d insert tSma data blocks fail for index %"
PRIi64
", skey %"
PRIi64
", groupId %"
PRIi64
" since %s"
,
REPO_ID
(
pTsdb
),
indexUid
,
skey
,
groupId
,
tstrerror
(
terrno
));
tsdbSmaEndCommit
(
pEnv
);
tsdbDestroyTSmaWriteH
(
&
tSmaH
);
tsdbUnRefSmaStat
(
pTsdb
,
pStat
);
return
TSDB_CODE_FAILED
;
...
...
@@ -1044,9 +1211,10 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
printf
(
"
\n
"
);
}
}
tsdbSmaEndCommit
(
pEnv
);
// TODO: not commit for every insert
tsdbDestroyTSmaWriteH
(
&
tSmaH
);
tsdbUnRefSmaStat
(
pTsdb
,
pStat
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1370,8 +1538,8 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid,
tsdbDebug
(
"vgId:%d get sma data from %s: smaKey %"
PRIx64
"-%"
PRIx64
", keyLen %d"
,
REPO_ID
(
pTsdb
),
tReadH
.
dFile
.
path
,
*
(
int64_t
*
)
smaKey
,
*
(
int64_t
*
)
POINTER_SHIFT
(
smaKey
,
8
),
SMA_KEY_LEN
);
void
*
result
=
NULL
;
u
int32_t
valueSize
=
0
;
void
*
result
=
NULL
;
int32_t
valueSize
=
0
;
if
((
result
=
tsdbGetSmaDataByKey
(
&
tReadH
.
dFile
,
smaKey
,
SMA_KEY_LEN
,
&
valueSize
))
==
NULL
)
{
tsdbWarn
(
"vgId:%d get sma data failed from smaIndex %"
PRIi64
", smaKey %"
PRIx64
"-%"
PRIx64
" since %s"
,
REPO_ID
(
pTsdb
),
indexUid
,
*
(
int64_t
*
)
smaKey
,
*
(
int64_t
*
)
POINTER_SHIFT
(
smaKey
,
8
),
tstrerror
(
terrno
));
...
...
@@ -1422,7 +1590,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) {
return
-
1
;
}
tsdbDebug
(
"vgId:%d TDMT_VND_CREATE_SMA msg received for %s:%"
PRIi64
,
REPO_ID
(
pTsdb
),
vCreateSmaReq
.
tSma
.
indexName
,
vCreateSmaReq
.
tSma
.
indexUid
);
vCreateSmaReq
.
tSma
.
indexUid
);
// record current timezone of server side
vCreateSmaReq
.
tSma
.
timezoneInt
=
tsTimezone
;
...
...
@@ -1464,7 +1632,7 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
return
-
1
;
}
tsdbTSmaSub
(
pTsdb
,
1
);
tsdbTSmaSub
(
pTsdb
,
1
);
// TODO: return directly or go on follow steps?
return
TSDB_CODE_SUCCESS
;
...
...
source/dnode/vnode/src/tsdb/tsdbTDBImpl.c
0 → 100644
浏览文件 @
05d6c309
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define ALLOW_FORBID_FUNC
#include "vnodeInt.h"
int32_t
tsdbOpenDBEnv
(
TENV
**
ppEnv
,
const
char
*
path
)
{
int
ret
=
0
;
if
(
path
==
NULL
)
return
-
1
;
ret
=
tdbEnvOpen
(
path
,
4096
,
256
,
ppEnv
);
// use as param
if
(
ret
!=
0
)
{
tsdbError
(
"Failed to create tsdb db env, ret = %d"
,
ret
);
return
-
1
;
}
return
0
;
}
int32_t
tsdbCloseDBEnv
(
TENV
*
pEnv
)
{
return
tdbEnvClose
(
pEnv
);
}
static
inline
int
tsdbSmaKeyCmpr
(
const
void
*
arg1
,
int
len1
,
const
void
*
arg2
,
int
len2
)
{
const
SSmaKey
*
pKey1
=
(
const
SSmaKey
*
)
arg1
;
const
SSmaKey
*
pKey2
=
(
const
SSmaKey
*
)
arg2
;
ASSERT
(
len1
==
len2
&&
len1
==
sizeof
(
SSmaKey
));
if
(
pKey1
->
skey
<
pKey2
->
skey
)
{
return
-
1
;
}
else
if
(
pKey1
->
skey
>
pKey2
->
skey
)
{
return
1
;
}
if
(
pKey1
->
groupId
<
pKey2
->
groupId
)
{
return
-
1
;
}
else
if
(
pKey1
->
groupId
>
pKey2
->
groupId
)
{
return
1
;
}
return
0
;
}
static
int32_t
tsdbOpenDBDb
(
TDB
**
ppDB
,
TENV
*
pEnv
,
const
char
*
pFName
)
{
int
ret
;
FKeyComparator
compFunc
;
// Create a database
compFunc
=
tsdbSmaKeyCmpr
;
ret
=
tdbDbOpen
(
pFName
,
TDB_VARIANT_LEN
,
TDB_VARIANT_LEN
,
compFunc
,
pEnv
,
ppDB
);
return
0
;
}
static
int32_t
tsdbCloseDBDb
(
TDB
*
pDB
)
{
return
tdbDbClose
(
pDB
);
}
int32_t
tsdbOpenDBF
(
TENV
*
pEnv
,
SDBFile
*
pDBF
)
{
// TEnv is shared by a group of SDBFile
if
(
!
pEnv
||
!
pDBF
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
// Open DBF
if
(
tsdbOpenDBDb
(
&
(
pDBF
->
pDB
),
pEnv
,
pDBF
->
path
)
<
0
)
{
terrno
=
TSDB_CODE_TDB_INIT_FAILED
;
tsdbCloseDBDb
(
pDBF
->
pDB
);
return
-
1
;
}
return
0
;
}
int32_t
tsdbCloseDBF
(
SDBFile
*
pDBF
)
{
int32_t
ret
=
0
;
if
(
pDBF
->
pDB
)
{
ret
=
tsdbCloseDBDb
(
pDBF
->
pDB
);
pDBF
->
pDB
=
NULL
;
}
taosMemoryFreeClear
(
pDBF
->
path
);
return
ret
;
}
int32_t
tsdbSaveSmaToDB
(
SDBFile
*
pDBF
,
void
*
pKey
,
int32_t
keyLen
,
void
*
pVal
,
int32_t
valLen
,
TXN
*
txn
)
{
int32_t
ret
;
ret
=
tdbDbInsert
(
pDBF
->
pDB
,
pKey
,
keyLen
,
pVal
,
valLen
,
txn
);
if
(
ret
<
0
)
{
tsdbError
(
"Failed to create insert sma data into db, ret = %d"
,
ret
);
return
-
1
;
}
return
0
;
}
void
*
tsdbGetSmaDataByKey
(
SDBFile
*
pDBF
,
const
void
*
pKey
,
int32_t
keyLen
,
int32_t
*
valLen
)
{
void
*
result
;
void
*
pVal
;
int
ret
;
ret
=
tdbDbGet
(
pDBF
->
pDB
,
pKey
,
keyLen
,
&
pVal
,
valLen
);
if
(
ret
<
0
)
{
tsdbError
(
"Failed to get sma data from db, ret = %d"
,
ret
);
return
NULL
;
}
ASSERT
(
*
valLen
>=
0
);
result
=
taosMemoryMalloc
(
*
valLen
);
if
(
result
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
// TODO: lock?
// TODO: Would the key/value be destoryed during return the data?
// TODO: How about the key is updated while value length is changed? The original value buffer would be freed
// automatically?
memcpy
(
result
,
pVal
,
*
valLen
);
return
result
;
}
\ No newline at end of file
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
05d6c309
...
...
@@ -19,7 +19,7 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// TODO
// blockDebugShowData(data);
//
tsdbInsertTSmaData(((SVnode *)pVnode)->pTsdb, smaId, (const char *)data);
tsdbInsertTSmaData
(((
SVnode
*
)
pVnode
)
->
pTsdb
,
smaId
,
(
const
char
*
)
data
);
}
void
vnodeProcessWMsgs
(
SVnode
*
pVnode
,
SArray
*
pMsgs
)
{
...
...
@@ -232,9 +232,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
tdDestroyTSma(&vCreateSmaReq.tSma);
// TODO: return directly or go on follow steps?
#endif
//
if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
//
// TODO
//
}
if
(
tsdbCreateTSma
(
pVnode
->
pTsdb
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
// TODO
}
// } break;
// case TDMT_VND_CANCEL_SMA: { // timeRangeSMA
// } break;
...
...
source/dnode/vnode/test/tsdbSmaTest.cpp
浏览文件 @
05d6c309
...
...
@@ -210,7 +210,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
// get value by indexName
STSma
*
qSmaCfg
=
NULL
;
qSmaCfg
=
metaGetSmaInfoByIndex
(
pMeta
,
indexUid1
);
qSmaCfg
=
metaGetSmaInfoByIndex
(
pMeta
,
indexUid1
,
true
);
assert
(
qSmaCfg
!=
NULL
);
printf
(
"name1 = %s
\n
"
,
qSmaCfg
->
indexName
);
printf
(
"timezone1 = %"
PRIi8
"
\n
"
,
qSmaCfg
->
timezoneInt
);
...
...
@@ -221,7 +221,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
tdDestroyTSma
(
qSmaCfg
);
taosMemoryFreeClear
(
qSmaCfg
);
qSmaCfg
=
metaGetSmaInfoByIndex
(
pMeta
,
indexUid2
);
qSmaCfg
=
metaGetSmaInfoByIndex
(
pMeta
,
indexUid2
,
true
);
assert
(
qSmaCfg
!=
NULL
);
printf
(
"name2 = %s
\n
"
,
qSmaCfg
->
indexName
);
printf
(
"timezone2 = %"
PRIi8
"
\n
"
,
qSmaCfg
->
timezoneInt
);
...
...
@@ -233,11 +233,12 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
taosMemoryFreeClear
(
qSmaCfg
);
// get index name by table uid
#if 0
SMSmaCursor *pSmaCur = metaOpenSmaCursor(pMeta, tbUid);
assert(pSmaCur != NULL);
uint32_t indexCnt = 0;
while (1) {
const
char
*
indexName
=
metaSmaCursorNext
(
pSmaCur
);
const char *indexName =
(const char *)
metaSmaCursorNext(pSmaCur);
if (indexName == NULL) {
break;
}
...
...
@@ -245,8 +246,8 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
++indexCnt;
}
EXPECT_EQ(indexCnt, nCntTSma);
metaCloseSmaCur
os
r
(
pSmaCur
);
metaCloseSmaCur
so
r(pSmaCur);
#endif
// get wrapper by table uid
STSmaWrapper
*
pSW
=
metaGetSmaInfoByTable
(
pMeta
,
tbUid
);
assert
(
pSW
!=
NULL
);
...
...
source/libs/tdb/src/db/tdbBtree.c
浏览文件 @
05d6c309
...
...
@@ -1196,21 +1196,22 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
return
-
1
;
}
// TODO: vLen may be zero
pVal
=
TDB_REALLOC
(
*
ppVal
,
cd
.
vLen
);
if
(
pVal
==
NULL
)
{
TDB_FREE
(
pKey
);
return
-
1
;
}
*
ppKey
=
pKey
;
*
ppVal
=
pVal
;
*
kLen
=
cd
.
kLen
;
*
vLen
=
cd
.
vLen
;
memcpy
(
pKey
,
cd
.
pKey
,
cd
.
kLen
);
memcpy
(
pVal
,
cd
.
pVal
,
cd
.
vLen
);
if
(
ppVal
)
{
// TODO: vLen may be zero
pVal
=
TDB_REALLOC
(
*
ppVal
,
cd
.
vLen
);
if
(
pVal
==
NULL
)
{
TDB_FREE
(
pKey
);
return
-
1
;
}
*
ppVal
=
pVal
;
*
vLen
=
cd
.
vLen
;
memcpy
(
pVal
,
cd
.
pVal
,
cd
.
vLen
);
}
ret
=
tdbBtcMoveToNext
(
pBtc
);
if
(
ret
<
0
)
{
...
...
tests/script/jenkins/basic.txt
浏览文件 @
05d6c309
...
...
@@ -80,4 +80,7 @@
./test.sh -f tsim/qnode/basic1.sim -m
./test.sh -f tsim/mnode/basic1.sim -m
# --- sma
./test.sh -f tsim/sma/tsmaCreateInsertData.sim
#======================b1-end===============
tests/script/tsim/sma/tsmaCreateInsertData.sim
0 → 100644
浏览文件 @
05d6c309
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database d1
sql show databases
if $rows != 2 then
return -1
endi
print $data00 $data01 $data02
sql use d1
print =============== create super table, include column type for count/sum/min/max/first
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned)
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct1 using stb tags(1000)
sql show tables
if $rows != 1 then
return -1
endi
print =============== insert data, mode1: one row one table in sql
sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
print =============== create sma index from super table
sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) interval(5m,10s) sliding(2m)
print $data00 $data01 $data02 $data03
print =============== trigger stream to execute sma aggr task and insert sma data into sma store
sql insert into ct1 values(now+5s, 20, 20.0, 30.0)
#===================================================================
system sh/exec.sh -n dnode1 -s stop -x SIGINT
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录