Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f5758fbe
T
TDengine
项目概览
taosdata
/
TDengine
12 个月 前同步成功
通知
1180
Star
22014
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
f5758fbe
编写于
3月 14, 2022
作者:
C
Cary Xu
提交者:
GitHub
3月 14, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #10724 from taosdata/feature/TD-11463-3.0
Feature/td 11463 3.0
上级
4189328d
7d070abb
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
861 addition
and
173 deletion
+861
-173
include/common/taosdef.h
include/common/taosdef.h
+6
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/dnode/vnode/inc/tsdb.h
source/dnode/vnode/inc/tsdb.h
+7
-0
source/dnode/vnode/src/inc/tsdbDBDef.h
source/dnode/vnode/src/inc/tsdbDBDef.h
+44
-0
source/dnode/vnode/src/inc/tsdbDef.h
source/dnode/vnode/src/inc/tsdbDef.h
+13
-4
source/dnode/vnode/src/inc/tsdbFile.h
source/dnode/vnode/src/inc/tsdbFile.h
+7
-5
source/dnode/vnode/src/inc/tsdbSma.h
source/dnode/vnode/src/inc/tsdbSma.h
+41
-12
source/dnode/vnode/src/meta/metaBDBImpl.c
source/dnode/vnode/src/meta/metaBDBImpl.c
+17
-16
source/dnode/vnode/src/tsdb/tsdbBDBImpl.c
source/dnode/vnode/src/tsdb/tsdbBDBImpl.c
+159
-0
source/dnode/vnode/src/tsdb/tsdbMain.c
source/dnode/vnode/src/tsdb/tsdbMain.c
+28
-1
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+415
-121
source/dnode/vnode/src/tsdb/tsdbWrite.c
source/dnode/vnode/src/tsdb/tsdbWrite.c
+12
-1
source/dnode/vnode/test/tsdbSmaTest.cpp
source/dnode/vnode/test/tsdbSmaTest.cpp
+110
-13
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/common/taosdef.h
浏览文件 @
f5758fbe
...
...
@@ -61,6 +61,12 @@ typedef enum {
TSDB_SMA_STAT_EXPIRED
=
1
,
// not ready or expired
}
ETsdbSmaStat
;
typedef
enum
{
TSDB_SMA_TYPE_BLOCK
=
0
,
// Block-wise SMA
TSDB_SMA_TYPE_TIME_RANGE
=
1
,
// Time-range-wise SMA
TSDB_SMA_TYPE_ROLLUP
=
2
,
// Rollup SMA
}
ETsdbSmaType
;
extern
char
*
qtypeStr
[];
#define TSDB_PORT_HTTP 11
...
...
include/util/taoserror.h
浏览文件 @
f5758fbe
...
...
@@ -353,6 +353,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614)
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615)
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616)
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0617)
// query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
...
...
source/dnode/vnode/inc/tsdb.h
浏览文件 @
f5758fbe
...
...
@@ -95,6 +95,7 @@ int tsdbCommit(STsdb *pTsdb);
* @return int32_t
*/
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
char
*
msg
);
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
int8_t
smaType
,
char
*
msg
);
/**
* @brief Insert RSma(Time-range-wise Rollup SMA) data.
...
...
@@ -105,6 +106,12 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg);
*/
int32_t
tsdbInsertRSmaData
(
STsdb
*
pTsdb
,
char
*
msg
);
// TODO: This is the basic params, and should wrap the params to a queryHandle.
int32_t
tsdbGetTSmaData
(
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
,
int64_t
indexUid
,
int64_t
interval
,
int8_t
intervalUnit
,
tb_uid_t
tableUid
,
col_id_t
colId
,
TSKEY
querySkey
,
int32_t
nMaxResult
);
// STsdbCfg
int
tsdbOptionsInit
(
STsdbCfg
*
);
void
tsdbOptionsClear
(
STsdbCfg
*
);
...
...
source/dnode/vnode/src/inc/tsdbDBDef.h
0 → 100644
浏览文件 @
f5758fbe
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_DB_DEF_H_
#define _TD_TSDB_DB_DEF_H_
#include "db.h"
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
SDBFile
SDBFile
;
typedef
DB_ENV
*
TDBEnv
;
struct
SDBFile
{
DB
*
pDB
;
char
*
path
;
};
int32_t
tsdbOpenDBF
(
TDBEnv
pEnv
,
SDBFile
*
pDBF
);
void
tsdbCloseDBF
(
SDBFile
*
pDBF
);
int32_t
tsdbOpenBDBEnv
(
DB_ENV
**
ppEnv
,
const
char
*
path
);
void
tsdbCloseBDBEnv
(
DB_ENV
*
pEnv
);
int32_t
tsdbSaveSmaToDB
(
SDBFile
*
pDBF
,
void
*
key
,
uint32_t
keySize
,
void
*
data
,
uint32_t
dataSize
);
void
*
tsdbGetSmaDataByKey
(
SDBFile
*
pDBF
,
void
*
key
,
uint32_t
keySize
,
uint32_t
*
valueSize
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_TSDB_DB_DEF_H_*/
source/dnode/vnode/src/inc/tsdbDef.h
浏览文件 @
f5758fbe
...
...
@@ -27,6 +27,7 @@
#include "ttime.h"
#include "tsdb.h"
#include "tsdbDBDef.h"
#include "tsdbCommit.h"
#include "tsdbFS.h"
#include "tsdbFile.h"
...
...
@@ -37,12 +38,15 @@
#include "tsdbReadImpl.h"
#include "tsdbSma.h"
#ifdef __cplusplus
extern
"C"
{
#endif
struct
STsdb
{
int32_t
vgId
;
bool
repoLocked
;
pthread_mutex_t
mutex
;
char
*
path
;
STsdbCfg
config
;
STsdbMemTable
*
mem
;
...
...
@@ -52,12 +56,17 @@ struct STsdb {
STsdbFS
*
fs
;
SMeta
*
pMeta
;
STfs
*
pTfs
;
SSmaStat
*
pSmaStat
;
SSmaEnv
*
pTSmaEnv
;
SSmaEnv
*
pRSmaEnv
;
};
#define REPO_ID(r) ((r)->vgId)
#define REPO_CFG(r) (&(r)->config)
#define REPO_FS(r) (r)->fs
#define REPO_ID(r) ((r)->vgId)
#define REPO_CFG(r) (&(r)->config)
#define REPO_FS(r) (r)->fs
#define IS_REPO_LOCKED(r) (r)->repoLocked
int
tsdbLockRepo
(
STsdb
*
pTsdb
);
int
tsdbUnlockRepo
(
STsdb
*
pTsdb
);
static
FORCE_INLINE
STSchema
*
tsdbGetTableSchemaImpl
(
STable
*
pTable
,
bool
lock
,
bool
copy
,
int32_t
version
)
{
return
pTable
->
pSchema
;
...
...
source/dnode/vnode/src/inc/tsdbFile.h
浏览文件 @
f5758fbe
...
...
@@ -329,21 +329,23 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
// =============== SDFileSet
typedef
struct
{
int
fid
;
int8_t
state
;
// -128~127
uint8_t
ver
;
// 0~255, DFileSet version
int8_t
state
;
// -128~127
uint8_t
ver
;
// 0~255, DFileSet version
uint16_t
reserve
;
SDFile
files
[
TSDB_FILE_MAX
];
}
SDFileSet
;
typedef
struct
{
int
fid
;
int8_t
state
;
uint8_t
ver
;
int
fid
;
int8_t
state
;
uint8_t
ver
;
uint16_t
reserve
;
#if 0
SDFInfo info;
#endif
STfsFile
f
;
TdFilePtr
pFile
;
}
SSFile
;
// files split by days with fid
#define TSDB_LATEST_FSET_VER 0
...
...
source/dnode/vnode/src/inc/tsdbSma.h
浏览文件 @
f5758fbe
...
...
@@ -17,27 +17,29 @@
#define _TD_TSDB_SMA_H_
typedef
struct
SSmaStat
SSmaStat
;
typedef
struct
SSmaEnv
SSmaEnv
;
// insert/update interface
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
char
*
msg
);
int32_t
tsdbInsertRSmaDataImpl
(
STsdb
*
pTsdb
,
char
*
msg
);
struct
SSmaEnv
{
pthread_rwlock_t
lock
;
TDBEnv
dbEnv
;
char
*
path
;
SSmaStat
*
pStat
;
};
#define SMA_ENV_LOCK(env) ((env)->lock)
#define SMA_ENV_ENV(env) ((env)->dbEnv)
#define SMA_ENV_PATH(env) ((env)->path)
#define SMA_ENV_STAT(env) ((env)->pStat)
#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
// query interface
// TODO: This is the basic params, and should wrap the params to a queryHandle.
int32_t
tsdbGetTSmaDataImpl
(
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
,
STimeWindow
*
queryWin
,
int32_t
nMaxResult
);
// management interface
int32_t
tsdbUpdateExpiredWindow
(
STsdb
*
pTsdb
,
char
*
msg
);
int32_t
tsdbDestroySmaState
(
SSmaStat
*
pSmaStat
);
void
tsdbDestroySmaEnv
(
SSmaEnv
*
pSmaEnv
);
void
*
tsdbFreeSmaEnv
(
SSmaEnv
*
pSmaEnv
);
#if 0
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result);
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin);
#endif
// internal func
static
FORCE_INLINE
int32_t
tsdbEncodeTSmaKey
(
tb_uid_t
tableUid
,
col_id_t
colId
,
TSKEY
tsKey
,
void
**
pData
)
{
int32_t
len
=
0
;
len
+=
taosEncodeFixedI64
(
pData
,
tableUid
);
...
...
@@ -46,4 +48,31 @@ static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId,
return
len
;
}
static
FORCE_INLINE
int
tsdbRLockSma
(
SSmaEnv
*
pEnv
)
{
int
code
=
pthread_rwlock_rdlock
(
&
(
pEnv
->
lock
));
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
return
0
;
}
static
FORCE_INLINE
int
tsdbWLockSma
(
SSmaEnv
*
pEnv
)
{
int
code
=
pthread_rwlock_wrlock
(
&
(
pEnv
->
lock
));
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
return
0
;
}
static
FORCE_INLINE
int
tsdbUnLockSma
(
SSmaEnv
*
pEnv
)
{
int
code
=
pthread_rwlock_unlock
(
&
(
pEnv
->
lock
));
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
return
0
;
}
#endif
/* _TD_TSDB_SMA_H_ */
\ No newline at end of file
source/dnode/vnode/src/meta/metaBDBImpl.c
浏览文件 @
f5758fbe
...
...
@@ -231,30 +231,31 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
void
*
pBuf
=
NULL
,
*
qBuf
=
NULL
;
DBT
key1
=
{
0
},
value1
=
{
0
};
{
// save sma info
int32_t
len
=
tEncodeTSma
(
NULL
,
pSmaCfg
);
pBuf
=
calloc
(
len
,
1
);
if
(
pBuf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
// save sma info
int32_t
len
=
tEncodeTSma
(
NULL
,
pSmaCfg
);
pBuf
=
calloc
(
len
,
1
);
if
(
pBuf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
key1
.
data
=
(
void
*
)
&
pSmaCfg
->
indexUid
;
key1
.
size
=
sizeof
(
pSmaCfg
->
indexUid
);
key1
.
data
=
(
void
*
)
&
pSmaCfg
->
indexUid
;
key1
.
size
=
sizeof
(
pSmaCfg
->
indexUid
);
qBuf
=
pBuf
;
tEncodeTSma
(
&
qBuf
,
pSmaCfg
);
qBuf
=
pBuf
;
tEncodeTSma
(
&
qBuf
,
pSmaCfg
);
value1
.
data
=
pBuf
;
value1
.
size
=
POINTER_DISTANCE
(
qBuf
,
pBuf
);
value1
.
app_data
=
pSmaCfg
;
}
value1
.
data
=
pBuf
;
value1
.
size
=
POINTER_DISTANCE
(
qBuf
,
pBuf
);
value1
.
app_data
=
pSmaCfg
;
metaDBWLock
(
pMeta
->
pDB
);
pMeta
->
pDB
->
pSmaDB
->
put
(
pMeta
->
pDB
->
pSmaDB
,
NULL
,
&
key1
,
&
value1
,
0
);
metaDBULock
(
pMeta
->
pDB
);
// release
tfree
(
pBuf
);
return
0
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbBDBImpl.c
浏览文件 @
f5758fbe
...
...
@@ -12,3 +12,162 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define ALLOW_FORBID_FUNC
#include "db.h"
#include "taoserror.h"
#include "tcoding.h"
#include "thash.h"
#include "tsdbDBDef.h"
#include "tsdbLog.h"
#define IMPL_WITH_LOCK 1
static
int
tsdbOpenBDBDb
(
DB
**
ppDB
,
DB_ENV
*
pEnv
,
const
char
*
pFName
,
bool
isDup
);
static
void
tsdbCloseBDBDb
(
DB
*
pDB
);
#define BDB_PERR(info, code) fprintf(stderr, "%s:%d " info " reason: %s\n", __FILE__, __LINE__, db_strerror(code))
int32_t
tsdbOpenDBF
(
TDBEnv
pEnv
,
SDBFile
*
pDBF
)
{
// TDBEnv is shared by a group of SDBFile
if
(
!
pEnv
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
// Open DBF
if
(
tsdbOpenBDBDb
(
&
(
pDBF
->
pDB
),
pEnv
,
pDBF
->
path
,
false
)
<
0
)
{
terrno
=
TSDB_CODE_TDB_INIT_FAILED
;
tsdbCloseBDBDb
(
pDBF
->
pDB
);
return
-
1
;
}
return
0
;
}
void
tsdbCloseDBF
(
SDBFile
*
pDBF
)
{
if
(
pDBF
->
pDB
)
{
tsdbCloseBDBDb
(
pDBF
->
pDB
);
pDBF
->
pDB
=
NULL
;
}
tfree
(
pDBF
->
path
);
}
int32_t
tsdbOpenBDBEnv
(
DB_ENV
**
ppEnv
,
const
char
*
path
)
{
int
ret
=
0
;
DB_ENV
*
pEnv
=
NULL
;
if
(
path
==
NULL
)
return
0
;
ret
=
db_env_create
(
&
pEnv
,
0
);
if
(
ret
!=
0
)
{
BDB_PERR
(
"Failed to create tsdb env"
,
ret
);
return
-
1
;
}
ret
=
pEnv
->
open
(
pEnv
,
path
,
DB_CREATE
|
DB_INIT_CDB
|
DB_INIT_MPOOL
,
0
);
if
(
ret
!=
0
)
{
// BDB_PERR("Failed to open tsdb env", ret);
tsdbWarn
(
"Failed to open tsdb env for path %s since %d"
,
path
?
path
:
"NULL"
,
ret
);
return
-
1
;
}
*
ppEnv
=
pEnv
;
return
0
;
}
void
tsdbCloseBDBEnv
(
DB_ENV
*
pEnv
)
{
if
(
pEnv
)
{
pEnv
->
close
(
pEnv
,
0
);
}
}
static
int
tsdbOpenBDBDb
(
DB
**
ppDB
,
DB_ENV
*
pEnv
,
const
char
*
pFName
,
bool
isDup
)
{
int
ret
;
DB
*
pDB
;
ret
=
db_create
(
&
(
pDB
),
pEnv
,
0
);
if
(
ret
!=
0
)
{
BDB_PERR
(
"Failed to create DBP"
,
ret
);
return
-
1
;
}
if
(
isDup
)
{
ret
=
pDB
->
set_flags
(
pDB
,
DB_DUPSORT
);
if
(
ret
!=
0
)
{
BDB_PERR
(
"Failed to set DB flags"
,
ret
);
return
-
1
;
}
}
ret
=
pDB
->
open
(
pDB
,
NULL
,
pFName
,
NULL
,
DB_BTREE
,
DB_CREATE
,
0
);
if
(
ret
)
{
BDB_PERR
(
"Failed to open DBF"
,
ret
);
return
-
1
;
}
*
ppDB
=
pDB
;
return
0
;
}
static
void
tsdbCloseBDBDb
(
DB
*
pDB
)
{
if
(
pDB
)
{
pDB
->
close
(
pDB
,
0
);
}
}
int32_t
tsdbSaveSmaToDB
(
SDBFile
*
pDBF
,
void
*
key
,
uint32_t
keySize
,
void
*
data
,
uint32_t
dataSize
)
{
int
ret
;
DBT
key1
=
{
0
},
value1
=
{
0
};
key1
.
data
=
key
;
key1
.
size
=
keySize
;
value1
.
data
=
data
;
value1
.
size
=
dataSize
;
// TODO: lock
ret
=
pDBF
->
pDB
->
put
(
pDBF
->
pDB
,
NULL
,
&
key1
,
&
value1
,
0
);
if
(
ret
)
{
BDB_PERR
(
"Failed to put data to DBF"
,
ret
);
// TODO: unlock
return
-
1
;
}
// TODO: unlock
return
0
;
}
void
*
tsdbGetSmaDataByKey
(
SDBFile
*
pDBF
,
void
*
key
,
uint32_t
keySize
,
uint32_t
*
valueSize
)
{
void
*
result
=
NULL
;
DBT
key1
=
{
0
};
DBT
value1
=
{
0
};
int
ret
;
// Set key/value
key1
.
data
=
key
;
key1
.
size
=
keySize
;
// Query
// TODO: lock
ret
=
pDBF
->
pDB
->
get
(
pDBF
->
pDB
,
NULL
,
&
key1
,
&
value1
,
0
);
// TODO: unlock
if
(
ret
!=
0
)
{
return
NULL
;
}
result
=
calloc
(
1
,
value1
.
size
);
if
(
result
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
*
valueSize
=
value1
.
size
;
memcpy
(
result
,
value1
.
data
,
value1
.
size
);
return
result
;
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbMain.c
浏览文件 @
f5758fbe
...
...
@@ -80,6 +80,8 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg,
pTsdb
->
pmaf
=
pMAF
;
pTsdb
->
pMeta
=
pMeta
;
pTsdb
->
pTfs
=
pTfs
;
pTsdb
->
pTSmaEnv
=
NULL
;
pTsdb
->
pRSmaEnv
=
NULL
;
pTsdb
->
fs
=
tsdbNewFS
(
pTsdbCfg
);
...
...
@@ -88,8 +90,9 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg,
static
void
tsdbFree
(
STsdb
*
pTsdb
)
{
if
(
pTsdb
)
{
tsdbFreeSmaEnv
(
pTsdb
->
pRSmaEnv
);
tsdbFreeSmaEnv
(
pTsdb
->
pTSmaEnv
);
tsdbFreeFS
(
pTsdb
->
fs
);
tsdbDestroySmaState
(
pTsdb
->
pSmaStat
);
tfree
(
pTsdb
->
path
);
free
(
pTsdb
);
}
...
...
@@ -105,6 +108,30 @@ static void tsdbCloseImpl(STsdb *pTsdb) {
tsdbCloseFS
(
pTsdb
);
// TODO
}
int
tsdbLockRepo
(
STsdb
*
pTsdb
)
{
int
code
=
pthread_mutex_lock
(
&
pTsdb
->
mutex
);
if
(
code
!=
0
)
{
tsdbError
(
"vgId:%d failed to lock tsdb since %s"
,
REPO_ID
(
pTsdb
),
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
pTsdb
->
repoLocked
=
true
;
return
0
;
}
int
tsdbUnlockRepo
(
STsdb
*
pTsdb
)
{
ASSERT
(
IS_REPO_LOCKED
(
pTsdb
));
pTsdb
->
repoLocked
=
false
;
int
code
=
pthread_mutex_unlock
(
&
pTsdb
->
mutex
);
if
(
code
!=
0
)
{
tsdbError
(
"vgId:%d failed to unlock tsdb since %s"
,
REPO_ID
(
pTsdb
),
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
return
0
;
}
#if 0
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
f5758fbe
...
...
@@ -15,7 +15,9 @@
#include "tsdbDef.h"
#undef SMA_PRINT_DEBUG_LOG
#define SMA_STORAGE_TSDB_DAYS 30
#define SMA_STORAGE_TSDB_TIMES 30
#define SMA_STORAGE_SPLIT_HOURS 24
#define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8
...
...
@@ -23,7 +25,7 @@
#define SMA_STATE_ITEM_HASH_SLOT 32
#define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test
#define SMA_TEST_INDEX_UID
123456
// TODO: just for test
#define SMA_TEST_INDEX_UID
2000000001
// TODO: just for test
typedef
enum
{
SMA_STORAGE_LEVEL_TSDB
=
0
,
// use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2t200.dat
SMA_STORAGE_LEVEL_DFILESET
=
1
// use days of TS data e.g. vnode${N}/tsdb/rsma/sma_index_uid/v2r200.dat
...
...
@@ -31,23 +33,22 @@ typedef enum {
typedef
struct
{
STsdb
*
pTsdb
;
char
*
pDFile
;
// TODO: use the real DFile type, not char*
int32_t
interval
;
// interval with the precision of DB
// TODO
SDBFile
dFile
;
int32_t
interval
;
// interval with the precision of DB
}
STSmaWriteH
;
typedef
struct
{
int32_t
iter
;
int32_t
fid
;
}
SmaFsIter
;
typedef
struct
{
STsdb
*
pTsdb
;
char
*
pDFile
;
// TODO: use the real DFile type, not char*
SDBFile
dFile
;
int32_t
interval
;
// interval with the precision of DB
int32_t
blockSize
;
// size of SMA block item
int8_t
storageLevel
;
int8_t
days
;
SmaFsIter
smaFsIter
;
// TODO
}
STSmaReadH
;
typedef
struct
{
...
...
@@ -68,18 +69,117 @@ struct SSmaStat {
};
// declaration of static functions
static
int32_t
tsdbInitTSmaWriteH
(
STSmaWriteH
*
pSmaH
,
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
);
static
int32_t
tsdbInitTSmaReadH
(
STSmaReadH
*
pSmaH
,
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
);
static
int32_t
tsdbJudgeStorageLevel
(
int64_t
interval
,
int8_t
intervalUnit
);
static
int32_t
tsdbInsertTSmaDataSection
(
STSmaWriteH
*
pSmaH
,
STSmaDataWrapper
*
pData
);
static
int32_t
tsdbInsertTSmaBlocks
(
void
*
bTree
,
const
char
*
smaKey
,
const
char
*
pData
,
int32_t
dataLen
);
static
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
char
*
msg
);
static
int32_t
tsdbInsertRSmaDataImpl
(
STsdb
*
pTsdb
,
char
*
msg
);
// TODO: This is the basic params, and should wrap the params to a queryHandle.
static
int32_t
tsdbGetTSmaDataImpl
(
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
,
int64_t
indexUid
,
int64_t
interval
,
int8_t
intervalUnit
,
tb_uid_t
tableUid
,
col_id_t
colId
,
TSKEY
querySkey
,
int32_t
nMaxResult
);
static
int32_t
tsdbUpdateExpiredWindow
(
STsdb
*
pTsdb
,
int8_t
smaType
,
char
*
msg
);
static
int32_t
tsdbInitSmaStat
(
SSmaStat
**
pSmaStat
);
static
int32_t
tsdbDestroySmaState
(
SSmaStat
*
pSmaStat
);
static
SSmaEnv
*
tsdbNewSmaEnv
(
const
STsdb
*
pTsdb
,
const
char
*
path
);
static
int32_t
tsdbInitSmaEnv
(
STsdb
*
pTsdb
,
const
char
*
path
,
SSmaEnv
**
pEnv
);
static
int32_t
tsdbInitTSmaWriteH
(
STSmaWriteH
*
pSmaH
,
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
);
static
void
tsdbDestroyTSmaWriteH
(
STSmaWriteH
*
pSmaH
);
static
int32_t
tsdbInitTSmaReadH
(
STSmaReadH
*
pSmaH
,
STsdb
*
pTsdb
,
int64_t
interval
,
int8_t
intervalUnit
);
static
int32_t
tsdbGetSmaStorageLevel
(
int64_t
interval
,
int8_t
intervalUnit
);
static
int32_t
tsdbInsertTSmaDataSection
(
STSmaWriteH
*
pSmaH
,
STSmaDataWrapper
*
pData
);
static
int32_t
tsdbInsertTSmaBlocks
(
STSmaWriteH
*
pSmaH
,
void
*
smaKey
,
uint32_t
keyLen
,
void
*
pData
,
uint32_t
dataLen
);
static
int64_t
tsdbGetIntervalByPrecision
(
int64_t
interval
,
uint8_t
intervalUnit
,
int8_t
precision
);
static
int32_t
tsdbGetTSmaDays
(
STsdb
*
pTsdb
,
int64_t
interval
,
int32_t
storageLevel
);
static
int32_t
tsdbSetTSmaDataFile
(
STSmaWriteH
*
pSmaH
,
STSmaDataWrapper
*
pData
,
int32_t
storageLevel
,
int32_t
fid
);
static
int32_t
tsdbInitTSmaFile
(
STSmaReadH
*
pSmaH
,
TSKEY
skey
);
static
bool
tsdbSetAndOpenTSmaFile
(
STSmaReadH
*
pReadH
,
TSKEY
*
queryKey
);
static
int32_t
tsdbInitTSmaReadH
(
STSmaReadH
*
pSmaH
,
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
);
static
int32_t
tsdbInitTSmaFile
(
STSmaReadH
*
pReadH
,
STimeWindow
*
queryWin
);
static
bool
tsdbSetAndOpenTSmaFile
(
STSmaReadH
*
pReadH
,
STimeWindow
*
queryWin
);
static
SSmaEnv
*
tsdbNewSmaEnv
(
const
STsdb
*
pTsdb
,
const
char
*
path
)
{
SSmaEnv
*
pEnv
=
NULL
;
pEnv
=
(
SSmaEnv
*
)
calloc
(
1
,
sizeof
(
SSmaEnv
));
if
(
pEnv
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
int
code
=
pthread_rwlock_init
(
&
(
pEnv
->
lock
),
NULL
);
if
(
code
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
free
(
pEnv
);
return
NULL
;
}
ASSERT
(
path
&&
(
strlen
(
path
)
>
0
));
pEnv
->
path
=
strdup
(
path
);
if
(
pEnv
->
path
==
NULL
)
{
tsdbFreeSmaEnv
(
pEnv
);
return
NULL
;
}
if
(
tsdbInitSmaStat
(
&
pEnv
->
pStat
)
!=
TSDB_CODE_SUCCESS
)
{
tsdbFreeSmaEnv
(
pEnv
);
return
NULL
;
}
if
(
tsdbOpenBDBEnv
(
&
pEnv
->
dbEnv
,
pEnv
->
path
)
!=
TSDB_CODE_SUCCESS
)
{
tsdbFreeSmaEnv
(
pEnv
);
return
NULL
;
}
return
pEnv
;
}
static
int32_t
tsdbInitSmaEnv
(
STsdb
*
pTsdb
,
const
char
*
path
,
SSmaEnv
**
pEnv
)
{
if
(
!
pEnv
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
TSDB_CODE_FAILED
;
}
if
(
pEnv
&&
*
pEnv
)
{
return
TSDB_CODE_SUCCESS
;
}
if
(
tsdbLockRepo
(
pTsdb
)
!=
0
)
{
return
TSDB_CODE_FAILED
;
}
if
(
*
pEnv
==
NULL
)
{
if
((
*
pEnv
=
tsdbNewSmaEnv
(
pTsdb
,
path
))
==
NULL
)
{
tsdbUnlockRepo
(
pTsdb
);
return
TSDB_CODE_FAILED
;
}
}
if
(
tsdbUnlockRepo
(
pTsdb
)
!=
0
)
{
tsdbFreeSmaEnv
(
*
pEnv
);
return
TSDB_CODE_FAILED
;
}
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief Release resources allocated for its member fields, not including itself.
*
* @param pSmaEnv
* @return int32_t
*/
void
tsdbDestroySmaEnv
(
SSmaEnv
*
pSmaEnv
)
{
if
(
pSmaEnv
)
{
tsdbDestroySmaState
(
pSmaEnv
->
pStat
);
tfree
(
pSmaEnv
->
pStat
);
tfree
(
pSmaEnv
->
path
);
pthread_rwlock_destroy
(
&
(
pSmaEnv
->
lock
));
tsdbCloseBDBEnv
(
pSmaEnv
->
dbEnv
);
}
}
void
*
tsdbFreeSmaEnv
(
SSmaEnv
*
pSmaEnv
)
{
tsdbDestroySmaEnv
(
pSmaEnv
);
tfree
(
pSmaEnv
);
return
NULL
;
}
static
int32_t
tsdbInitSmaStat
(
SSmaStat
**
pSmaStat
)
{
ASSERT
(
pSmaStat
!=
NULL
);
...
...
@@ -125,6 +225,12 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) {
return
pItem
;
}
/**
* @brief Release resources allocated for its member fields, not including itself.
*
* @param pSmaStat
* @return int32_t
*/
int32_t
tsdbDestroySmaState
(
SSmaStat
*
pSmaStat
)
{
if
(
pSmaStat
)
{
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
...
...
@@ -135,7 +241,6 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
item
=
taosHashIterate
(
pSmaStat
->
smaStatItems
,
item
);
}
taosHashCleanup
(
pSmaStat
->
smaStatItems
);
free
(
pSmaStat
);
}
}
...
...
@@ -143,22 +248,35 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
* @brief Update expired window according to msg from stream computing module.
*
* @param pTsdb
* @param smaType ETsdbSmaType
* @param msg
* @return int32_t
*/
int32_t
tsdbUpdateExpiredWindow
(
STsdb
*
pTsdb
,
char
*
msg
)
{
if
(
msg
==
NULL
)
{
int32_t
tsdbUpdateExpiredWindow
(
STsdb
*
pTsdb
,
int8_t
smaType
,
char
*
msg
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
SSmaEnv
*
pEnv
=
NULL
;
if
(
!
msg
||
!
pTsdb
->
pMeta
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
TSDB_CODE_FAILED
;
}
// lazy mode
if
(
tsdbInitSma
Stat
(
&
pTsdb
->
pSmaStat
)
!=
TSDB_CODE_SUCCESS
)
{
char
smaPath
[
TSDB_FILENAME_LEN
]
=
"/proj/.sma/"
;
if
(
tsdbInitSma
Env
(
pTsdb
,
smaPath
,
&
pEnv
)
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_FAILED
;
}
if
(
smaType
==
TSDB_SMA_TYPE_TIME_RANGE
)
{
pTsdb
->
pTSmaEnv
=
pEnv
;
}
else
if
(
smaType
==
TSDB_SMA_TYPE_ROLLUP
)
{
pTsdb
->
pRSmaEnv
=
pEnv
;
}
else
{
ASSERT
(
0
);
}
// TODO: decode the msg => start
int64_t
indexUid
=
SMA_TEST_INDEX_UID
;
const
char
*
indexName
=
SMA_TEST_INDEX_NAME
;
int64_t
indexUid
=
SMA_TEST_INDEX_UID
;
//
const char * indexName = SMA_TEST_INDEX_NAME;
const
int32_t
SMA_TEST_EXPIRED_WINDOW_SIZE
=
10
;
TSKEY
expiredWindows
[
SMA_TEST_EXPIRED_WINDOW_SIZE
];
int64_t
now
=
taosGetTimestampMs
();
...
...
@@ -167,9 +285,9 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
}
// TODO: decode the msg <= end
SHashObj
*
pItemsHash
=
pTsdb
->
pSmaStat
->
smaStatItems
;
SHashObj
*
pItemsHash
=
SMA_ENV_STAT_ITEMS
(
pEnv
)
;
SSmaStatItem
*
pItem
=
(
SSmaStatItem
*
)
taosHashGet
(
pItemsHash
,
indexName
,
strlen
(
indexName
));
SSmaStatItem
*
pItem
=
(
SSmaStatItem
*
)
taosHashGet
(
pItemsHash
,
&
indexUid
,
sizeof
(
indexUid
));
if
(
pItem
==
NULL
)
{
pItem
=
tsdbNewSmaStatItem
(
TSDB_SMA_STAT_EXPIRED
);
// TODO use the real state
if
(
pItem
==
NULL
)
{
...
...
@@ -181,20 +299,28 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
// cache smaMeta
STSma
*
pSma
=
metaGetSmaInfoByIndex
(
pTsdb
->
pMeta
,
indexUid
);
if
(
pSma
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_NO_SMA_INDEX_IN_META
;
taosHashCleanup
(
pItem
->
expiredWindows
);
free
(
pItem
);
tsdbWarn
(
"vgId:%d update expired window failed for smaIndex %"
PRIi64
" since %s"
,
REPO_ID
(
pTsdb
),
indexUid
,
tstrerror
(
terrno
));
return
TSDB_CODE_FAILED
;
}
pItem
->
pSma
=
pSma
;
// TODO: change indexName to indexUid
if
(
taosHashPut
(
pItemsHash
,
indexName
,
strnlen
(
indexName
,
TSDB_INDEX_NAME_LEN
),
&
pItem
,
sizeof
(
pItem
))
!=
0
)
{
if
(
taosHashPut
(
pItemsHash
,
&
indexUid
,
sizeof
(
indexUid
),
&
pItem
,
sizeof
(
pItem
))
!=
0
)
{
// If error occurs during put smaStatItem, free the resources of pItem
taosHashCleanup
(
pItem
->
expiredWindows
);
free
(
pItem
);
return
TSDB_CODE_FAILED
;
}
}
#if 0
SSmaStatItem *pItem1 = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
int size1 = taosHashGetSize(pItem1->expiredWindows);
tsdbWarn("vgId:%d smaIndex %" PRIi64 " size is %d before hashPut", REPO_ID(pTsdb), indexUid, size1);
#endif
int8_t
state
=
TSDB_SMA_STAT_EXPIRED
;
for
(
int32_t
i
=
0
;
i
<
SMA_TEST_EXPIRED_WINDOW_SIZE
;
++
i
)
{
...
...
@@ -207,21 +333,28 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
// windows failed to put into hash table.
taosHashCleanup
(
pItem
->
expiredWindows
);
tfree
(
pItem
->
pSma
);
taosHashRemove
(
pItemsHash
,
indexName
,
sizeof
(
indexName
));
taosHashRemove
(
pItemsHash
,
&
indexUid
,
sizeof
(
indexUid
));
return
TSDB_CODE_FAILED
;
}
}
#if 0
SSmaStatItem *pItem2 = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
int size2 = taosHashGetSize(pItem1->expiredWindows);
tsdbWarn("vgId:%d smaIndex %" PRIi64 " size is %d after hashPut", REPO_ID(pTsdb), indexUid, size2);
#endif
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tsdbResetExpiredWindow
(
S
Tsdb
*
pTsdb
,
int64_t
indexUid
,
TSKEY
skey
)
{
static
int32_t
tsdbResetExpiredWindow
(
S
SmaStat
*
pStat
,
int64_t
indexUid
,
TSKEY
skey
)
{
SSmaStatItem
*
pItem
=
NULL
;
if
(
pTsdb
->
pSmaStat
&&
pTsdb
->
pSmaStat
->
smaStatItems
)
{
pItem
=
(
SSmaStatItem
*
)
taosHashGet
(
pTsdb
->
pSmaStat
->
smaStatItems
,
&
indexUid
,
sizeof
(
indexUid
));
// TODO: If HASH_ENTRY_LOCK used, whether rwlock needed to handle cases of removing hashNode?
if
(
pStat
&&
pStat
->
smaStatItems
)
{
pItem
=
(
SSmaStatItem
*
)
taosHashGet
(
pStat
->
smaStatItems
,
&
indexUid
,
sizeof
(
indexUid
));
}
#if 0
if (pItem != NULL) {
// TODO: reset time window for the sma data blocks
if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) {
...
...
@@ -231,6 +364,7 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, int64_t indexUid, TSKEY skey
} else {
// error handling
}
#endif
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -241,7 +375,7 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, int64_t indexUid, TSKEY skey
* @param intervalUnit
* @return int32_t
*/
static
int32_t
tsdb
Judge
StorageLevel
(
int64_t
interval
,
int8_t
intervalUnit
)
{
static
int32_t
tsdb
GetSma
StorageLevel
(
int64_t
interval
,
int8_t
intervalUnit
)
{
// TODO: configurable for SMA_STORAGE_SPLIT_HOURS?
switch
(
intervalUnit
)
{
case
TD_TIME_UNIT_HOUR
:
...
...
@@ -281,18 +415,35 @@ static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit) {
}
/**
* @brief Insert TSma data blocks to B+Tree
* @brief Insert TSma data blocks to
DB File build by
B+Tree
*
* @param
bTree
* @param
pSmaH
* @param smaKey
* @param keyLen
* @param pData
* @param dataLen
* @return int32_t
*/
static
int32_t
tsdbInsertTSmaBlocks
(
void
*
bTree
,
const
char
*
smaKey
,
const
char
*
pData
,
int32_t
dataLen
)
{
static
int32_t
tsdbInsertTSmaBlocks
(
STSmaWriteH
*
pSmaH
,
void
*
smaKey
,
uint32_t
keyLen
,
void
*
pData
,
uint32_t
dataLen
)
{
SDBFile
*
pDBFile
=
&
pSmaH
->
dFile
;
// TODO: insert sma data blocks into B+Tree
tsdbDebug
(
"insert sma data blocks into B+Tree: smaKey %"
PRIx64
"-%"
PRIu16
"-%"
PRIx64
", dataLen %d"
,
*
(
uint64_t
*
)
smaKey
,
*
(
uint16_t
*
)
POINTER_SHIFT
(
smaKey
,
8
),
*
(
int64_t
*
)
POINTER_SHIFT
(
smaKey
,
10
),
dataLen
);
tsdbDebug
(
"vgId:%d insert sma data blocks into %s: smaKey %"
PRIx64
"-%"
PRIu16
"-%"
PRIx64
", dataLen %d"
,
REPO_ID
(
pSmaH
->
pTsdb
),
pDBFile
->
path
,
*
(
tb_uid_t
*
)
smaKey
,
*
(
uint16_t
*
)
POINTER_SHIFT
(
smaKey
,
8
),
*
(
int64_t
*
)
POINTER_SHIFT
(
smaKey
,
10
),
dataLen
);
if
(
tsdbSaveSmaToDB
(
pDBFile
,
smaKey
,
keyLen
,
pData
,
dataLen
)
!=
0
)
{
return
TSDB_CODE_FAILED
;
}
#ifdef SMA_PRINT_DEBUG_LOG
uint32_t
valueSize
=
0
;
void
*
data
=
tsdbGetSmaDataByKey
(
pDBFile
,
smaKey
,
keyLen
,
&
valueSize
);
ASSERT
(
data
!=
NULL
);
for
(
uint32_t
v
=
0
;
v
<
valueSize
;
v
+=
8
)
{
tsdbWarn
(
"vgId:%d sma data - val[%d] is %"
PRIi64
,
REPO_ID
(
pSmaH
->
pTsdb
),
v
,
*
(
int64_t
*
)
POINTER_SHIFT
(
data
,
v
));
}
#endif
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -324,41 +475,41 @@ static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit
}
}
switch
(
intervalUnit
)
{
case
TD_TIME_UNIT_MILLISEC
:
if
(
TSDB_TIME_PRECISION_MILLI
==
precision
)
{
switch
(
precision
)
{
case
TSDB_TIME_PRECISION_MILLI
:
if
(
TD_TIME_UNIT_MICROSEC
==
intervalUnit
)
{
// us
return
interval
/
1e3
;
}
else
if
(
TD_TIME_UNIT_NANOSEC
==
intervalUnit
)
{
// nano second
return
interval
/
1e6
;
}
else
{
return
interval
;
}
else
if
(
TSDB_TIME_PRECISION_MICRO
==
precision
)
{
return
interval
*
1e3
;
}
else
{
// nano second
return
interval
*
1e6
;
}
break
;
case
TD_TIME_UNIT_MICROSEC
:
if
(
TSDB_TIME_PRECISION_MILLI
==
precision
)
{
return
interval
/
1e3
;
}
else
if
(
TSDB_TIME_PRECISION_MICRO
==
precision
)
{
case
TSDB_TIME_PRECISION_MICRO
:
if
(
TD_TIME_UNIT_MICROSEC
==
intervalUnit
)
{
// us
return
interval
;
}
else
{
// nano second
}
else
if
(
TD_TIME_UNIT_NANOSEC
==
intervalUnit
)
{
// nano second
return
interval
/
1e3
;
}
else
{
return
interval
*
1e3
;
}
break
;
case
TD_TIME_UNIT_NANOSEC
:
if
(
TSDB_TIME_PRECISION_MILLI
==
precision
)
{
return
interval
/
1e6
;
}
else
if
(
TSDB_TIME_PRECISION_MICRO
==
precision
)
{
return
interval
/
1e3
;
}
else
{
// nano second
case
TSDB_TIME_PRECISION_NANO
:
if
(
TD_TIME_UNIT_MICROSEC
==
intervalUnit
)
{
return
interval
*
1e3
;
}
else
if
(
TD_TIME_UNIT_NANOSEC
==
intervalUnit
)
{
// nano second
return
interval
;
}
else
{
return
interval
*
1e6
;
}
break
;
default:
if
(
T
SDB_TIME_PRECISION_MILLI
==
precision
)
{
return
interval
*
1e3
;
}
else
if
(
T
SDB_TIME_PRECISION_MICRO
==
precision
)
{
return
interval
*
1e6
;
}
else
{
// nano second
return
interval
*
1e9
;
default:
// ms
if
(
T
D_TIME_UNIT_MICROSEC
==
intervalUnit
)
{
// us
return
interval
/
1e3
;
}
else
if
(
T
D_TIME_UNIT_NANOSEC
==
intervalUnit
)
{
// nano second
return
interval
/
1e6
;
}
else
{
return
interval
;
}
break
;
}
...
...
@@ -381,8 +532,6 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p
// TODO: check the data integrity
void
*
bTree
=
pSmaH
->
pDFile
;
int32_t
len
=
0
;
while
(
true
)
{
if
(
len
>=
pData
->
dataLen
)
{
...
...
@@ -405,7 +554,7 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p
pData->indexUid, pData->skey, pTbData->tableUid, pColData->colId);
#endif
tsdbEncodeTSmaKey
(
pTbData
->
tableUid
,
pColData
->
colId
,
pData
->
skey
,
(
void
**
)
&
pSmaKey
);
if
(
tsdbInsertTSmaBlocks
(
bTree
,
smaKey
,
pColData
->
data
,
pColData
->
blockSize
)
<
0
)
{
if
(
tsdbInsertTSmaBlocks
(
pSmaH
,
smaKey
,
SMA_KEY_LEN
,
pColData
->
data
,
pColData
->
blockSize
)
<
0
)
{
tsdbWarn
(
"vgId:%d insert tSma blocks failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
tbLen
+=
(
sizeof
(
STSmaColData
)
+
pColData
->
blockSize
);
...
...
@@ -419,15 +568,43 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p
static
int32_t
tsdbInitTSmaWriteH
(
STSmaWriteH
*
pSmaH
,
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
)
{
pSmaH
->
pTsdb
=
pTsdb
;
pSmaH
->
interval
=
tsdbGetIntervalByPrecision
(
pData
->
interval
,
pData
->
intervalUnit
,
REPO_CFG
(
pTsdb
)
->
precision
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tsdbSetTSmaDataFile
(
STSmaWriteH
*
pSmaH
,
STSmaDataWrapper
*
pData
,
int32_t
storageLevel
,
int32_t
fid
)
{
// TODO
pSmaH
->
pDFile
=
"tSma_interval_file_name"
;
static
void
tsdbDestroyTSmaWriteH
(
STSmaWriteH
*
pSmaH
)
{
if
(
pSmaH
)
{
tsdbCloseDBF
(
&
pSmaH
->
dFile
);
}
}
static
int32_t
tsdbSetTSmaDataFile
(
STSmaWriteH
*
pSmaH
,
STSmaDataWrapper
*
pData
,
int32_t
storageLevel
,
int32_t
fid
)
{
STsdb
*
pTsdb
=
pSmaH
->
pTsdb
;
ASSERT
(
pSmaH
->
dFile
.
path
==
NULL
&&
pSmaH
->
dFile
.
pDB
==
NULL
);
char
tSmaFile
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
tSmaFile
,
TSDB_FILENAME_LEN
,
"v%df%d.tsma"
,
REPO_ID
(
pTsdb
),
fid
);
pSmaH
->
dFile
.
path
=
strdup
(
tSmaFile
);
return
TSDB_CODE_SUCCESS
;
}
}
/**
* @brief
*
* @param pTsdb
* @param interval Interval calculated by DB's precision
* @param storageLevel
* @return int32_t
*/
static
int32_t
tsdbGetTSmaDays
(
STsdb
*
pTsdb
,
int64_t
interval
,
int32_t
storageLevel
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
int32_t
daysPerFile
=
pCfg
->
daysPerFile
;
if
(
storageLevel
==
SMA_STORAGE_LEVEL_TSDB
)
{
int32_t
days
=
SMA_STORAGE_TSDB_TIMES
*
(
interval
/
tsTickPerDay
[
pCfg
->
precision
]);
daysPerFile
=
days
>
SMA_STORAGE_TSDB_DAYS
?
days
:
SMA_STORAGE_TSDB_DAYS
;
}
return
daysPerFile
;
}
/**
* @brief Insert/Update Time-range-wise SMA data.
...
...
@@ -441,51 +618,72 @@ static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData,
* @param msg
* @return int32_t
*/
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
char
*
msg
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
static
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
char
*
msg
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
STSmaDataWrapper
*
pData
=
(
STSmaDataWrapper
*
)
msg
;
STSmaWriteH
tSmaH
=
{
0
};
tsdbInitTSmaWriteH
(
&
tSmaH
,
pTsdb
,
pData
);
if
(
!
pTsdb
->
pTSmaEnv
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
tsdbWarn
(
"vgId:%d insert tSma data failed since pTSmaEnv is NULL"
,
REPO_ID
(
pTsdb
));
return
terrno
;
}
if
(
pData
->
dataLen
<=
0
)
{
TASSERT
(
0
);
terrno
=
TSDB_CODE_INVALID_PARA
;
return
terrno
;
return
TSDB_CODE_FAILED
;
}
// Step 1: Judge the storage level
int32_t
storageLevel
=
tsdbJudgeStorageLevel
(
pData
->
interval
,
pData
->
intervalUnit
);
int32_t
daysPerFile
=
storageLevel
==
SMA_STORAGE_LEVEL_TSDB
?
SMA_STORAGE_TSDB_DAYS
:
pCfg
->
daysPerFile
;
STSmaWriteH
tSmaH
=
{
0
};
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
// - Set and open the DFile or the B+Tree file
if
(
tsdbInitTSmaWriteH
(
&
tSmaH
,
pTsdb
,
pData
)
!=
0
)
{
return
TSDB_CODE_FAILED
;
}
// Step 1: Judge the storage level and days
int32_t
storageLevel
=
tsdbGetSmaStorageLevel
(
pData
->
interval
,
pData
->
intervalUnit
);
int32_t
daysPerFile
=
tsdbGetTSmaDays
(
pTsdb
,
tSmaH
.
interval
,
storageLevel
);
int32_t
fid
=
(
int32_t
)(
TSDB_KEY_FID
(
pData
->
skey
,
daysPerFile
,
pCfg
->
precision
));
// Save all the TSma data to one file
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
// - Set and open the DFile or the B+Tree file
// TODO: tsdbStartTSmaCommit();
tsdbSetTSmaDataFile
(
&
tSmaH
,
pData
,
storageLevel
,
fid
);
tsdbInsertTSmaDataSection
(
&
tSmaH
,
pData
);
if
(
tsdbOpenDBF
(
pTsdb
->
pTSmaEnv
->
dbEnv
,
&
tSmaH
.
dFile
)
!=
0
)
{
tsdbWarn
(
"vgId:%d open DB file %s failed since %s"
,
REPO_ID
(
pTsdb
),
tSmaH
.
dFile
.
path
?
tSmaH
.
dFile
.
path
:
"path is NULL"
,
tstrerror
(
terrno
));
tsdbDestroyTSmaWriteH
(
&
tSmaH
);
return
TSDB_CODE_FAILED
;
}
if
(
tsdbInsertTSmaDataSection
(
&
tSmaH
,
pData
)
!=
0
)
{
tsdbWarn
(
"vgId:%d insert tSma data section failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
tsdbDestroyTSmaWriteH
(
&
tSmaH
);
return
TSDB_CODE_FAILED
;
}
// TODO:tsdbEndTSmaCommit();
// reset the SSmaStat
tsdbResetExpiredWindow
(
pTsdb
,
pData
->
indexUid
,
pData
->
skey
);
//
Step 3:
reset the SSmaStat
tsdbResetExpiredWindow
(
SMA_ENV_STAT
(
pTsdb
->
pTSmaEnv
)
,
pData
->
indexUid
,
pData
->
skey
);
tsdbDestroyTSmaWriteH
(
&
tSmaH
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tsdbSetRSmaDataFile
(
STSmaWriteH
*
pSmaH
,
STSmaDataWrapper
*
pData
,
int32_t
fid
)
{
// TODO
pSmaH
->
pDFile
=
"rSma_interval_file_name"
;
STsdb
*
pTsdb
=
pSmaH
->
pTsdb
;
char
tSmaFile
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
tSmaFile
,
TSDB_FILENAME_LEN
,
"v%df%d.rsma"
,
REPO_ID
(
pTsdb
),
fid
);
pSmaH
->
dFile
.
path
=
strdup
(
tSmaFile
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbInsertRSmaDataImpl
(
STsdb
*
pTsdb
,
char
*
msg
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
static
int32_t
tsdbInsertRSmaDataImpl
(
STsdb
*
pTsdb
,
char
*
msg
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
STSmaDataWrapper
*
pData
=
(
STSmaDataWrapper
*
)
msg
;
STSmaWriteH
tSmaH
=
{
0
};
STSmaWriteH
tSmaH
=
{
0
};
tsdbInitTSmaWriteH
(
&
tSmaH
,
pTsdb
,
pData
);
...
...
@@ -496,7 +694,7 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
}
// Step 1: Judge the storage level
int32_t
storageLevel
=
tsdb
Judge
StorageLevel
(
pData
->
interval
,
pData
->
intervalUnit
);
int32_t
storageLevel
=
tsdb
GetSma
StorageLevel
(
pData
->
interval
,
pData
->
intervalUnit
);
int32_t
daysPerFile
=
storageLevel
==
SMA_STORAGE_LEVEL_TSDB
?
SMA_STORAGE_TSDB_DAYS
:
pCfg
->
daysPerFile
;
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
...
...
@@ -507,45 +705,46 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
// Save all the TSma data to one file
// TODO: tsdbStartTSmaCommit();
tsdbSetTSmaDataFile
(
&
tSmaH
,
pData
,
storageLevel
,
fid
);
tsdbInsertTSmaDataSection
(
&
tSmaH
,
pData
);
// TODO:tsdbEndTSmaCommit();
// reset the SSmaStat
tsdbResetExpiredWindow
(
pTsdb
,
pData
->
indexUid
,
pData
->
skey
);
tsdbResetExpiredWindow
(
SMA_ENV_STAT
(
pTsdb
->
pRSmaEnv
)
,
pData
->
indexUid
,
pData
->
skey
);
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief
Init of tSma ReadH
* @brief
*
* @param pSmaH
* @param pTsdb
* @param
param
* @param
pData
* @param
interval
* @param
intervalUnit
* @return int32_t
*/
static
int32_t
tsdbInitTSmaReadH
(
STSmaReadH
*
pSmaH
,
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
)
{
static
int32_t
tsdbInitTSmaReadH
(
STSmaReadH
*
pSmaH
,
STsdb
*
pTsdb
,
int64_t
interval
,
int8_t
intervalUnit
)
{
pSmaH
->
pTsdb
=
pTsdb
;
pSmaH
->
interval
=
tsdbGetIntervalByPrecision
(
pData
->
interval
,
pData
->
intervalUnit
,
REPO_CFG
(
pTsdb
)
->
precision
);
// pSmaH->blockSize = param->numOfFuncIds * sizeof(int64_t);
pSmaH
->
interval
=
tsdbGetIntervalByPrecision
(
interval
,
intervalUnit
,
REPO_CFG
(
pTsdb
)
->
precision
);
pSmaH
->
storageLevel
=
tsdbGetSmaStorageLevel
(
interval
,
intervalUnit
);
pSmaH
->
days
=
tsdbGetTSmaDays
(
pTsdb
,
pSmaH
->
interval
,
pSmaH
->
storageLevel
);
}
/**
* @brief Init of tSma FS
*
* @param pReadH
* @param param
* @param queryWin
* @param skey
* @return int32_t
*/
static
int32_t
tsdbInitTSmaFile
(
STSmaReadH
*
p
ReadH
,
STimeWindow
*
queryWin
)
{
int32_t
storageLevel
=
0
;
//tsdbJudgeStorageLevel(param->interval, param->intervalUnit
);
int32_t
daysPerFile
=
storageLevel
==
SMA_STORAGE_LEVEL_TSDB
?
SMA_STORAGE_TSDB_DAYS
:
REPO_CFG
(
pReadH
->
pTsdb
)
->
daysPerFile
;
p
ReadH
->
storageLevel
=
storageLevel
;
p
ReadH
->
days
=
daysPerFile
;
p
ReadH
->
smaFsIter
.
iter
=
0
;
static
int32_t
tsdbInitTSmaFile
(
STSmaReadH
*
p
SmaH
,
TSKEY
skey
)
{
int32_t
fid
=
(
int32_t
)(
TSDB_KEY_FID
(
skey
,
pSmaH
->
days
,
REPO_CFG
(
pSmaH
->
pTsdb
)
->
precision
)
);
char
tSmaFile
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
tSmaFile
,
TSDB_FILENAME_LEN
,
"v%df%d.tsma"
,
REPO_ID
(
pSmaH
->
pTsdb
),
fid
)
;
p
SmaH
->
dFile
.
path
=
strdup
(
tSmaFile
)
;
p
SmaH
->
smaFsIter
.
iter
=
0
;
p
SmaH
->
smaFsIter
.
fid
=
fid
;
}
/**
...
...
@@ -557,17 +756,18 @@ static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) {
* @return true
* @return false
*/
static
bool
tsdbSetAndOpenTSmaFile
(
STSmaReadH
*
pReadH
,
STimeWindow
*
queryWin
)
{
static
bool
tsdbSetAndOpenTSmaFile
(
STSmaReadH
*
pReadH
,
TSKEY
*
queryKey
)
{
SArray
*
smaFs
=
pReadH
->
pTsdb
->
fs
->
cstatus
->
sf
;
int32_t
nSmaFs
=
taosArrayGetSize
(
smaFs
);
pReadH
->
pDFile
=
NULL
;
tsdbCloseDBF
(
&
pReadH
->
dFile
)
;
#if 0
while (pReadH->smaFsIter.iter < nSmaFs) {
void *pSmaFile = taosArrayGet(smaFs, pReadH->smaFsIter.iter);
if (pSmaFile) { // match(indexName, queryWindow)
// TODO: select the file by index_name ...
pReadH
->
pD
File
=
pSmaFile
;
pReadH->
d
File = pSmaFile;
++pReadH->smaFsIter.iter;
break;
}
...
...
@@ -578,41 +778,83 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) {
tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]");
return true;
}
#endif
return
false
;
}
/**
* @brief
Return the data between queryWin and fill the pData.
* @brief
*
* @param pTsdb
* @param param
* @param pTsdb Return the data between queryWin and fill the pData.
* @param pData
* @param queryWin
* @param indexUid
* @param interval
* @param intervalUnit
* @param tableUid
* @param colId
* @param pQuerySKey
* @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM.
* @return int32_t
*/
int32_t
tsdbGetTSmaDataImpl
(
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
,
STimeWindow
*
queryWin
,
int32_t
nMaxResult
)
{
SSmaStatItem
*
pItem
=
(
SSmaStatItem
*
)
taosHashGet
(
pTsdb
->
pSmaStat
->
smaStatItems
,
&
pData
->
indexUid
,
sizeof
(
pData
->
indexUid
));
static
int32_t
tsdbGetTSmaDataImpl
(
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
,
int64_t
indexUid
,
int64_t
interval
,
int8_t
intervalUnit
,
tb_uid_t
tableUid
,
col_id_t
colId
,
TSKEY
querySkey
,
int32_t
nMaxResult
)
{
SSmaStatItem
*
pItem
=
(
SSmaStatItem
*
)
taosHashGet
(
SMA_ENV_STAT_ITEMS
(
pTsdb
->
pTSmaEnv
),
&
indexUid
,
sizeof
(
indexUid
));
if
(
pItem
==
NULL
)
{
// mark all window as expired and notify query module to query raw TS data.
return
TSDB_CODE_SUCCESS
;
}
int32_t
nQueryWin
=
0
;
#if 0
int32_t nQueryWin = taosArrayGetSize(pQuerySKey);
for (int32_t n = 0; n < nQueryWin; ++n) {
TSKEY
thisWindow
=
n
;
if
(
taosHashGet
(
pItem
->
expiredWindows
,
&
thisWindow
,
sizeof
(
thisWindow
))
!=
NULL
)
{
TSKEY
skey = taosArrayGet(pQuerySKey, n)
;
if (taosHashGet(pItem->expiredWindows, &
skey, sizeof(TSKEY
)) != NULL) {
// TODO: mark this window as expired.
}
}
#endif
#if 0
if (taosHashGet(pItem->expiredWindows, &querySkey, sizeof(TSKEY)) != NULL) {
// TODO: mark this window as expired.
}
#endif
STSmaReadH
tReadH
=
{
0
};
tsdbInitTSmaReadH
(
&
tReadH
,
pTsdb
,
pData
);
tsdbInitTSmaReadH
(
&
tReadH
,
pTsdb
,
interval
,
intervalUnit
);
tsdbCloseDBF
(
&
tReadH
.
dFile
);
tsdbInitTSmaFile
(
&
tReadH
,
queryWin
);
tsdbInitTSmaFile
(
&
tReadH
,
querySkey
);
if
(
tsdbOpenDBF
(
SMA_ENV_ENV
(
pTsdb
->
pTSmaEnv
),
&
tReadH
.
dFile
)
!=
0
)
{
tsdbWarn
(
"vgId:%d open DBF %s failed since %s"
,
REPO_ID
(
pTsdb
),
tReadH
.
dFile
.
path
,
tstrerror
(
terrno
));
return
TSDB_CODE_FAILED
;
}
char
smaKey
[
SMA_KEY_LEN
]
=
{
0
};
void
*
pSmaKey
=
&
smaKey
;
tsdbEncodeTSmaKey
(
tableUid
,
colId
,
querySkey
,
(
void
**
)
&
pSmaKey
);
tsdbDebug
(
"vgId:%d get sma data from %s: smaKey %"
PRIx64
"-%"
PRIu16
"-%"
PRIx64
", keyLen %d"
,
REPO_ID
(
pTsdb
),
tReadH
.
dFile
.
path
,
*
(
tb_uid_t
*
)
smaKey
,
*
(
uint16_t
*
)
POINTER_SHIFT
(
smaKey
,
8
),
*
(
int64_t
*
)
POINTER_SHIFT
(
smaKey
,
10
),
SMA_KEY_LEN
);
void
*
result
=
NULL
;
uint32_t
valueSize
=
0
;
if
((
result
=
tsdbGetSmaDataByKey
(
&
tReadH
.
dFile
,
smaKey
,
SMA_KEY_LEN
,
&
valueSize
))
==
NULL
)
{
tsdbWarn
(
"vgId:%d get sma data failed from smaIndex %"
PRIi64
", smaKey %"
PRIx64
"-%"
PRIu16
"-%"
PRIx64
" since %s"
,
REPO_ID
(
pTsdb
),
indexUid
,
*
(
tb_uid_t
*
)
smaKey
,
*
(
uint16_t
*
)
POINTER_SHIFT
(
smaKey
,
8
),
*
(
int64_t
*
)
POINTER_SHIFT
(
smaKey
,
10
),
tstrerror
(
terrno
));
tsdbCloseDBF
(
&
tReadH
.
dFile
);
return
TSDB_CODE_FAILED
;
}
tfree
(
result
);
#ifdef SMA_PRINT_DEBUG_LOG
for
(
uint32_t
v
=
0
;
v
<
valueSize
;
v
+=
8
)
{
tsdbWarn
(
"vgId:%d v[%d]=%"
PRIi64
,
REPO_ID
(
pTsdb
),
v
,
*
(
int64_t
*
)
POINTER_SHIFT
(
result
,
v
));
}
#endif
#if 0
int32_t nResult = 0;
int64_t lastKey = 0;
...
...
@@ -634,8 +876,9 @@ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *
}
}
}
#endif
// read data from file and fill the result
tsdbCloseDBF
(
&
tReadH
.
dFile
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -673,4 +916,55 @@ int32_t tsdbRemoveTSmaData(STsdb *pTsdb, void *smaIndex, STimeWindow *pWin) {
// }
return TSDB_CODE_SUCCESS;
}
#endif
\ No newline at end of file
#endif
/**
* @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine
*
* @param pTsdb
* @param param
* @param msg
* @return int32_t
* TODO: Who is responsible for resource allocate and release?
*/
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
char
*
msg
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbInsertTSmaDataImpl
(
pTsdb
,
msg
))
<
0
)
{
tsdbWarn
(
"vgId:%d insert tSma data failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
return
code
;
}
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
int8_t
smaType
,
char
*
msg
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbUpdateExpiredWindow
(
pTsdb
,
smaType
,
msg
))
<
0
)
{
tsdbWarn
(
"vgId:%d update expired sma window failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
return
code
;
}
/**
* @brief Insert Time-range-wise Rollup Sma(RSma) data
*
* @param pTsdb
* @param param
* @param msg
* @return int32_t
*/
int32_t
tsdbInsertRSmaData
(
STsdb
*
pTsdb
,
char
*
msg
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbInsertRSmaDataImpl
(
pTsdb
,
msg
))
<
0
)
{
tsdbWarn
(
"vgId:%d insert rSma data failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
return
code
;
}
int32_t
tsdbGetTSmaData
(
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
,
int64_t
indexUid
,
int64_t
interval
,
int8_t
intervalUnit
,
tb_uid_t
tableUid
,
col_id_t
colId
,
TSKEY
querySkey
,
int32_t
nMaxResult
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbGetTSmaDataImpl
(
pTsdb
,
pData
,
indexUid
,
interval
,
intervalUnit
,
tableUid
,
colId
,
querySkey
,
nMaxResult
))
<
0
)
{
tsdbWarn
(
"vgId:%d get tSma data failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
return
code
;
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbWrite.c
浏览文件 @
f5758fbe
...
...
@@ -34,6 +34,7 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
return
tsdbMemTableInsert
(
pTsdb
,
pTsdb
->
mem
,
pMsg
,
NULL
);
}
#if 0
/**
* @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine
*
...
...
@@ -51,6 +52,14 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) {
return code;
}
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbUpdateExpiredWindow(pTsdb, smaType, msg)) < 0) {
tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}
/**
* @brief Insert Time-range-wise Rollup Sma(RSma) data
*
...
...
@@ -65,4 +74,6 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}
\ No newline at end of file
}
#endif
\ No newline at end of file
source/dnode/vnode/test/tsdbSmaTest.cpp
浏览文件 @
f5758fbe
...
...
@@ -33,7 +33,7 @@ int main(int argc, char **argv) {
return
RUN_ALL_TESTS
();
}
TEST
(
testCase
,
tSma
EncodeDecode
Test
)
{
TEST
(
testCase
,
tSma
_Meta_Encode_Decode_
Test
)
{
// encode
STSma
tSma
=
{
0
};
tSma
.
version
=
0
;
...
...
@@ -87,8 +87,9 @@ TEST(testCase, tSmaEncodeDecodeTest) {
tdDestroyTSma
(
&
tSma
);
tdDestroyTSmaWrapper
(
&
dstTSmaWrapper
);
}
#if 1
TEST
(
testCase
,
tSma_DB_Put_Get_Del_Test
)
{
TEST
(
testCase
,
tSma_
meta
DB_Put_Get_Del_Test
)
{
const
char
*
smaIndexName1
=
"sma_index_test_1"
;
const
char
*
smaIndexName2
=
"sma_index_test_2"
;
const
char
*
timezone
=
"Asia/Shanghai"
;
...
...
@@ -220,13 +221,84 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
#endif
#if 1
TEST
(
testCase
,
tSmaInsertTest
)
{
const
int64_t
indexUid
=
2000000002
;
TEST
(
testCase
,
tSma_Data_Insert_Query_Test
)
{
// step 1: prepare meta
const
char
*
smaIndexName1
=
"sma_index_test_1"
;
const
char
*
timezone
=
"Asia/Shanghai"
;
const
char
*
expr
=
"select count(a,b, top 20), from table interval 1d, sliding 1h;"
;
const
char
*
tagsFilter
=
"where tags.location='Beijing' and tags.district='ChaoYang'"
;
const
char
*
smaTestDir
=
"./smaTest"
;
const
tb_uid_t
tbUid
=
1234567890
;
const
int64_t
indexUid1
=
2000000001
;
const
int64_t
interval1
=
1
;
const
int8_t
intervalUnit1
=
TD_TIME_UNIT_DAY
;
const
uint32_t
nCntTSma
=
2
;
TSKEY
skey1
=
1646987196
;
const
int64_t
testSmaData1
=
100
;
const
int64_t
testSmaData2
=
200
;
// encode
STSma
tSma
=
{
0
};
tSma
.
version
=
0
;
tSma
.
intervalUnit
=
TD_TIME_UNIT_DAY
;
tSma
.
interval
=
1
;
tSma
.
slidingUnit
=
TD_TIME_UNIT_HOUR
;
tSma
.
sliding
=
0
;
tSma
.
indexUid
=
indexUid1
;
tstrncpy
(
tSma
.
indexName
,
smaIndexName1
,
TSDB_INDEX_NAME_LEN
);
tstrncpy
(
tSma
.
timezone
,
timezone
,
TD_TIMEZONE_LEN
);
tSma
.
tableUid
=
tbUid
;
tSma
.
exprLen
=
strlen
(
expr
);
tSma
.
expr
=
(
char
*
)
calloc
(
tSma
.
exprLen
+
1
,
1
);
tstrncpy
(
tSma
.
expr
,
expr
,
tSma
.
exprLen
+
1
);
tSma
.
tagsFilterLen
=
strlen
(
tagsFilter
);
tSma
.
tagsFilter
=
(
char
*
)
calloc
(
tSma
.
tagsFilterLen
+
1
,
1
);
tstrncpy
(
tSma
.
tagsFilter
,
tagsFilter
,
tSma
.
tagsFilterLen
+
1
);
SMeta
*
pMeta
=
NULL
;
STSma
*
pSmaCfg
=
&
tSma
;
const
SMetaCfg
*
pMetaCfg
=
&
defaultMetaOptions
;
taosRemoveDir
(
smaTestDir
);
pMeta
=
metaOpen
(
smaTestDir
,
pMetaCfg
,
NULL
);
assert
(
pMeta
!=
NULL
);
// save index 1
EXPECT_EQ
(
metaSaveSmaToDB
(
pMeta
,
pSmaCfg
),
0
);
// step 2: insert data
STSmaDataWrapper
*
pSmaData
=
NULL
;
STsdb
tsdb
=
{
0
};
STsdbCfg
*
pCfg
=
&
tsdb
.
config
;
pCfg
->
daysPerFile
=
1
;
tsdb
.
pMeta
=
pMeta
;
tsdb
.
vgId
=
2
;
tsdb
.
config
.
daysPerFile
=
10
;
// default days is 10
tsdb
.
config
.
keep1
=
30
;
tsdb
.
config
.
keep2
=
90
;
tsdb
.
config
.
keep
=
365
;
tsdb
.
config
.
precision
=
TSDB_TIME_PRECISION_MILLI
;
tsdb
.
config
.
update
=
TD_ROW_OVERWRITE_UPDATE
;
tsdb
.
config
.
compression
=
TWO_STAGE_COMP
;
switch
(
tsdb
.
config
.
precision
)
{
case
TSDB_TIME_PRECISION_MILLI
:
skey1
*=
1e3
;
break
;
case
TSDB_TIME_PRECISION_MICRO
:
skey1
*=
1e6
;
break
;
case
TSDB_TIME_PRECISION_NANO
:
skey1
*=
1e9
;
break
;
default:
// ms
skey1
*=
1e3
;
break
;
}
char
*
msg
=
(
char
*
)
calloc
(
100
,
1
);
EXPECT_EQ
(
tsdbUpdateSmaWindow
(
&
tsdb
,
TSDB_SMA_TYPE_TIME_RANGE
,
msg
),
0
);
// init
int32_t
allocCnt
=
0
;
...
...
@@ -235,21 +307,21 @@ TEST(testCase, tSmaInsertTest) {
void
*
buf
=
NULL
;
EXPECT_EQ
(
tsdbMakeRoom
(
&
buf
,
allocStep
),
0
);
int32_t
bufSize
=
taosTSizeof
(
buf
);
int32_t
numOfTables
=
25
;
int32_t
numOfTables
=
10
;
col_id_t
numOfCols
=
4096
;
EXPECT_GT
(
numOfCols
,
0
);
pSmaData
=
(
STSmaDataWrapper
*
)
buf
;
printf
(
">> allocate [%d] time to %d and addr is %p
\n
"
,
++
allocCnt
,
bufSize
,
pSmaData
);
pSmaData
->
skey
=
1646987196
;
pSmaData
->
interval
=
10
;
pSmaData
->
intervalUnit
=
TD_TIME_UNIT_MINUTE
;
pSmaData
->
indexUid
=
indexUid
;
pSmaData
->
skey
=
skey1
;
pSmaData
->
interval
=
interval1
;
pSmaData
->
intervalUnit
=
intervalUnit1
;
pSmaData
->
indexUid
=
indexUid
1
;
int32_t
len
=
sizeof
(
STSmaDataWrapper
);
for
(
int32_t
t
=
0
;
t
<
numOfTables
;
++
t
)
{
STSmaTbData
*
pTbData
=
(
STSmaTbData
*
)
POINTER_SHIFT
(
pSmaData
,
len
);
pTbData
->
tableUid
=
t
;
pTbData
->
tableUid
=
t
bUid
+
t
;
int32_t
tableDataLen
=
sizeof
(
STSmaTbData
);
for
(
col_id_t
c
=
0
;
c
<
numOfCols
;
++
c
)
{
...
...
@@ -262,8 +334,17 @@ TEST(testCase, tSmaInsertTest) {
}
STSmaColData
*
pColData
=
(
STSmaColData
*
)
POINTER_SHIFT
(
pSmaData
,
len
+
tableDataLen
);
pColData
->
colId
=
c
+
PRIMARYKEY_TIMESTAMP_COL_ID
;
pColData
->
blockSize
=
((
c
&
1
)
==
0
)
?
8
:
16
;
// TODO: fill col data
if
((
c
&
1
)
==
0
)
{
pColData
->
blockSize
=
8
;
memcpy
(
pColData
->
data
,
&
testSmaData1
,
8
);
}
else
{
pColData
->
blockSize
=
16
;
memcpy
(
pColData
->
data
,
&
testSmaData1
,
8
);
memcpy
(
POINTER_SHIFT
(
pColData
->
data
,
8
),
&
testSmaData2
,
8
);
}
tableDataLen
+=
(
sizeof
(
STSmaColData
)
+
pColData
->
blockSize
);
}
pTbData
->
dataLen
=
(
tableDataLen
-
sizeof
(
STSmaTbData
));
...
...
@@ -277,8 +358,24 @@ TEST(testCase, tSmaInsertTest) {
// execute
EXPECT_EQ
(
tsdbInsertTSmaData
(
&
tsdb
,
(
char
*
)
pSmaData
),
TSDB_CODE_SUCCESS
);
// release
// step 3: query
uint32_t
checkDataCnt
=
0
;
for
(
int32_t
t
=
0
;
t
<
numOfTables
;
++
t
)
{
for
(
col_id_t
c
=
0
;
c
<
numOfCols
;
++
c
)
{
EXPECT_EQ
(
tsdbGetTSmaData
(
&
tsdb
,
NULL
,
indexUid1
,
interval1
,
intervalUnit1
,
tbUid
+
t
,
c
+
PRIMARYKEY_TIMESTAMP_COL_ID
,
skey1
,
1
),
TSDB_CODE_SUCCESS
);
++
checkDataCnt
;
}
}
printf
(
"%s:%d The sma data check count for insert and query is %"
PRIu32
"
\n
"
,
__FILE__
,
__LINE__
,
checkDataCnt
);
// release data
taosTZfree
(
buf
);
// release meta
tdDestroyTSma
(
&
tSma
);
metaClose
(
pMeta
);
}
#endif
...
...
source/util/src/terror.c
浏览文件 @
f5758fbe
...
...
@@ -349,6 +349,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "No available disk")
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_MESSED_MSG
,
"TSDB messed message"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_IVLD_TAG_VAL
,
"TSDB invalid tag value"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_NO_CACHE_LAST_ROW
,
"TSDB no cache last row data"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_NO_SMA_INDEX_IN_META
,
"No sma index in meta"
)
// query
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_INVALID_QHANDLE
,
"Invalid handle"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录