Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8627b593
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8627b593
编写于
3月 30, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
insert tSma by SSDataBlock
上级
4bad58c1
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
309 addition
and
208 deletion
+309
-208
include/common/tmsg.h
include/common/tmsg.h
+0
-27
source/dnode/vnode/inc/tsdb.h
source/dnode/vnode/inc/tsdb.h
+3
-7
source/dnode/vnode/src/inc/tsdbSma.h
source/dnode/vnode/src/inc/tsdbSma.h
+2
-3
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+190
-143
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+3
-1
source/dnode/vnode/test/tsdbSmaTest.cpp
source/dnode/vnode/test/tsdbSmaTest.cpp
+111
-27
未找到文件。
include/common/tmsg.h
浏览文件 @
8627b593
...
...
@@ -2014,7 +2014,6 @@ typedef struct {
int8_t
slidingUnit
;
// MACRO: TIME_UNIT_XXX
int8_t
timezoneInt
;
// sma data expired if timezone changes.
char
indexName
[
TSDB_INDEX_NAME_LEN
];
char
timezone
[
TD_TIMEZONE_LEN
];
int32_t
exprLen
;
int32_t
tagsFilterLen
;
int64_t
indexUid
;
...
...
@@ -2052,32 +2051,6 @@ void* tDeserializeSVCreateTSmaReq(void* buf, SVCreateTSmaReq* pReq);
int32_t
tSerializeSVDropTSmaReq
(
void
**
buf
,
SVDropTSmaReq
*
pReq
);
void
*
tDeserializeSVDropTSmaReq
(
void
*
buf
,
SVDropTSmaReq
*
pReq
);
typedef
struct
{
col_id_t
colId
;
uint16_t
blockSize
;
// sma data block size
char
data
[];
}
STSmaColData
;
typedef
struct
{
tb_uid_t
tableUid
;
// super/child/normal table uid
int32_t
dataLen
;
// not including head
char
data
[];
}
STSmaTbData
;
typedef
struct
{
int64_t
indexUid
;
TSKEY
skey
;
// startKey of one interval/sliding window
int64_t
interval
;
int32_t
dataLen
;
// not including head
int8_t
intervalUnit
;
char
data
[];
}
STSmaDataWrapper
;
// sma data for a interval/sliding window
// interval/sliding => window
// => window->table->colId
// => 当一个window下所有的表均计算完成时,流计算告知tsdb清除window的过期标记
// RSma: Rollup SMA
typedef
struct
{
int64_t
interval
;
...
...
source/dnode/vnode/inc/tsdb.h
浏览文件 @
8627b593
...
...
@@ -100,10 +100,11 @@ int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, const char *msg);
* @brief Insert tSma(Time-range-wise SMA) data from stream computing engine
*
* @param pTsdb
* @param indexUid
* @param msg
* @return int32_t
*/
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
char
*
msg
);
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
);
/**
* @brief Drop tSma data and local cache.
...
...
@@ -130,16 +131,11 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
* @param pTsdb
* @param pData
* @param indexUid
* @param interval
* @param intervalUnit
* @param tableUid
* @param colId
* @param querySKey
* @param nMaxResult
* @return int32_t
*/
int32_t
tsdbGetTSmaData
(
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
,
int64_t
indexUid
,
int64_t
interval
,
int8_t
intervalUnit
,
tb_uid_t
tableUid
,
col_id_t
colId
,
TSKEY
querySKey
,
int32_t
nMaxResult
);
int32_t
tsdbGetTSmaData
(
STsdb
*
pTsdb
,
char
*
pData
,
int64_t
indexUid
,
TSKEY
querySKey
,
int32_t
nMaxResult
);
// STsdbCfg
int
tsdbOptionsInit
(
STsdbCfg
*
);
...
...
source/dnode/vnode/src/inc/tsdbSma.h
浏览文件 @
8627b593
...
...
@@ -44,11 +44,10 @@ int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin);
#endif
// internal func
static
FORCE_INLINE
int32_t
tsdbEncodeTSmaKey
(
tb_uid_t
tableUid
,
col_id_t
col
Id
,
TSKEY
tsKey
,
void
**
pData
)
{
static
FORCE_INLINE
int32_t
tsdbEncodeTSmaKey
(
int64_t
group
Id
,
TSKEY
tsKey
,
void
**
pData
)
{
int32_t
len
=
0
;
len
+=
taosEncodeFixedI64
(
pData
,
tableUid
);
len
+=
taosEncodeFixedU16
(
pData
,
colId
);
len
+=
taosEncodeFixedI64
(
pData
,
tsKey
);
len
+=
taosEncodeFixedI64
(
pData
,
groupId
);
return
len
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
8627b593
...
...
@@ -24,7 +24,7 @@ static const char *TSDB_SMA_DNAME[] = {
#define SMA_STORAGE_TSDB_DAYS 30
#define SMA_STORAGE_TSDB_TIMES 10
#define SMA_STORAGE_SPLIT_HOURS 24
#define SMA_KEY_LEN 1
8 // tableUid_colId_TSKEY 8+2
+8
#define SMA_KEY_LEN 1
6 // TSKEY+groupId 8
+8
#define SMA_DROP_EXPIRED_TIME 10 // default is 10 seconds
#define SMA_STATE_HASH_SLOT 4
...
...
@@ -38,10 +38,10 @@ typedef enum {
}
ESmaStorageLevel
;
typedef
struct
{
STsdb
*
pTsdb
;
SDBFile
dFile
;
SSDataBlock
*
pData
;
// sma data
int32_t
interval
;
// interval with the precision of DB
STsdb
*
pTsdb
;
SDBFile
dFile
;
const
SArray
*
pDataBlocks
;
// sma data
int32_t
interval
;
// interval with the precision of DB
}
STSmaWriteH
;
typedef
struct
{
...
...
@@ -94,26 +94,24 @@ static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat);
// read data
// TODO: This is the basic params, and should wrap the params to a queryHandle.
static
int32_t
tsdbGetTSmaDataImpl
(
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
,
int64_t
indexUid
,
int64_t
interval
,
int8_t
intervalUnit
,
tb_uid_t
tableUid
,
col_id_t
colId
,
TSKEY
querySKey
,
int32_t
nMaxResult
);
static
int32_t
tsdbGetTSmaDataImpl
(
STsdb
*
pTsdb
,
char
*
pData
,
int64_t
indexUid
,
TSKEY
querySKey
,
int32_t
nMaxResult
);
// insert data
static
int32_t
tsdbInitTSmaWriteH
(
STSmaWriteH
*
pSmaH
,
STsdb
*
pTsdb
,
SSDataBlock
*
pData
,
int64_t
interval
,
static
int32_t
tsdbInitTSmaWriteH
(
STSmaWriteH
*
pSmaH
,
STsdb
*
pTsdb
,
const
SArray
*
pDataBlocks
,
int64_t
interval
,
int8_t
intervalUnit
);
static
void
tsdbDestroyTSmaWriteH
(
STSmaWriteH
*
pSmaH
);
static
int32_t
tsdbInitTSmaReadH
(
STSmaReadH
*
pSmaH
,
STsdb
*
pTsdb
,
int64_t
interval
,
int8_t
intervalUnit
);
static
int32_t
tsdbGetSmaStorageLevel
(
int64_t
interval
,
int8_t
intervalUnit
);
static
int32_t
tsdb
InsertTSmaDataSection
(
STSmaWriteH
*
pSmaH
,
STSmaDataWrapper
*
pData
);
static
int32_t
tsdb
SetRSmaDataFile
(
STSmaWriteH
*
pSmaH
,
int32_t
fid
);
static
int32_t
tsdbInsertTSmaBlocks
(
STSmaWriteH
*
pSmaH
,
void
*
smaKey
,
uint32_t
keyLen
,
void
*
pData
,
uint32_t
dataLen
);
static
int64_t
tsdbGetIntervalByPrecision
(
int64_t
interval
,
uint8_t
intervalUnit
,
int8_t
precision
);
static
int64_t
tsdbGetIntervalByPrecision
(
int64_t
interval
,
uint8_t
intervalUnit
,
int8_t
precision
,
bool
adjusted
);
static
int32_t
tsdbGetTSmaDays
(
STsdb
*
pTsdb
,
int64_t
interval
,
int32_t
storageLevel
);
static
int32_t
tsdbSetTSmaDataFile
(
STSmaWriteH
*
pSmaH
,
STSmaDataWrapper
*
pData
,
int64_t
indexUid
,
int32_t
fid
);
static
int32_t
tsdbSetTSmaDataFile
(
STSmaWriteH
*
pSmaH
,
int64_t
indexUid
,
int32_t
fid
);
static
int32_t
tsdbInitTSmaFile
(
STSmaReadH
*
pSmaH
,
int64_t
indexUid
,
TSKEY
skey
);
static
bool
tsdbSetAndOpenTSmaFile
(
STSmaReadH
*
pReadH
,
TSKEY
*
queryKey
);
static
void
tsdbGetSmaDir
(
int32_t
vgId
,
ETsdbSmaType
smaType
,
char
dirName
[]);
static
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
char
*
msg
);
static
int32_t
tsdbInsertRSmaDataImpl
(
STsdb
*
pTsdb
,
char
*
msg
);
static
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
);
static
int32_t
tsdbInsertRSmaDataImpl
(
STsdb
*
pTsdb
,
c
onst
c
har
*
msg
);
// mgmt interface
static
int32_t
tsdbDropTSmaDataImpl
(
STsdb
*
pTsdb
,
int64_t
indexUid
);
...
...
@@ -387,7 +385,6 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
return
TSDB_CODE_SUCCESS
;
};
static
int32_t
tsdbSetExpiredWindow
(
STsdb
*
pTsdb
,
SHashObj
*
pItemsHash
,
int64_t
indexUid
,
int64_t
winSKey
)
{
SSmaStatItem
*
pItem
=
taosHashGet
(
pItemsHash
,
&
indexUid
,
sizeof
(
indexUid
));
if
(
pItem
==
NULL
)
{
...
...
@@ -480,18 +477,15 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg) {
TSKEY
expiredWindows
[
SMA_TEST_EXPIRED_WINDOW_SIZE
];
#endif
// Firstly, assume that tSma can only be created on super table/normal table.
// getActiveTimeWindow
SSmaEnv
*
pEnv
=
REPO_SMA_ENV
(
pTsdb
,
TSDB_SMA_TYPE_TIME_RANGE
);
SSmaEnv
*
pEnv
=
REPO_SMA_ENV
(
pTsdb
,
TSDB_SMA_TYPE_TIME_RANGE
);
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SHashObj
*
pItemsHash
=
SMA_ENV_STAT_ITEMS
(
pEnv
);
TASSERT
(
pEnv
!=
NULL
&&
pStat
!=
NULL
&&
pItemsHash
!=
NULL
);
// basic procedure
// TODO: optimization
tsdbRefSmaStat
(
pTsdb
,
pStat
);
...
...
@@ -523,11 +517,11 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg) {
tdFreeTSmaWrapper
(
pSW
);
break
;
}
if
(
pSW
==
NULL
)
{
if
((
pSW
=
metaGetSmaInfoByTable
(
REPO_META
(
pTsdb
),
pBlock
->
suid
))
==
NULL
)
{
if
(
pSW
==
NULL
)
{
if
((
pSW
=
metaGetSmaInfoByTable
(
REPO_META
(
pTsdb
),
pBlock
->
suid
))
==
NULL
)
{
break
;
}
if
((
pSW
->
number
)
<=
0
||
(
pSW
->
tSma
==
NULL
))
{
if
((
pSW
->
number
)
<=
0
||
(
pSW
->
tSma
==
NULL
))
{
tdFreeTSmaWrapper
(
pSW
);
break
;
}
...
...
@@ -683,9 +677,15 @@ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t k
* @param interval
* @param intervalUnit
* @param precision
* @param adjusted Interval already adjusted according to DB precision
* @return int64_t
*/
static
int64_t
tsdbGetIntervalByPrecision
(
int64_t
interval
,
uint8_t
intervalUnit
,
int8_t
precision
)
{
static
int64_t
tsdbGetIntervalByPrecision
(
int64_t
interval
,
uint8_t
intervalUnit
,
int8_t
precision
,
bool
adjusted
)
{
if
(
adjusted
)
{
return
interval
;
}
switch
(
intervalUnit
)
{
case
TIME_UNIT_YEAR
:
// approximate value
interval
*=
365
*
86400
*
1e3
;
...
...
@@ -753,59 +753,12 @@ static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit
return
interval
;
}
/**
* @brief Split the TSma data blocks into expected size and insert into B+Tree.
*
* @param pSmaH
* @param pData
* @param nOffset The nOffset of blocks since fid changes.
* @param nBlocks The nBlocks with the same fid since nOffset.
* @return int32_t
*/
static
int32_t
tsdbInsertTSmaDataSection
(
STSmaWriteH
*
pSmaH
,
STSmaDataWrapper
*
pData
)
{
STsdb
*
pTsdb
=
pSmaH
->
pTsdb
;
tsdbDebug
(
"tsdbInsertTSmaDataSection: index %"
PRIi64
", skey %"
PRIi64
,
pData
->
indexUid
,
pData
->
skey
);
// TODO: check the data integrity
int32_t
len
=
0
;
while
(
true
)
{
if
(
len
>=
pData
->
dataLen
)
{
break
;
}
assert
(
pData
->
dataLen
>
0
);
STSmaTbData
*
pTbData
=
(
STSmaTbData
*
)
POINTER_SHIFT
(
pData
->
data
,
len
);
int32_t
tbLen
=
0
;
while
(
true
)
{
if
(
tbLen
>=
pTbData
->
dataLen
)
{
break
;
}
assert
(
pTbData
->
dataLen
>
0
);
STSmaColData
*
pColData
=
(
STSmaColData
*
)
POINTER_SHIFT
(
pTbData
->
data
,
tbLen
);
char
smaKey
[
SMA_KEY_LEN
]
=
{
0
};
void
*
pSmaKey
=
&
smaKey
;
#if 0
printf("tsdbInsertTSmaDataSection: index %" PRIi64 ", skey %" PRIi64 " table[%" PRIi64 "]col[%" PRIu16 "]\n",
pData->indexUid, pData->skey, pTbData->tableUid, pColData->colId);
#endif
tsdbEncodeTSmaKey
(
pTbData
->
tableUid
,
pColData
->
colId
,
pData
->
skey
,
(
void
**
)
&
pSmaKey
);
if
(
tsdbInsertTSmaBlocks
(
pSmaH
,
smaKey
,
SMA_KEY_LEN
,
pColData
->
data
,
pColData
->
blockSize
)
<
0
)
{
tsdbWarn
(
"vgId:%d insert tSma blocks failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
tbLen
+=
(
sizeof
(
STSmaColData
)
+
pColData
->
blockSize
);
}
len
+=
(
sizeof
(
STSmaTbData
)
+
pTbData
->
dataLen
);
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tsdbInitTSmaWriteH
(
STSmaWriteH
*
pSmaH
,
STsdb
*
pTsdb
,
SSDataBlock
*
pData
,
int64_t
interval
,
int8_t
intervalUnit
)
{
static
int32_t
tsdbInitTSmaWriteH
(
STSmaWriteH
*
pSmaH
,
STsdb
*
pTsdb
,
const
SArray
*
pDataBlocks
,
int64_t
interval
,
int8_t
intervalUnit
)
{
pSmaH
->
pTsdb
=
pTsdb
;
pSmaH
->
interval
=
tsdbGetIntervalByPrecision
(
interval
,
intervalUnit
,
REPO_CFG
(
pTsdb
)
->
precision
);
pSmaH
->
pData
=
pData
;
pSmaH
->
interval
=
tsdbGetIntervalByPrecision
(
interval
,
intervalUnit
,
REPO_CFG
(
pTsdb
)
->
precision
,
true
);
pSmaH
->
pDataBlocks
=
pDataBlocks
;
pSmaH
->
dFile
.
fid
=
TSDB_IVLD_FID
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -815,7 +768,7 @@ static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) {
}
}
static
int32_t
tsdbSetTSmaDataFile
(
STSmaWriteH
*
pSmaH
,
STSmaDataWrapper
*
pData
,
int64_t
indexUid
,
int32_t
fid
)
{
static
int32_t
tsdbSetTSmaDataFile
(
STSmaWriteH
*
pSmaH
,
int64_t
indexUid
,
int32_t
fid
)
{
STsdb
*
pTsdb
=
pSmaH
->
pTsdb
;
ASSERT
(
pSmaH
->
dFile
.
path
==
NULL
&&
pSmaH
->
dFile
.
pDB
==
NULL
);
...
...
@@ -859,11 +812,10 @@ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLe
* @param msg
* @return int32_t
*/
static
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
char
*
msg
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
SSDataBlock
*
pData
=
(
SSDataBlock
*
)
msg
;
SSmaEnv
*
pEnv
=
atomic_load_ptr
(
&
pTsdb
->
pTSmaEnv
);
int64_t
indexUid
=
SMA_TEST_INDEX_UID
;
static
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
const
SArray
*
pDataBlocks
=
(
const
SArray
*
)
msg
;
SSmaEnv
*
pEnv
=
atomic_load_ptr
(
&
pTsdb
->
pTSmaEnv
);
if
(
pEnv
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
...
...
@@ -871,15 +823,15 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
return
terrno
;
}
if
(
pData
==
NULL
)
{
if
(
pData
Blocks
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
tsdbWarn
(
"vgId:%d insert tSma data failed since pData is NULL"
,
REPO_ID
(
pTsdb
));
tsdbWarn
(
"vgId:%d insert tSma data failed since pData
Blocks
is NULL"
,
REPO_ID
(
pTsdb
));
return
terrno
;
}
if
(
taosArrayGetSize
(
pData
->
pDataBlock
)
<=
0
)
{
if
(
taosArrayGetSize
(
pData
Blocks
)
<=
0
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
tsdbWarn
(
"vgId:%d insert tSma data failed since pDataBlock is empty"
,
REPO_ID
(
pTsdb
));
tsdbWarn
(
"vgId:%d insert tSma data failed since pDataBlock
s
is empty"
,
REPO_ID
(
pTsdb
));
return
TSDB_CODE_FAILED
;
}
...
...
@@ -899,10 +851,9 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
}
STSma
*
pSma
=
pItem
->
pSma
;
STSmaWriteH
tSmaH
=
{
0
};
if
(
tsdbInitTSmaWriteH
(
&
tSmaH
,
pTsdb
,
pData
,
pSma
->
interval
,
pSma
->
intervalUnit
)
!=
0
)
{
if
(
tsdbInitTSmaWriteH
(
&
tSmaH
,
pTsdb
,
pData
Blocks
,
pSma
->
interval
,
pSma
->
intervalUnit
)
!=
0
)
{
return
TSDB_CODE_FAILED
;
}
...
...
@@ -921,33 +872,134 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
int32_t
storageLevel
=
tsdbGetSmaStorageLevel
(
pSma
->
interval
,
pSma
->
intervalUnit
);
int32_t
daysPerFile
=
tsdbGetTSmaDays
(
pTsdb
,
tSmaH
.
interval
,
storageLevel
);
// key: skey + groupId
char
smaKey
[
SMA_KEY_LEN
]
=
{
0
};
void
*
pSmaKey
=
&
smaKey
;
char
dataBuf
[
512
]
=
{
0
};
void
*
pDataBuf
=
&
dataBuf
;
int32_t
sz
=
taosArrayGetSize
(
pDataBlocks
);
for
(
int32_t
i
=
0
;
i
<
sz
;
++
i
)
{
SSDataBlock
*
pDataBlock
=
*
(
SSDataBlock
**
)
taosArrayGet
(
pDataBlocks
,
i
);
int32_t
colNum
=
pDataBlock
->
info
.
numOfCols
;
int32_t
rows
=
pDataBlock
->
info
.
rows
;
int32_t
rowSize
=
pDataBlock
->
info
.
rowSize
;
int64_t
groupId
=
pDataBlock
->
info
.
groupId
;
for
(
int32_t
j
=
0
;
j
<
rows
;
++
j
)
{
printf
(
"|"
);
TSKEY
skey
=
TSKEY_INITIAL_VAL
;
// the start key of TS window by interval
int32_t
tlen
=
0
;
for
(
int32_t
k
=
0
;
k
<
colNum
;
++
k
)
{
SColumnInfoData
*
pColInfoData
=
*
(
SColumnInfoData
**
)
taosArrayGet
(
pDataBlock
->
pDataBlock
,
k
);
void
*
var
=
POINTER_SHIFT
(
pColInfoData
->
pData
,
j
*
pColInfoData
->
info
.
bytes
);
switch
(
pColInfoData
->
info
.
type
)
{
case
TSDB_DATA_TYPE_TIMESTAMP
:
skey
=
*
(
TSKEY
*
)
var
;
printf
(
" skey = %"
PRIi64
" groupId = %"
PRId64
"|"
,
skey
,
groupId
);
tsdbEncodeTSmaKey
(
groupId
,
skey
,
&
pSmaKey
);
break
;
case
TSDB_DATA_TYPE_BOOL
:
case
TSDB_DATA_TYPE_UTINYINT
:
printf
(
" %15d |"
,
*
(
uint8_t
*
)
var
);
tlen
+=
taosEncodeFixedU8
(
&
pDataBuf
,
*
(
uint8_t
*
)
var
);
break
;
case
TSDB_DATA_TYPE_TINYINT
:
printf
(
" %15d |"
,
*
(
int8_t
*
)
var
);
tlen
+=
taosEncodeFixedI8
(
&
pDataBuf
,
*
(
int8_t
*
)
var
);
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
printf
(
" %15d |"
,
*
(
int16_t
*
)
var
);
tlen
+=
taosEncodeFixedI16
(
&
pDataBuf
,
*
(
int16_t
*
)
var
);
break
;
case
TSDB_DATA_TYPE_USMALLINT
:
printf
(
" %15d |"
,
*
(
uint16_t
*
)
var
);
tlen
+=
taosEncodeFixedU16
(
&
pDataBuf
,
*
(
uint16_t
*
)
var
);
break
;
case
TSDB_DATA_TYPE_INT
:
printf
(
" %15d |"
,
*
(
int32_t
*
)
var
);
tlen
+=
taosEncodeFixedI32
(
&
pDataBuf
,
*
(
int32_t
*
)
var
);
break
;
case
TSDB_DATA_TYPE_FLOAT
:
case
TSDB_DATA_TYPE_UINT
:
printf
(
" %15u |"
,
*
(
uint32_t
*
)
var
);
tlen
+=
taosEncodeFixedU32
(
&
pDataBuf
,
*
(
uint32_t
*
)
var
);
break
;
case
TSDB_DATA_TYPE_BIGINT
:
printf
(
" %15ld |"
,
*
(
int64_t
*
)
var
);
tlen
+=
taosEncodeFixedI64
(
&
pDataBuf
,
*
(
int64_t
*
)
var
);
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
case
TSDB_DATA_TYPE_UBIGINT
:
printf
(
" %15lu |"
,
*
(
uint64_t
*
)
var
);
tlen
+=
taosEncodeFixedU64
(
&
pDataBuf
,
*
(
uint64_t
*
)
var
);
break
;
case
TSDB_DATA_TYPE_NCHAR
:
{
char
tmpChar
[
100
]
=
{
0
};
strncpy
(
tmpChar
,
varDataVal
(
var
),
varDataLen
(
var
));
printf
(
" %s |"
,
tmpChar
);
tlen
+=
taosEncodeBinary
(
&
pDataBuf
,
varDataVal
(
var
),
varDataLen
(
var
));
break
;
}
case
TSDB_DATA_TYPE_VARCHAR
:
{
// TSDB_DATA_TYPE_BINARY
char
tmpChar
[
100
]
=
{
0
};
strncpy
(
tmpChar
,
varDataVal
(
var
),
varDataLen
(
var
));
printf
(
" %s |"
,
tmpChar
);
tlen
+=
taosEncodeBinary
(
&
pDataBuf
,
varDataVal
(
var
),
varDataLen
(
var
));
break
;
}
case
TSDB_DATA_TYPE_VARBINARY
:
// TODO: add binary/varbinary
TASSERT
(
0
);
default:
printf
(
"the column type %"
PRIi16
" is undefined
\n
"
,
pColInfoData
->
info
.
type
);
TASSERT
(
0
);
break
;
}
}
if
((
tlen
>
0
)
&&
(
skey
!=
TSKEY_INITIAL_VAL
))
{
int32_t
fid
=
(
int32_t
)(
TSDB_KEY_FID
(
skey
,
daysPerFile
,
pCfg
->
precision
));
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index
// file
// - Set and open the DFile or the B+Tree file
// TODO: tsdbStartTSmaCommit();
if
(
fid
!=
tSmaH
.
dFile
.
fid
)
{
if
(
tSmaH
.
dFile
.
fid
!=
TSDB_IVLD_FID
)
{
tsdbCloseDBF
(
&
tSmaH
.
dFile
);
}
tsdbSetTSmaDataFile
(
&
tSmaH
,
indexUid
,
fid
);
if
(
tsdbOpenDBF
(
pTsdb
->
pTSmaEnv
->
dbEnv
,
&
tSmaH
.
dFile
)
!=
0
)
{
tsdbWarn
(
"vgId:%d open DB file %s failed since %s"
,
REPO_ID
(
pTsdb
),
tSmaH
.
dFile
.
path
?
tSmaH
.
dFile
.
path
:
"path is NULL"
,
tstrerror
(
terrno
));
tsdbDestroyTSmaWriteH
(
&
tSmaH
);
tsdbUnRefSmaStat
(
pTsdb
,
pStat
);
return
TSDB_CODE_FAILED
;
}
}
#if 0
int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
if
(
tsdbInsertTSmaBlocks
(
&
tSmaH
,
pSmaKey
,
SMA_KEY_LEN
,
pDataBuf
,
tlen
)
!=
0
)
{
tsdbWarn
(
"vgId:%d insert tSma data blocks failed for index %"
PRIi64
", skey %"
PRIi64
", groupId %"
PRIi64
" since %s"
,
REPO_ID
(
pTsdb
),
indexUid
,
skey
,
groupId
,
tstrerror
(
terrno
));
tsdbDestroyTSmaWriteH
(
&
tSmaH
);
tsdbUnRefSmaStat
(
pTsdb
,
pStat
);
return
TSDB_CODE_FAILED
;
}
else
{
tsdbWarn
(
"vgId:%d insert tSma data blocks success for index %"
PRIi64
", skey %"
PRIi64
", groupId %"
PRIi64
,
REPO_ID
(
pTsdb
),
indexUid
,
skey
,
groupId
);
}
// TODO:tsdbEndTSmaCommit();
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
// - Set and open the DFile or the B+Tree file
// TODO: tsdbStartTSmaCommit();
tsdbSetTSmaDataFile(&tSmaH, pData, indexUid, fid);
if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) {
tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
tsdbDestroyTSmaWriteH(&tSmaH);
tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_FAILED;
}
// Step 3: reset the SSmaStat
tsdbResetExpiredWindow
(
pTsdb
,
SMA_ENV_STAT
(
pTsdb
->
pTSmaEnv
),
indexUid
,
skey
);
}
else
{
tsdbWarn
(
"vgId:%d invalid data skey:%"
PRIi64
", tlen %"
PRIi32
" during insert tSma data for %"
PRIi64
,
REPO_ID
(
pTsdb
),
skey
,
tlen
,
indexUid
);
}
if (tsdbInsertTSmaDataSection(&tSmaH, pData) != 0) {
tsdbWarn("vgId:%d insert tSma data section failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
tsdbDestroyTSmaWriteH(&tSmaH);
tsdbUnRefSmaStat(pTsdb, pStat);
return TSDB_CODE_FAILED;
printf
(
"
\n
"
);
}
}
// TODO:tsdbEndTSmaCommit();
// Step 3: reset the SSmaStat
tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey);
#endif
tsdbDestroyTSmaWriteH
(
&
tSmaH
);
tsdbUnRefSmaStat
(
pTsdb
,
pStat
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1002,7 +1054,7 @@ static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) {
// TODO:
}
static
int32_t
tsdbSetRSmaDataFile
(
STSmaWriteH
*
pSmaH
,
STSmaDataWrapper
*
pData
,
int32_t
fid
)
{
static
int32_t
tsdbSetRSmaDataFile
(
STSmaWriteH
*
pSmaH
,
int32_t
fid
)
{
STsdb
*
pTsdb
=
pSmaH
->
pTsdb
;
char
tSmaFile
[
TSDB_FILENAME_LEN
]
=
{
0
};
...
...
@@ -1012,11 +1064,11 @@ static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData,
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tsdbInsertRSmaDataImpl
(
STsdb
*
pTsdb
,
char
*
msg
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
SSDataBlock
*
pData
=
(
SSDataBlock
*
)
msg
;
SSmaEnv
*
pEnv
=
atomic_load_ptr
(
&
pTsdb
->
pRSmaEnv
);
int64_t
indexUid
=
SMA_TEST_INDEX_UID
;
static
int32_t
tsdbInsertRSmaDataImpl
(
STsdb
*
pTsdb
,
c
onst
c
har
*
msg
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
const
SArray
*
pDataBlocks
=
(
const
SArray
*
)
msg
;
SSmaEnv
*
pEnv
=
atomic_load_ptr
(
&
pTsdb
->
pRSmaEnv
);
int64_t
indexUid
=
SMA_TEST_INDEX_UID
;
if
(
pEnv
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
...
...
@@ -1030,15 +1082,15 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
return
terrno
;
}
if
(
pData
==
NULL
)
{
if
(
pData
Blocks
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
tsdbWarn
(
"vgId:%d insert rSma data failed since pData is NULL"
,
REPO_ID
(
pTsdb
));
tsdbWarn
(
"vgId:%d insert rSma data failed since pData
Blocks
is NULL"
,
REPO_ID
(
pTsdb
));
return
terrno
;
}
if
(
taosArrayGetSize
(
pData
->
pDataBlock
)
<=
0
)
{
if
(
taosArrayGetSize
(
pData
Blocks
)
<=
0
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
tsdbWarn
(
"vgId:%d insert rSma data failed since pDataBlock is empty"
,
REPO_ID
(
pTsdb
));
tsdbWarn
(
"vgId:%d insert rSma data failed since pDataBlock
s
is empty"
,
REPO_ID
(
pTsdb
));
return
TSDB_CODE_FAILED
;
}
...
...
@@ -1061,12 +1113,12 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
STSmaWriteH
tSmaH
=
{
0
};
if
(
tsdbInitTSmaWriteH
(
&
tSmaH
,
pTsdb
,
pData
,
pSma
->
interval
,
pSma
->
intervalUnit
)
!=
0
)
{
if
(
tsdbInitTSmaWriteH
(
&
tSmaH
,
pTsdb
,
pData
Blocks
,
pSma
->
interval
,
pSma
->
intervalUnit
)
!=
0
)
{
return
TSDB_CODE_FAILED
;
}
char
rPath
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
aPath
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
rPath
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
aPath
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
rPath
,
TSDB_FILENAME_LEN
,
"%s%s%"
PRIi64
,
SMA_ENV_PATH
(
pEnv
),
TD_DIRSEP
,
indexUid
);
tfsAbsoluteName
(
REPO_TFS
(
pTsdb
),
SMA_ENV_DID
(
pEnv
),
rPath
,
aPath
);
if
(
!
taosCheckExistFile
(
aPath
))
{
...
...
@@ -1078,7 +1130,7 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
// Step 1: Judge the storage level and days
int32_t
storageLevel
=
tsdbGetSmaStorageLevel
(
pSma
->
interval
,
pSma
->
intervalUnit
);
int32_t
daysPerFile
=
tsdbGetTSmaDays
(
pTsdb
,
tSmaH
.
interval
,
storageLevel
);
#if 0
#if 0
int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
...
...
@@ -1119,7 +1171,7 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
*/
static
int32_t
tsdbInitTSmaReadH
(
STSmaReadH
*
pSmaH
,
STsdb
*
pTsdb
,
int64_t
interval
,
int8_t
intervalUnit
)
{
pSmaH
->
pTsdb
=
pTsdb
;
pSmaH
->
interval
=
tsdbGetIntervalByPrecision
(
interval
,
intervalUnit
,
REPO_CFG
(
pTsdb
)
->
precision
);
pSmaH
->
interval
=
tsdbGetIntervalByPrecision
(
interval
,
intervalUnit
,
REPO_CFG
(
pTsdb
)
->
precision
,
true
);
pSmaH
->
storageLevel
=
tsdbGetSmaStorageLevel
(
interval
,
intervalUnit
);
pSmaH
->
days
=
tsdbGetTSmaDays
(
pTsdb
,
pSmaH
->
interval
,
pSmaH
->
storageLevel
);
}
...
...
@@ -1185,17 +1237,11 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
* @param pTsdb Return the data between queryWin and fill the pData.
* @param pData
* @param indexUid
* @param interval
* @param intervalUnit
* @param tableUid
* @param colId
* @param pQuerySKey
* @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM.
* @return int32_t
*/
static
int32_t
tsdbGetTSmaDataImpl
(
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
,
int64_t
indexUid
,
int64_t
interval
,
int8_t
intervalUnit
,
tb_uid_t
tableUid
,
col_id_t
colId
,
TSKEY
querySKey
,
int32_t
nMaxResult
)
{
static
int32_t
tsdbGetTSmaDataImpl
(
STsdb
*
pTsdb
,
char
*
pData
,
int64_t
indexUid
,
TSKEY
querySKey
,
int32_t
nMaxResult
)
{
SSmaEnv
*
pEnv
=
atomic_load_ptr
(
&
pTsdb
->
pTSmaEnv
);
if
(
!
pEnv
)
{
...
...
@@ -1243,13 +1289,18 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
tsdbDebug
(
"vgId:%d skey %"
PRIi64
" of window not in expired window for index %"
PRIi64
,
REPO_ID
(
pTsdb
),
querySKey
,
indexUid
);
}
tsdbUnRefSmaStat
(
pTsdb
,
SMA_ENV_STAT
(
pTsdb
->
pTSmaEnv
));
STSma
*
pTSma
=
pItem
->
pSma
;
#endif
STSmaReadH
tReadH
=
{
0
};
tsdbInitTSmaReadH
(
&
tReadH
,
pTsdb
,
interval
,
intervalUnit
);
tsdbInitTSmaReadH
(
&
tReadH
,
pTsdb
,
pTSma
->
interval
,
pTSma
->
intervalUnit
);
tsdbCloseDBF
(
&
tReadH
.
dFile
);
tsdbUnRefSmaStat
(
pTsdb
,
SMA_ENV_STAT
(
pTsdb
->
pTSmaEnv
));
tsdbInitTSmaFile
(
&
tReadH
,
indexUid
,
querySKey
);
if
(
tsdbOpenDBF
(
SMA_ENV_ENV
(
pTsdb
->
pTSmaEnv
),
&
tReadH
.
dFile
)
!=
0
)
{
...
...
@@ -1259,7 +1310,8 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
char
smaKey
[
SMA_KEY_LEN
]
=
{
0
};
void
*
pSmaKey
=
&
smaKey
;
tsdbEncodeTSmaKey
(
tableUid
,
colId
,
querySKey
,
(
void
**
)
&
pSmaKey
);
int64_t
queryGroupId
=
1
;
tsdbEncodeTSmaKey
(
queryGroupId
,
querySKey
,
(
void
**
)
&
pSmaKey
);
tsdbDebug
(
"vgId:%d get sma data from %s: smaKey %"
PRIx64
"-%"
PRIu16
"-%"
PRIx64
", keyLen %d"
,
REPO_ID
(
pTsdb
),
tReadH
.
dFile
.
path
,
*
(
tb_uid_t
*
)
smaKey
,
*
(
uint16_t
*
)
POINTER_SHIFT
(
smaKey
,
8
),
...
...
@@ -1347,11 +1399,10 @@ int32_t tsdbRemoveTSmaData(STsdb *pTsdb, void *smaIndex, STimeWindow *pWin) {
}
#endif
// TODO: Who is responsible for resource allocate and release?
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
char
*
msg
)
{
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbInsertTSmaDataImpl
(
pTsdb
,
msg
))
<
0
)
{
if
((
code
=
tsdbInsertTSmaDataImpl
(
pTsdb
,
indexUid
,
msg
))
<
0
)
{
tsdbWarn
(
"vgId:%d insert tSma data failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
return
code
;
...
...
@@ -1373,18 +1424,14 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
return
code
;
}
int32_t
tsdbGetTSmaData
(
STsdb
*
pTsdb
,
STSmaDataWrapper
*
pData
,
int64_t
indexUid
,
int64_t
interval
,
int8_t
intervalUnit
,
tb_uid_t
tableUid
,
col_id_t
colId
,
TSKEY
querySKey
,
int32_t
nMaxResult
)
{
int32_t
tsdbGetTSmaData
(
STsdb
*
pTsdb
,
char
*
pData
,
int64_t
indexUid
,
TSKEY
querySKey
,
int32_t
nMaxResult
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbGetTSmaDataImpl
(
pTsdb
,
pData
,
indexUid
,
interval
,
intervalUnit
,
tableUid
,
colId
,
querySKey
,
nMaxResult
))
<
0
)
{
if
((
code
=
tsdbGetTSmaDataImpl
(
pTsdb
,
pData
,
indexUid
,
querySKey
,
nMaxResult
))
<
0
)
{
tsdbWarn
(
"vgId:%d get tSma data failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
return
code
;
}
int32_t
tsdbDropTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbDropTSmaDataImpl
(
pTsdb
,
indexUid
))
<
0
)
{
...
...
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
8627b593
...
...
@@ -17,7 +17,9 @@
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
)
{
// TODO
blockDebugShowData
(
data
);
tsdbInsertTSmaData
(((
SVnode
*
)
pVnode
)
->
pTsdb
,
smaId
,
(
const
char
*
)
data
);
}
void
vnodeProcessWMsgs
(
SVnode
*
pVnode
,
SArray
*
pMsgs
)
{
...
...
@@ -201,7 +203,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vCreateSmaReq
.
tSma
.
indexUid
);
// record current timezone of server side
tstrncpy
(
vCreateSmaReq
.
tSma
.
timezone
,
tsTimezoneStr
,
TD_TIMEZONE_LEN
)
;
vCreateSmaReq
.
tSma
.
timezoneInt
=
tsTimezone
;
if
(
metaCreateTSma
(
pVnode
->
pMeta
,
&
vCreateSmaReq
)
<
0
)
{
// TODO: handle error
...
...
source/dnode/vnode/test/tsdbSmaTest.cpp
浏览文件 @
8627b593
...
...
@@ -15,6 +15,7 @@
#include <gtest/gtest.h>
#include <tsdbDef.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
...
...
@@ -280,7 +281,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
}
#endif
#if
0
#if
1
TEST
(
testCase
,
tSma_Data_Insert_Query_Test
)
{
// step 1: prepare meta
const
char
*
smaIndexName1
=
"sma_index_test_1"
;
...
...
@@ -299,9 +300,9 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
// encode
STSma
tSma
=
{
0
};
tSma
.
version
=
0
;
tSma.intervalUnit = TIME_UNIT_
DAY
;
tSma
.
intervalUnit
=
TIME_UNIT_
MINUTE
;
tSma
.
interval
=
1
;
tSma.slidingUnit = TIME_UNIT_
HOUR
;
tSma
.
slidingUnit
=
TIME_UNIT_
MINUTE
;
tSma
.
sliding
=
1
;
// sliding = interval when it's convert window
tSma
.
indexUid
=
indexUid1
;
tstrncpy
(
tSma
.
indexName
,
smaIndexName1
,
TSDB_INDEX_NAME_LEN
);
...
...
@@ -330,8 +331,8 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
ASSERT_EQ
(
metaSaveSmaToDB
(
pMeta
,
pSmaCfg
),
0
);
// step 2: insert data
STsdb
*pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(STsdb));
STsdbCfg
*pCfg = &pTsdb->config;
STsdb
*
pTsdb
=
(
STsdb
*
)
taosMemoryCalloc
(
1
,
sizeof
(
STsdb
));
STsdbCfg
*
pCfg
=
&
pTsdb
->
config
;
pTsdb
->
pMeta
=
pMeta
;
pTsdb
->
vgId
=
2
;
...
...
@@ -405,15 +406,94 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
ASSERT_EQ
(
tsdbUpdateSmaWindow
(
pTsdb
,
(
const
char
*
)
pMsg
),
0
);
// init
int32_t allocCnt = 0;
int32_t allocStep = 16384;
int32_t buffer = 1024;
void *buf = NULL;
ASSERT_EQ(tsdbMakeRoom(&buf, allocStep), 0);
int32_t bufSize = taosTSizeof(buf);
int32_t numOfTables = 10;
col_id_t numOfCols = 4096;
ASSERT_GT(numOfCols, 0);
const
int32_t
tSmaGroupSize
=
4
;
const
int32_t
tSmaNumOfTags
=
2
;
const
int64_t
tSmaGroupId
=
12345670
;
const
col_id_t
tSmaNumOfCols
=
9
;
// binary/nchar/varbinary/varchar are only used for tags for group by conditions.
const
int32_t
tSmaNumOfRows
=
2
;
SArray
*
pDataBlocks
=
taosArrayInit
(
tSmaGroupSize
,
sizeof
(
SSDataBlock
*
));
ASSERT_NE
(
pDataBlocks
,
nullptr
);
int32_t
tSmaTypeArray
[
tSmaNumOfCols
]
=
{
TSDB_DATA_TYPE_TIMESTAMP
,
TSDB_DATA_TYPE_BOOL
,
TSDB_DATA_TYPE_INT
,
TSDB_DATA_TYPE_UBIGINT
,
TSDB_DATA_TYPE_SMALLINT
,
TSDB_DATA_TYPE_FLOAT
,
TSDB_DATA_TYPE_DOUBLE
,
TSDB_DATA_TYPE_VARCHAR
,
TSDB_DATA_TYPE_NCHAR
};
// last 2 columns for group by tags
// int32_t tSmaTypeArray[tSmaNumOfCols] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_BOOL};
const
char
*
tSmaGroupbyTags
[
tSmaGroupSize
*
tSmaNumOfTags
]
=
{
"BeiJing"
,
"HaiDian"
,
"BeiJing"
,
"ChaoYang"
,
"ShangHai"
,
"PuDong"
,
"ShangHai"
,
"MinHang"
};
TSKEY
tSmaSKeyMs
=
(
int64_t
)
1648535332
*
1000
;
int64_t
tSmaIntervalMs
=
tSma
.
interval
*
60
*
1000
;
int64_t
tSmaInitVal
=
0
;
for
(
int32_t
g
=
0
;
g
<
tSmaGroupSize
;
++
g
)
{
SSDataBlock
*
pDataBlock
=
(
SSDataBlock
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSDataBlock
));
ASSERT_NE
(
pDataBlock
,
nullptr
);
pDataBlock
->
pBlockAgg
=
NULL
;
pDataBlock
->
info
.
numOfCols
=
tSmaNumOfCols
;
pDataBlock
->
info
.
rows
=
tSmaNumOfRows
;
pDataBlock
->
info
.
groupId
=
tSmaGroupId
+
g
;
pDataBlock
->
pDataBlock
=
taosArrayInit
(
tSmaNumOfCols
,
sizeof
(
SColumnInfoData
*
));
ASSERT_NE
(
pDataBlock
->
pDataBlock
,
nullptr
);
for
(
int32_t
c
=
0
;
c
<
tSmaNumOfCols
;
++
c
)
{
SColumnInfoData
*
pColInfoData
=
(
SColumnInfoData
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SColumnInfoData
));
ASSERT_NE
(
pColInfoData
,
nullptr
);
pColInfoData
->
info
.
type
=
tSmaTypeArray
[
c
];
if
(
IS_VAR_DATA_TYPE
(
pColInfoData
->
info
.
type
))
{
pColInfoData
->
info
.
bytes
=
100
;
// update accordingly
}
else
{
pColInfoData
->
info
.
bytes
=
TYPE_BYTES
[
pColInfoData
->
info
.
type
];
}
pColInfoData
->
pData
=
(
char
*
)
taosMemoryCalloc
(
1
,
tSmaNumOfRows
*
pColInfoData
->
info
.
bytes
);
for
(
int32_t
r
=
0
;
r
<
tSmaNumOfRows
;
++
r
)
{
void
*
pCellData
=
pColInfoData
->
pData
+
r
*
pColInfoData
->
info
.
bytes
;
switch
(
pColInfoData
->
info
.
type
)
{
case
TSDB_DATA_TYPE_TIMESTAMP
:
*
(
TSKEY
*
)
pCellData
=
tSmaSKeyMs
+
tSmaIntervalMs
*
r
;
break
;
case
TSDB_DATA_TYPE_BOOL
:
*
(
bool
*
)
pCellData
=
(
bool
)
tSmaInitVal
++
;
break
;
case
TSDB_DATA_TYPE_INT
:
*
(
int
*
)
pCellData
=
(
int
)
tSmaInitVal
++
;
break
;
case
TSDB_DATA_TYPE_UBIGINT
:
*
(
uint64_t
*
)
pCellData
=
(
uint64_t
)
tSmaInitVal
++
;
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
*
(
int16_t
*
)
pCellData
=
(
int16_t
)
tSmaInitVal
++
;
break
;
case
TSDB_DATA_TYPE_FLOAT
:
*
(
float
*
)
pCellData
=
(
float
)
tSmaInitVal
++
;
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
*
(
double
*
)
pCellData
=
(
double
)
tSmaInitVal
++
;
break
;
case
TSDB_DATA_TYPE_VARCHAR
:
// city
varDataSetLen
(
pCellData
,
strlen
(
tSmaGroupbyTags
[
g
*
2
]));
memcpy
(
varDataVal
(
pCellData
),
tSmaGroupbyTags
[
g
*
2
],
varDataLen
(
pCellData
));
break
;
case
TSDB_DATA_TYPE_NCHAR
:
// district
varDataSetLen
(
pCellData
,
strlen
(
tSmaGroupbyTags
[
g
*
2
+
1
]));
memcpy
(
varDataVal
(
pCellData
),
tSmaGroupbyTags
[
g
*
2
+
1
],
varDataLen
(
pCellData
));
break
;
default:
ASSERT_EQ
(
0
,
1
);
// add definition
break
;
}
}
// push SColumnInfoData
taosArrayPush
(
pDataBlock
->
pDataBlock
,
&
pColInfoData
);
}
// push SSDataBlock
taosArrayPush
(
pDataBlocks
,
&
pDataBlock
);
}
// execute
ASSERT_EQ
(
tsdbInsertTSmaData
(
pTsdb
,
tSma
.
indexUid
,
(
const
char
*
)
pDataBlocks
),
TSDB_CODE_SUCCESS
);
#if 0
STSmaDataWrapper *pSmaData = NULL;
...
...
@@ -464,26 +544,30 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
ASSERT_EQ(tsdbInsertTSmaData(pTsdb, (char *)pSmaData), TSDB_CODE_SUCCESS);
#endif
SSDataBlock *pSmaData = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock));
// step 3: query
uint32_t
checkDataCnt
=
0
;
for (int32_t t = 0; t < numOfTables; ++t) {
for (col_id_t c = 0; c < numOfCols; ++c) {
ASSERT_EQ(tsdbGetTSmaData(pTsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t,
c + PRIMARYKEY_TIMESTAMP_COL_ID, skey1, 1),
TSDB_CODE_SUCCESS);
++checkDataCnt;
}
}
ASSERT_EQ
(
tsdbGetTSmaData
(
pTsdb
,
NULL
,
indexUid1
,
skey1
,
1
),
TSDB_CODE_SUCCESS
);
++
checkDataCnt
;
printf
(
"%s:%d The sma data check count for insert and query is %"
PRIu32
"
\n
"
,
__FILE__
,
__LINE__
,
checkDataCnt
);
// release data
taosMemoryFreeClear
(
pMsg
);
taosTZfree(buf);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pDataBlocks
);
++
i
)
{
SSDataBlock
*
pDataBlock
=
(
SSDataBlock
*
)
taosArrayGet
(
pDataBlocks
,
i
);
int32_t
numOfOutput
=
taosArrayGetSize
(
pDataBlock
->
pDataBlock
);
for
(
int32_t
j
=
0
;
j
<
numOfOutput
;
++
j
)
{
SColumnInfoData
*
pColInfoData
=
(
SColumnInfoData
*
)
taosArrayGet
(
pDataBlock
->
pDataBlock
,
j
);
colDataDestroy
(
pColInfoData
);
}
taosArrayDestroy
(
pDataBlock
->
pDataBlock
);
taosMemoryFreeClear
(
pDataBlock
->
pBlockAgg
);
taosMemoryFreeClear
(
pDataBlock
);
}
taosArrayDestroy
(
pDataBlocks
);
// release meta
tdDestroyTSma
(
&
tSma
);
tfsClose
(
pTsdb
->
pTfs
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录