Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d4123972
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d4123972
编写于
3月 06, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add/check tSma schema methods
上级
e861fd6f
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
2482 addition
and
23 deletion
+2482
-23
include/common/tmsg.h
include/common/tmsg.h
+61
-7
include/common/tmsgdef.h
include/common/tmsgdef.h
+3
-0
include/common/trow.h
include/common/trow.h
+3
-3
source/dnode/vnode/inc/meta.h
source/dnode/vnode/inc/meta.h
+1
-0
source/dnode/vnode/inc/tsdb.h
source/dnode/vnode/inc/tsdb.h
+21
-0
source/dnode/vnode/src/inc/tsdbDef.h
source/dnode/vnode/src/inc/tsdbDef.h
+1
-0
source/dnode/vnode/src/inc/tsdbFS.h
source/dnode/vnode/src/inc/tsdbFS.h
+1
-0
source/dnode/vnode/src/inc/tsdbSma.h
source/dnode/vnode/src/inc/tsdbSma.h
+95
-0
source/dnode/vnode/src/meta/metaBDBImpl.c
source/dnode/vnode/src/meta/metaBDBImpl.c
+52
-8
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+2180
-0
source/dnode/vnode/src/tsdb/tsdbWrite.c
source/dnode/vnode/src/tsdb/tsdbWrite.c
+41
-0
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+9
-0
source/dnode/vnode/test/tsdbSmaTest.cpp
source/dnode/vnode/test/tsdbSmaTest.cpp
+14
-5
未找到文件。
include/common/tmsg.h
浏览文件 @
d4123972
...
...
@@ -1866,10 +1866,59 @@ typedef struct {
uint64_t
tableUid
;
// super/common table uid
int64_t
interval
;
int64_t
sliding
;
col_id_t
*
colIds
;
//
N.B.
sorted column ids
uint16_t
*
funcIds
;
//
N.B.
sorted sma function ids
col_id_t
*
colIds
;
// sorted column ids
uint16_t
*
funcIds
;
// sorted sma function ids
}
STSma
;
// Time-range-wise SMA
typedef
struct
{
int8_t
msgType
;
// 0 create, 1 recreate
STSma
tSma
;
STimeWindow
window
;
}
SCreateTSmaMsg
;
typedef
struct
{
STimeWindow
window
;
char
indexName
[
TSDB_INDEX_NAME_LEN
+
1
];
}
SDropTSmaMsg
;
typedef
struct
{
STimeWindow
tsWindow
;
// [skey, ekey]
uint64_t
tableUid
;
// sub/common table uid
int32_t
numOfBlocks
;
// number of sma blocks for each column, total number is numOfBlocks*numOfColId
int32_t
dataLen
;
// total data length
col_id_t
*
colIds
;
// e.g. 2,4,9,10
col_id_t
numOfColIds
;
// e.g. 4
char
data
[];
// the sma blocks
}
STSmaData
;
// TODO: move to the final location afte schema of STSma/STSmaData defined
static
FORCE_INLINE
void
tdDestroySmaData
(
STSmaData
*
pSmaData
)
{
if
(
pSmaData
)
{
if
(
pSmaData
->
colIds
)
{
tfree
(
pSmaData
->
colIds
);
}
tfree
(
pSmaData
);
}
}
// RSma: Time-range-wise Rollup SMA
// TODO: refactor when rSma grammar defined finally =>
typedef
struct
{
int64_t
interval
;
int32_t
retention
;
// unit: day
uint16_t
days
;
// unit: day
int8_t
intervalUnit
;
}
SSmaParams
;
// TODO: refactor when rSma grammar defined finally <=
typedef
struct
{
// TODO: refactor to use the real schema =>
STSma
tsma
;
float
xFilesFactor
;
SArray
*
smaParams
;
// SSmaParams
// TODO: refactor to use the real schema <=
}
SRSma
;
typedef
struct
{
uint32_t
number
;
STSma
*
tSma
;
...
...
@@ -1885,12 +1934,17 @@ static FORCE_INLINE void tdDestroyTSma(STSma* pSma, bool releaseSelf) {
}
}
static
FORCE_INLINE
void
tdDestroyTSmaWrapper
(
STSmaWrapper
*
pSW
)
{
if
(
pSW
&&
pSW
->
tSma
)
{
for
(
uint32_t
i
=
0
;
i
<
pSW
->
number
;
++
i
)
{
tdDestroyTSma
(
pSW
->
tSma
+
i
,
false
);
static
FORCE_INLINE
void
tdDestroyTSmaWrapper
(
STSmaWrapper
*
pSW
,
bool
releaseSelf
)
{
if
(
pSW
)
{
if
(
pSW
->
tSma
)
{
for
(
uint32_t
i
=
0
;
i
<
pSW
->
number
;
++
i
)
{
tdDestroyTSma
(
pSW
->
tSma
+
i
,
false
);
}
tfree
(
pSW
->
tSma
);
}
if
(
releaseSelf
)
{
free
(
pSW
);
}
tfree
(
pSW
->
tSma
);
}
}
...
...
include/common/tmsgdef.h
浏览文件 @
d4123972
...
...
@@ -184,6 +184,9 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBSCRIBE
,
"vnode-subscribe"
,
SMVSubscribeReq
,
SMVSubscribeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CONSUME
,
"vnode-consume"
,
SMqCVConsumeReq
,
SMqCVConsumeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CREATE_SMA
,
"vnode-create-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CANCEL_SMA
,
"vnode-cancel-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_SMA
,
"vnode-drop-sma"
,
NULL
,
NULL
)
// Requests handled by QNODE
TD_NEW_MSG_SEG
(
TDMT_QND_MSG
)
...
...
include/common/trow.h
浏览文件 @
d4123972
...
...
@@ -118,6 +118,8 @@ typedef struct {
}
SKvRow
;
typedef
struct
{
/// timestamp
TSKEY
ts
;
union
{
/// union field for encode and decode
uint32_t
info
;
...
...
@@ -138,8 +140,6 @@ typedef struct {
uint32_t
len
;
/// row version
uint64_t
ver
;
/// timestamp
TSKEY
ts
;
/// the inline data, maybe a tuple or a k-v tuple
char
data
[];
}
STSRow
;
...
...
@@ -173,7 +173,7 @@ typedef struct {
#define TD_ROW_DATA(r) ((r)->data)
#define TD_ROW_LEN(r) ((r)->len)
#define TD_ROW_KEY(r) ((r)->ts)
#define TD_ROW_KEY_ADDR(r)
POINTER_SHIFT((r), 16
)
#define TD_ROW_KEY_ADDR(r)
(r
)
// N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and
// (int32_t)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined.
...
...
source/dnode/vnode/inc/meta.h
浏览文件 @
d4123972
...
...
@@ -57,6 +57,7 @@ STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
SSchemaWrapper
*
metaGetTableSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
bool
isinline
);
STSchema
*
metaGetTbTSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
);
SSmaCfg
*
metaGetSmaInfoByName
(
SMeta
*
pMeta
,
const
char
*
indexName
);
STSmaWrapper
*
metaGetSmaInfoByUid
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SMTbCursor
*
metaOpenTbCursor
(
SMeta
*
pMeta
);
void
metaCloseTbCursor
(
SMTbCursor
*
pTbCur
);
...
...
source/dnode/vnode/inc/tsdb.h
浏览文件 @
d4123972
...
...
@@ -87,6 +87,27 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp);
int
tsdbPrepareCommit
(
STsdb
*
pTsdb
);
int
tsdbCommit
(
STsdb
*
pTsdb
);
/**
* @brief Insert tSma(Time-range-wise SMA) data from stream computing engine
*
* @param pTsdb
* @param param
* @param pData
* @return int32_t
*/
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
);
/**
* @brief Insert RSma(Time-range-wise Rollup SMA) data.
*
* @param pTsdb
* @param param
* @param pData
* @return int32_t
*/
int32_t
tsdbInsertRSmaData
(
STsdb
*
pTsdb
,
SRSma
*
param
,
STSmaData
*
pData
);
// STsdbCfg
int
tsdbOptionsInit
(
STsdbCfg
*
);
void
tsdbOptionsClear
(
STsdbCfg
*
);
...
...
source/dnode/vnode/src/inc/tsdbDef.h
浏览文件 @
d4123972
...
...
@@ -35,6 +35,7 @@
#include "tsdbMemory.h"
#include "tsdbOptions.h"
#include "tsdbReadImpl.h"
#include "tsdbSma.h"
#ifdef __cplusplus
extern
"C"
{
...
...
source/dnode/vnode/src/inc/tsdbFS.h
浏览文件 @
d4123972
...
...
@@ -42,6 +42,7 @@ typedef struct {
typedef
struct
{
STsdbFSMeta
meta
;
// FS meta
SArray
*
df
;
// data file array
SArray
*
smaf
;
// sma data file array
}
SFSStatus
;
typedef
struct
{
...
...
source/dnode/vnode/src/inc/tsdbSma.h
0 → 100644
浏览文件 @
d4123972
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_SMA_H_
#define _TD_TSDB_SMA_H_
// insert/update interface
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
);
int32_t
tsdbInsertRSmaDataImpl
(
STsdb
*
pTsdb
,
SRSma
*
param
,
STSmaData
*
pData
);
// query interface
// TODO: This is the basic params, and should wrap the params to a queryHandle.
int32_t
tsdbGetTSmaDataImpl
(
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
,
STimeWindow
*
queryWin
,
int32_t
nMaxResult
);
// management interface
int32_t
tsdbGetTSmaStatus
(
STsdb
*
pTsdb
,
STSma
*
param
,
void
*
result
);
int32_t
tsdbRemoveTSmaData
(
STsdb
*
pTsdb
,
STSma
*
param
,
STimeWindow
*
pWin
);
// internal func
static
FORCE_INLINE
int32_t
tsdbEncodeTSmaKey
(
uint64_t
tableUid
,
col_id_t
colId
,
TSKEY
tsKey
,
void
**
pData
)
{
int32_t
len
=
0
;
len
+=
taosEncodeFixedU64
(
pData
,
tableUid
);
len
+=
taosEncodeFixedU16
(
pData
,
colId
);
len
+=
taosEncodeFixedI64
(
pData
,
tsKey
);
return
len
;
}
#if 0
typedef struct {
int minFid;
int midFid;
int maxFid;
TSKEY minKey;
} SRtn;
typedef struct {
uint64_t uid;
int64_t offset;
int64_t size;
} SKVRecord;
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn);
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
if (key < 0) {
return (int)((key + 1) / tsTickPerDay[precision] / days - 1);
} else {
return (int)((key / tsTickPerDay[precision] / days));
}
}
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
if (fid >= pRtn->maxFid) {
return 0;
} else if (fid >= pRtn->midFid) {
return 1;
} else if (fid >= pRtn->minFid) {
return 2;
} else {
return -1;
}
}
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
void *tsdbCommitData(STsdbRepo *pRepo);
int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock,
bool isLast, bool isSuper, void **ppBuf, void **ppCBuf);
int tsdbApplyRtn(STsdbRepo *pRepo);
#endif
#endif
/* _TD_TSDB_SMA_H_ */
\ No newline at end of file
source/dnode/vnode/src/meta/metaBDBImpl.c
浏览文件 @
d4123972
...
...
@@ -833,6 +833,7 @@ SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
}
pCur
->
uid
=
uid
;
// TODO: lock?
ret
=
pDB
->
pCtbIdx
->
cursor
(
pDB
->
pSmaIdx
,
NULL
,
&
(
pCur
->
pCur
),
0
);
if
(
ret
!=
0
)
{
free
(
pCur
);
...
...
@@ -852,25 +853,68 @@ void metaCloseSmaCurosr(SMSmaCursor *pCur) {
}
}
const
char
*
metaSmaCursorNext
(
SMSmaCursor
*
pCur
)
{
DBT
skey
=
{
0
};
DBT
pkey
=
{
0
};
DBT
pval
=
{
0
};
void
*
pBuf
;
const
char
*
metaSmaCursorNext
(
SMSmaCursor
*
pCur
)
{
DBT
skey
=
{
0
};
DBT
pkey
=
{
0
};
DBT
pval
=
{
0
};
// Set key
skey
.
data
=
&
(
pCur
->
uid
);
skey
.
size
=
sizeof
(
pCur
->
uid
);
// TODO: lock?
if
(
pCur
->
pCur
->
pget
(
pCur
->
pCur
,
&
skey
,
&
pkey
,
&
pval
,
DB_NEXT
)
==
0
)
{
const
char
*
indexName
=
(
const
char
*
)
pkey
.
data
;
const
char
*
indexName
=
(
const
char
*
)
pkey
.
data
;
assert
(
indexName
!=
NULL
);
return
indexName
;
}
else
{
return
0
;
return
NULL
;
}
}
STSmaWrapper
*
metaGetSmaInfoByUid
(
SMeta
*
pMeta
,
tb_uid_t
uid
)
{
STSmaWrapper
*
pSW
=
NULL
;
pSW
=
calloc
(
sizeof
(
*
pSW
),
1
);
if
(
pSW
==
NULL
)
{
return
NULL
;
}
SMSmaCursor
*
pCur
=
metaOpenSmaCursor
(
pMeta
,
uid
);
if
(
pCur
==
NULL
)
{
free
(
pSW
);
return
NULL
;
}
DBT
skey
=
{.
data
=
&
(
pCur
->
uid
)};
DBT
pval
=
{.
size
=
sizeof
(
pCur
->
uid
)};
void
*
pBuf
=
NULL
;
while
(
true
)
{
// TODO: lock?
if
(
pCur
->
pCur
->
pget
(
pCur
->
pCur
,
&
skey
,
NULL
,
&
pval
,
DB_NEXT
)
==
0
)
{
++
pSW
->
number
;
STSma
*
tptr
=
(
STSma
*
)
realloc
(
pSW
->
tSma
,
pSW
->
number
*
sizeof
(
STSma
));
if
(
tptr
==
NULL
)
{
metaCloseSmaCurosr
(
pCur
);
tdDestroyTSmaWrapper
(
pSW
,
true
);
return
NULL
;
}
pSW
->
tSma
=
tptr
;
pBuf
=
pval
.
data
;
if
(
tDecodeTSma
(
pBuf
,
pSW
->
tSma
+
pSW
->
number
-
1
)
==
NULL
)
{
metaCloseSmaCurosr
(
pCur
);
tdDestroyTSmaWrapper
(
pSW
,
true
);
return
NULL
;
}
continue
;
}
break
;
}
metaCloseSmaCurosr
(
pCur
);
return
pSW
;
}
static
void
metaDBWLock
(
SMetaDB
*
pDB
)
{
#if IMPL_WITH_LOCK
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
0 → 100644
浏览文件 @
d4123972
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbDef.h"
#define SMA_STORAGE_TSDB_DAYS 30
#define SMA_STORAGE_SPLIT_HOURS 24
#define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8
#define SMA_STORE_SINGLE_BLOCKS // store SMA data by single block or multiple blocks
typedef
enum
{
SMA_STORAGE_LEVEL_TSDB
=
0
,
// store TSma in dir e.g. vnode${N}/tsdb/.tsma
SMA_STORAGE_LEVEL_DFILESET
=
1
// store TSma in file e.g. vnode${N}/tsdb/v2f1900.tsma.${sma_index_name}
}
ESmaStorageLevel
;
typedef
struct
{
STsdb
*
pTsdb
;
char
*
pDFile
;
// TODO: use the real DFile type, not char*
int32_t
interval
;
// interval with the precision of DB
int32_t
blockSize
;
// size of SMA block item
// TODO
}
STSmaWriteH
;
typedef
struct
{
int32_t
iter
;
}
SmaFsIter
;
typedef
struct
{
STsdb
*
pTsdb
;
char
*
pDFile
;
// TODO: use the real DFile type, not char*
int32_t
interval
;
// interval with the precision of DB
int32_t
blockSize
;
// size of SMA block item
int8_t
storageLevel
;
int8_t
days
;
SmaFsIter
smaFsIter
;
// TODO
}
STSmaReadH
;
// declaration of static functions
static
int32_t
tsdbInitTSmaWriteH
(
STSmaWriteH
*
pSmaH
,
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
);
static
int32_t
tsdbInitTSmaReadH
(
STSmaReadH
*
pSmaH
,
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
);
static
int32_t
tsdbJudgeStorageLevel
(
int64_t
interval
,
int8_t
intervalUnit
);
static
int32_t
tsdbInsertTSmaDataSection
(
STSmaWriteH
*
pSmaH
,
STSmaData
*
pData
,
int32_t
sectionDataLen
,
int32_t
nBlocks
);
static
int32_t
tsdbInsertTSmaBlocks
(
void
*
bTree
,
const
char
*
smaKey
,
const
char
*
pData
,
int32_t
dataLen
);
static
int32_t
tsdbTSmaDataSplit
(
STSmaWriteH
*
pSmaH
,
STSma
*
param
,
STSmaData
*
pData
,
int32_t
days
,
int32_t
nOffset
,
int32_t
fid
,
int32_t
*
nSmaBlocks
);
static
int64_t
tsdbGetIntervalByPrecision
(
int64_t
interval
,
uint8_t
intervalUnit
,
int8_t
precision
);
static
int32_t
tsdbSetTSmaDataFile
(
STSmaWriteH
*
pSmaH
,
STSma
*
param
,
STSmaData
*
pData
,
int32_t
storageLevel
,
int32_t
fid
);
static
int32_t
tsdbInitTSmaReadH
(
STSmaReadH
*
pSmaH
,
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
);
static
int32_t
tsdbInitTSmaFile
(
STSmaReadH
*
pReadH
,
STSma
*
param
,
STimeWindow
*
queryWin
);
static
bool
tsdbSetAndOpenTSmaFile
(
STSmaReadH
*
pReadH
,
STSma
*
param
,
STimeWindow
*
queryWin
);
/**
* @brief Judge the tSma storage level
*
* @param interval
* @param intervalUnit
* @return int32_t
*/
static
int32_t
tsdbJudgeStorageLevel
(
int64_t
interval
,
int8_t
intervalUnit
)
{
// TODO: configurable for SMA_STORAGE_SPLIT_HOURS?
switch
(
intervalUnit
)
{
case
TD_TIME_UNIT_HOUR
:
if
(
interval
<
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
break
;
case
TD_TIME_UNIT_MINUTE
:
if
(
interval
<
60
*
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
break
;
case
TD_TIME_UNIT_SEC
:
if
(
interval
<
3600
*
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
break
;
case
TD_TIME_UNIT_MILLISEC
:
if
(
interval
<
3600
*
1e3
*
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
break
;
case
TD_TIME_UNIT_MICROSEC
:
if
(
interval
<
3600
*
1e6
*
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
break
;
case
TD_TIME_UNIT_NANOSEC
:
if
(
interval
<
3600
*
1e9
*
SMA_STORAGE_SPLIT_HOURS
)
{
return
SMA_STORAGE_LEVEL_DFILESET
;
}
break
;
default:
break
;
}
return
SMA_STORAGE_LEVEL_TSDB
;
}
/**
* @brief Insert TSma data blocks to B+Tree
*
* @param bTree
* @param smaKey
* @param pData
* @param dataLen
* @return int32_t
*/
static
int32_t
tsdbInsertTSmaBlocks
(
void
*
bTree
,
const
char
*
smaKey
,
const
char
*
pData
,
int32_t
dataLen
)
{
// TODO: insert sma data blocks into B+Tree
printf
(
"insert sma data blocks into B+Tree: smaKey %"
PRIx64
"-%"
PRIu16
"-%"
PRIx64
", dataLen %d
\n
"
,
*
(
uint64_t
*
)
smaKey
,
*
(
uint16_t
*
)
POINTER_SHIFT
(
smaKey
,
8
),
*
(
int64_t
*
)
POINTER_SHIFT
(
smaKey
,
10
),
dataLen
);
return
TSDB_CODE_SUCCESS
;
}
static
int64_t
tsdbGetIntervalByPrecision
(
int64_t
interval
,
uint8_t
intervalUnit
,
int8_t
precision
)
{
if
(
intervalUnit
<
TD_TIME_UNIT_MILLISEC
)
{
switch
(
intervalUnit
)
{
case
TD_TIME_UNIT_YEAR
:
case
TD_TIME_UNIT_SEASON
:
case
TD_TIME_UNIT_MONTH
:
case
TD_TIME_UNIT_WEEK
:
// illegal time unit
tsdbError
(
"invalid interval unit: %d
\n
"
,
intervalUnit
);
TASSERT
(
0
);
break
;
case
TD_TIME_UNIT_DAY
:
// the interval for tSma calculation must <= day
interval
*=
86400
*
1e3
;
break
;
case
TD_TIME_UNIT_HOUR
:
interval
*=
3600
*
1e3
;
break
;
case
TD_TIME_UNIT_MINUTE
:
interval
*=
60
*
1e3
;
break
;
case
TD_TIME_UNIT_SEC
:
interval
*=
1e3
;
break
;
default:
break
;
}
}
switch
(
intervalUnit
)
{
case
TD_TIME_UNIT_MILLISEC
:
if
(
TSDB_TIME_PRECISION_MILLI
==
precision
)
{
return
interval
;
}
else
if
(
TSDB_TIME_PRECISION_MICRO
==
precision
)
{
return
interval
*
1e3
;
}
else
{
// nano second
return
interval
*
1e6
;
}
break
;
case
TD_TIME_UNIT_MICROSEC
:
if
(
TSDB_TIME_PRECISION_MILLI
==
precision
)
{
return
interval
/
1e3
;
}
else
if
(
TSDB_TIME_PRECISION_MICRO
==
precision
)
{
return
interval
;
}
else
{
// nano second
return
interval
*
1e3
;
}
break
;
case
TD_TIME_UNIT_NANOSEC
:
if
(
TSDB_TIME_PRECISION_MILLI
==
precision
)
{
return
interval
/
1e6
;
}
else
if
(
TSDB_TIME_PRECISION_MICRO
==
precision
)
{
return
interval
/
1e3
;
}
else
{
// nano second
return
interval
;
}
break
;
default:
if
(
TSDB_TIME_PRECISION_MILLI
==
precision
)
{
return
interval
*
1e3
;
}
else
if
(
TSDB_TIME_PRECISION_MICRO
==
precision
)
{
return
interval
*
1e6
;
}
else
{
// nano second
return
interval
*
1e9
;
}
break
;
}
return
interval
;
}
/**
* @brief Split the TSma data blocks into expected size and insert into B+Tree.
*
* @param pSmaH
* @param pData
* @param nOffset The nOffset of blocks since fid changes.
* @param nBlocks The nBlocks with the same fid since nOffset.
* @return int32_t
*/
static
int32_t
tsdbInsertTSmaDataSection
(
STSmaWriteH
*
pSmaH
,
STSmaData
*
pData
,
int32_t
nOffset
,
int32_t
nBlocks
)
{
STsdb
*
pTsdb
=
pSmaH
->
pTsdb
;
TASSERT
(
pData
->
colIds
!=
NULL
);
tsdbDebug
(
"tsdbInsertTSmaDataSection: nOffset %d, nBlocks %d"
,
nOffset
,
nBlocks
);
printf
(
"tsdbInsertTSmaDataSection: nOffset %d, nBlocks %d
\n
"
,
nOffset
,
nBlocks
);
int32_t
colDataLen
=
pData
->
dataLen
/
pData
->
numOfColIds
;
int32_t
sectionDataLen
=
pSmaH
->
blockSize
*
nBlocks
;
for
(
col_id_t
i
=
0
;
i
<
pData
->
numOfColIds
;
++
i
)
{
// param: pointer of B+Tree, key, value, dataLen
void
*
bTree
=
pSmaH
->
pDFile
;
#ifndef SMA_STORE_SINGLE_BLOCKS
// save tSma data blocks as a whole
char
smaKey
[
SMA_KEY_LEN
]
=
{
0
};
void
*
pSmaKey
=
&
smaKey
;
tsdbEncodeTSmaKey
(
pData
->
tableUid
,
*
(
pData
->
colIds
+
i
),
pData
->
tsWindow
.
skey
+
nOffset
*
pSmaH
->
interval
,
(
void
**
)
&
pSmaKey
);
if
(
tsdbInsertTSmaBlocks
(
bTree
,
smaKey
,
pData
->
data
+
i
*
colDataLen
+
nOffset
*
pSmaH
->
blockSize
,
sectionDataLen
)
<
0
)
{
tsdbWarn
(
"vgId:%d insert tSma blocks failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
#else
// save tSma data blocks separately
for
(
int32_t
n
=
0
;
n
<
nBlocks
;
++
n
)
{
char
smaKey
[
SMA_KEY_LEN
]
=
{
0
};
void
*
pSmaKey
=
&
smaKey
;
tsdbEncodeTSmaKey
(
pData
->
tableUid
,
*
(
pData
->
colIds
+
i
),
pData
->
tsWindow
.
skey
+
(
nOffset
+
n
)
*
pSmaH
->
interval
,
(
void
**
)
&
pSmaKey
);
if
(
tsdbInsertTSmaBlocks
(
bTree
,
smaKey
,
pData
->
data
+
i
*
colDataLen
+
(
nOffset
+
n
)
*
pSmaH
->
blockSize
,
pSmaH
->
blockSize
)
<
0
)
{
tsdbWarn
(
"vgId:%d insert tSma blocks failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
}
#endif
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tsdbInitTSmaWriteH
(
STSmaWriteH
*
pSmaH
,
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
)
{
pSmaH
->
pTsdb
=
pTsdb
;
pSmaH
->
interval
=
tsdbGetIntervalByPrecision
(
param
->
interval
,
param
->
intervalUnit
,
REPO_CFG
(
pTsdb
)
->
precision
);
pSmaH
->
blockSize
=
param
->
numOfFuncIds
*
sizeof
(
int64_t
);
}
static
int32_t
tsdbSetTSmaDataFile
(
STSmaWriteH
*
pSmaH
,
STSma
*
param
,
STSmaData
*
pData
,
int32_t
storageLevel
,
int32_t
fid
)
{
// TODO
pSmaH
->
pDFile
=
"tSma_interval_file_name"
;
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief Split the sma data blocks by fid.
*
* @param pSmaH
* @param param
* @param pData
* @param nOffset
* @param fid
* @param nSmaBlocks
* @return int32_t
*/
static
int32_t
tsdbTSmaDataSplit
(
STSmaWriteH
*
pSmaH
,
STSma
*
param
,
STSmaData
*
pData
,
int32_t
days
,
int32_t
nOffset
,
int32_t
fid
,
int32_t
*
nSmaBlocks
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pSmaH
->
pTsdb
);
// TODO: use binary search
for
(
int32_t
n
=
nOffset
+
1
;
n
<
pData
->
numOfBlocks
;
++
n
)
{
// TODO: The tsWindow.skey should use the precision of DB.
int32_t
tFid
=
(
int32_t
)(
TSDB_KEY_FID
(
pData
->
tsWindow
.
skey
+
pSmaH
->
interval
*
n
,
days
,
pCfg
->
precision
));
if
(
tFid
>
fid
)
{
*
nSmaBlocks
=
n
-
nOffset
;
break
;
}
}
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief Insert/Update Time-range-wise SMA data.
* - If interval < SMA_STORAGE_SPLIT_HOURS(e.g. 24), save the SMA data as a part of DFileSet to e.g.
* v3f1900.tsma.${sma_index_name}. The days is the same with that for TS data files.
* - If interval >= SMA_STORAGE_SPLIT_HOURS, save the SMA data to e.g. vnode3/tsma/v3f632.tsma.${sma_index_name}. The
* days is 30 times of the interval, and the minimum days is SMA_STORAGE_TSDB_DAYS(30d).
* - The destination file of one data block for some interval is determined by its start TS key.
*
* @param pTsdb
* @param param
* @param pData
* @return int32_t
*/
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
STSmaData
*
curData
=
pData
;
STSmaWriteH
tSmaH
=
{
0
};
tsdbInitTSmaWriteH
(
&
tSmaH
,
pTsdb
,
param
,
pData
);
if
(
pData
->
numOfBlocks
<=
0
||
pData
->
numOfColIds
<=
0
||
pData
->
dataLen
<=
0
)
{
TASSERT
(
0
);
terrno
=
TSDB_CODE_INVALID_PARA
;
return
terrno
;
}
// Step 1: Judge the storage level
int32_t
storageLevel
=
tsdbJudgeStorageLevel
(
param
->
interval
,
param
->
intervalUnit
);
int32_t
daysPerFile
=
storageLevel
==
SMA_STORAGE_LEVEL_TSDB
?
SMA_STORAGE_TSDB_DAYS
:
pCfg
->
daysPerFile
;
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
// - Set and open the DFile or the B+Tree file
int32_t
minFid
=
(
int32_t
)(
TSDB_KEY_FID
(
pData
->
tsWindow
.
skey
,
daysPerFile
,
pCfg
->
precision
));
int32_t
maxFid
=
(
int32_t
)(
TSDB_KEY_FID
(
pData
->
tsWindow
.
ekey
,
daysPerFile
,
pCfg
->
precision
));
if
(
minFid
==
maxFid
)
{
// Save all the TSma data to one file
// TODO: tsdbStartTSmaCommit();
tsdbSetTSmaDataFile
(
&
tSmaH
,
param
,
pData
,
storageLevel
,
minFid
);
tsdbInsertTSmaDataSection
(
&
tSmaH
,
pData
,
0
,
pData
->
numOfBlocks
);
// TODO:tsdbEndTSmaCommit();
}
else
if
(
minFid
<
maxFid
)
{
// Split the TSma data and save to multiple files. As there is limit for the span, it can't span more than 2 files
// actually.
// TODO: tsdbStartTSmaCommit();
int32_t
tFid
=
minFid
;
int32_t
nOffset
=
0
;
int32_t
nSmaBlocks
=
0
;
do
{
tsdbTSmaDataSplit
(
&
tSmaH
,
param
,
pData
,
daysPerFile
,
nOffset
,
tFid
,
&
nSmaBlocks
);
tsdbSetTSmaDataFile
(
&
tSmaH
,
param
,
pData
,
storageLevel
,
tFid
);
if
(
tsdbInsertTSmaDataSection
(
&
tSmaH
,
pData
,
nOffset
,
nSmaBlocks
)
<
0
)
{
return
terrno
;
}
++
tFid
;
nOffset
+=
nSmaBlocks
;
if
(
tFid
==
maxFid
)
{
tsdbSetTSmaDataFile
(
&
tSmaH
,
param
,
pData
,
storageLevel
,
tFid
);
tsdbInsertTSmaDataSection
(
&
tSmaH
,
pData
,
nOffset
,
pData
->
numOfBlocks
-
nOffset
);
break
;
}
}
while
(
true
);
// TODO:tsdbEndTSmaCommit();
}
else
{
terrno
=
TSDB_CODE_INVALID_PARA
;
return
terrno
;
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tsdbSetRSmaDataFile
(
STSmaWriteH
*
pSmaH
,
SRSma
*
param
,
STSmaData
*
pData
,
int32_t
fid
)
{
// TODO
pSmaH
->
pDFile
=
"rSma_interval_file_name"
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbInsertRSmaDataImpl
(
STsdb
*
pTsdb
,
SRSma
*
param
,
STSmaData
*
pData
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
STSma
*
tParam
=
&
param
->
tsma
;
STSmaData
*
curData
=
pData
;
STSmaWriteH
tSmaH
=
{
0
};
tsdbInitTSmaWriteH
(
&
tSmaH
,
pTsdb
,
tParam
,
pData
);
int32_t
nSmaBlocks
=
pData
->
numOfBlocks
;
int32_t
colDataLen
=
pData
->
dataLen
/
nSmaBlocks
;
// Step 2.2: Storage of SMA_STORAGE_LEVEL_DFILESET
// TODO: Use the daysPerFile for rSma data, not for TS data.
// TODO: The lifecycle of rSma data should be processed like the TS data files.
int32_t
minFid
=
(
int32_t
)(
TSDB_KEY_FID
(
pData
->
tsWindow
.
skey
,
pCfg
->
daysPerFile
,
pCfg
->
precision
));
int32_t
maxFid
=
(
int32_t
)(
TSDB_KEY_FID
(
pData
->
tsWindow
.
ekey
,
pCfg
->
daysPerFile
,
pCfg
->
precision
));
if
(
minFid
==
maxFid
)
{
// Save all the TSma data to one file
tsdbSetRSmaDataFile
(
&
tSmaH
,
param
,
pData
,
minFid
);
// TODO: tsdbStartTSmaCommit();
tsdbInsertTSmaDataSection
(
&
tSmaH
,
pData
,
colDataLen
,
nSmaBlocks
);
// TODO:tsdbEndTSmaCommit();
}
else
if
(
minFid
<
maxFid
)
{
// Split the TSma data and save to multiple files. As there is limit for the span, it can't span more than 2 files
// actually.
// TODO: tsdbStartTSmaCommit();
int32_t
tmpFid
=
0
;
int32_t
step
=
0
;
for
(
int32_t
n
=
0
;
n
<
pData
->
numOfBlocks
;
++
n
)
{
}
tsdbInsertTSmaDataSection
(
&
tSmaH
,
pData
,
colDataLen
,
nSmaBlocks
);
// TODO:tsdbEndTSmaCommit();
}
else
{
TASSERT
(
0
);
return
TSDB_CODE_INVALID_PARA
;
}
// Step 4: finish
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief Init of tSma ReadH
*
* @param pSmaH
* @param pTsdb
* @param param
* @param pData
* @return int32_t
*/
static
int32_t
tsdbInitTSmaReadH
(
STSmaReadH
*
pSmaH
,
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
)
{
pSmaH
->
pTsdb
=
pTsdb
;
pSmaH
->
interval
=
tsdbGetIntervalByPrecision
(
param
->
interval
,
param
->
intervalUnit
,
REPO_CFG
(
pTsdb
)
->
precision
);
pSmaH
->
blockSize
=
param
->
numOfFuncIds
*
sizeof
(
int64_t
);
}
/**
* @brief Init of tSma FS
*
* @param pReadH
* @param param
* @param queryWin
* @return int32_t
*/
static
int32_t
tsdbInitTSmaFile
(
STSmaReadH
*
pReadH
,
STSma
*
param
,
STimeWindow
*
queryWin
)
{
int32_t
storageLevel
=
tsdbJudgeStorageLevel
(
param
->
interval
,
param
->
intervalUnit
);
int32_t
daysPerFile
=
storageLevel
==
SMA_STORAGE_LEVEL_TSDB
?
SMA_STORAGE_TSDB_DAYS
:
REPO_CFG
(
pReadH
->
pTsdb
)
->
daysPerFile
;
pReadH
->
storageLevel
=
storageLevel
;
pReadH
->
days
=
daysPerFile
;
pReadH
->
smaFsIter
.
iter
=
0
;
}
/**
* @brief Set and open tSma file if it has key locates in queryWin.
*
* @param pReadH
* @param param
* @param queryWin
* @return true
* @return false
*/
static
bool
tsdbSetAndOpenTSmaFile
(
STSmaReadH
*
pReadH
,
STSma
*
param
,
STimeWindow
*
queryWin
)
{
SArray
*
smaFs
=
pReadH
->
pTsdb
->
fs
->
cstatus
->
smaf
;
int32_t
nSmaFs
=
taosArrayGetSize
(
smaFs
);
pReadH
->
pDFile
=
NULL
;
while
(
pReadH
->
smaFsIter
.
iter
<
nSmaFs
)
{
void
*
pSmaFile
=
taosArrayGet
(
smaFs
,
pReadH
->
smaFsIter
.
iter
);
if
(
pSmaFile
)
{
// match(indexName, queryWindow)
// TODO: select the file by index_name ...
pReadH
->
pDFile
=
pSmaFile
;
++
pReadH
->
smaFsIter
.
iter
;
break
;
}
++
pReadH
->
smaFsIter
.
iter
;
}
if
(
pReadH
->
pDFile
!=
NULL
)
{
tsdbDebug
(
"vg%d: smaFile %s matched"
,
REPO_ID
(
pReadH
->
pTsdb
),
"[pSmaFile dir]"
);
return
true
;
}
return
false
;
}
/**
* @brief Return the data between queryWin and fill the pData.
*
* @param pTsdb
* @param param
* @param pData
* @param queryWin
* @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM.
* @return int32_t
*/
int32_t
tsdbGetTSmaDataImpl
(
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
,
STimeWindow
*
queryWin
,
int32_t
nMaxResult
)
{
STSmaReadH
tReadH
=
{
0
};
tsdbInitTSmaReadH
(
&
tReadH
,
pTsdb
,
param
,
pData
);
tsdbInitTSmaFile
(
&
tReadH
,
param
,
queryWin
);
int32_t
nResult
=
0
;
int64_t
lastKey
=
0
;
while
(
true
)
{
if
(
nResult
>=
nMaxResult
)
{
break
;
}
// set and open the file according to the STSma param
if
(
tsdbSetAndOpenTSmaFile
(
&
tReadH
,
param
,
queryWin
))
{
char
bTree
[
100
]
=
"
\0
"
;
while
(
strncmp
(
bTree
,
"has more nodes"
,
100
)
==
0
)
{
if
(
nResult
>=
nMaxResult
)
{
break
;
}
// tsdbGetDataFromBTree(bTree, queryWin, lastKey)
// fill the pData
++
nResult
;
}
}
}
// read data from file and fill the result
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief Get the start TS key of the last data block of one interval/sliding.
*
* @param pTsdb
* @param param
* @param result
* @return int32_t
* 1) Return 0 and fill the result if the check procedure is normal;
* 2) Return -1 if error occurs during the check procedure.
*/
int32_t
tsdbGetTSmaStatus
(
STsdb
*
pTsdb
,
STSma
*
param
,
void
*
result
)
{
const
char
*
procedure
=
""
;
if
(
strncmp
(
procedure
,
"get the start TS key of the last data block"
,
100
)
!=
0
)
{
return
-
1
;
}
// fill the result
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief Remove the tSma data files related to param between pWin.
*
* @param pTsdb
* @param param
* @param pWin
* @return int32_t
*/
int32_t
tsdbRemoveTSmaData
(
STsdb
*
pTsdb
,
STSma
*
param
,
STimeWindow
*
pWin
)
{
// for ("tSmaFiles of param-interval-sliding between pWin") {
// // remove the tSmaFile
// }
return
TSDB_CODE_SUCCESS
;
}
#if 0
#define TSDB_MAX_SUBBLOCKS 8
typedef struct {
STable *pTable;
SSkipListIterator *pIter;
} SCommitIter;
typedef struct {
SRtn rtn; // retention snapshot
SFSIter fsIter; // tsdb file iterator
int niters; // memory iterators
SCommitIter *iters;
bool isRFileSet; // read and commit FSET
SReadH readh;
SDFileSet wSet;
bool isDFileSame;
bool isLFileSame;
TSKEY minKey;
TSKEY maxKey;
SArray *aBlkIdx; // SBlockIdx array
STable *pTable;
SArray *aSupBlk; // Table super-block array
SArray *aSubBlk; // table sub-block array
SDataCols *pDataCols;
} STSmaWriteH;
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
#define TSDB_COMMIT_REPO(ch) TSDB_READ_REPO(&(ch->readh))
#define TSDB_COMMIT_REPO_ID(ch) REPO_ID(TSDB_READ_REPO(&(ch->readh)))
#define TSDB_COMMIT_WRITE_FSET(ch) (&((ch)->wSet))
#define TSDB_COMMIT_TABLE(ch) ((ch)->pTable)
#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD)
#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA)
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
#define TSDB_COMMIT_SMAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAD)
#define TSDB_COMMIT_SMAL_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAL)
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
static void tsdbStartCommit(STsdb *pRepo);
static void tsdbEndCommit(STsdb *pTsdb, int eno);
static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo);
static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key);
static int tsdbNextCommitFid(SCommitH *pCommith);
static void tsdbDestroyCommitH(SCommitH *pCommith);
static int tsdbCreateCommitIters(SCommitH *pCommith);
static void tsdbDestroyCommitIters(SCommitH *pCommith);
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static void tsdbResetCommitFile(SCommitH *pCommith);
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
// static int tsdbCommitMeta(STsdbRepo *pRepo);
// static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact);
// static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid);
// static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile);
static int tsdbCommitToTable(SCommitH *pCommith, int tid);
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
static int tsdbWriteBlockInfo(SCommitH *pCommih);
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx);
static int tsdbMoveBlock(SCommitH *pCommith, int bidx);
static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks);
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
bool isLastOneBlock);
static void tsdbResetCommitTable(SCommitH *pCommith);
static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows, int8_t update);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
SDiskID did;
SDFileSet nSet = {0};
STsdbFS *pfs = REPO_FS(pRepo);
int level;
ASSERT(pSet->fid >= pRtn->minFid);
level = tsdbGetFidLevel(pSet->fid, pRtn);
if (tfsAllocDisk(pRepo->pTfs, level, &did) < 0) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1;
}
if (did.level > TSDB_FSET_LEVEL(pSet)) {
// Need to move the FSET to higher level
tsdbInitDFileSet(pRepo, &nSet, did, pSet->fid, FS_TXN_VERSION(pfs));
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno));
return -1;
}
if (tsdbUpdateDFileSet(pfs, &nSet) < 0) {
return -1;
}
tsdbInfo("vgId:%d FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id);
} else {
// On a correct level
if (tsdbUpdateDFileSet(pfs, pSet) < 0) {
return -1;
}
}
return 0;
}
int tsdbPrepareCommit(STsdb *pTsdb) {
if (pTsdb->mem == NULL) return 0;
ASSERT(pTsdb->imem == NULL);
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
return 0;
}
int tsdbCommit(STsdb *pRepo) {
STsdbMemTable *pMem = pRepo->imem;
SCommitH commith = {0};
SDFileSet *pSet = NULL;
int fid;
if (pRepo->imem == NULL) return 0;
tsdbStartCommit(pRepo);
// Resource initialization
if (tsdbInitCommitH(&commith, pRepo) < 0) {
return -1;
}
// Skip expired memory data and expired FSET
tsdbSeekCommitIter(&commith, commith.rtn.minKey);
while ((pSet = tsdbFSIterNext(&(commith.fsIter)))) {
if (pSet->fid < commith.rtn.minFid) {
tsdbInfo("vgId:%d FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
} else {
break;
}
}
// Loop to commit to each file
fid = tsdbNextCommitFid(&(commith));
while (true) {
// Loop over both on disk and memory
if (pSet == NULL && fid == TSDB_IVLD_FID) break;
if (pSet && (fid == TSDB_IVLD_FID || pSet->fid < fid)) {
// Only has existing FSET but no memory data to commit in this
// existing FSET, only check if file in correct retention
if (tsdbApplyRtnOnFSet(pRepo, pSet, &(commith.rtn)) < 0) {
tsdbDestroyCommitH(&commith);
return -1;
}
pSet = tsdbFSIterNext(&(commith.fsIter));
} else {
// Has memory data to commit
SDFileSet *pCSet;
int cfid;
if (pSet == NULL || pSet->fid > fid) {
// Commit to a new FSET with fid: fid
pCSet = NULL;
cfid = fid;
} else {
// Commit to an existing FSET
pCSet = pSet;
cfid = pSet->fid;
pSet = tsdbFSIterNext(&(commith.fsIter));
}
if (tsdbCommitToFile(&commith, pCSet, cfid) < 0) {
tsdbDestroyCommitH(&commith);
return -1;
}
fid = tsdbNextCommitFid(&commith);
}
}
tsdbDestroyCommitH(&commith);
tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS);
return 0;
}
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
TSKEY minKey, midKey, maxKey, now;
now = taosGetTimestamp(pCfg->precision);
minKey = now - pCfg->keep * tsTickPerDay[pCfg->precision];
midKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision];
maxKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision];
pRtn->minKey = minKey;
pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision));
pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->daysPerFile, pCfg->precision));
pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->daysPerFile, pCfg->precision));
tsdbDebug("vgId:%d now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey,
pRtn->minFid, pRtn->midFid, pRtn->maxFid);
}
static void tsdbStartCommit(STsdb *pRepo) {
STsdbMemTable *pMem = pRepo->imem;
tsdbInfo("vgId:%d start to commit", REPO_ID(pRepo));
tsdbStartFSTxn(pRepo, 0, 0);
}
static void tsdbEndCommit(STsdb *pTsdb, int eno) {
tsdbEndFSTxn(pTsdb);
tsdbFreeMemTable(pTsdb, pTsdb->imem);
pTsdb->imem = NULL;
tsdbInfo("vgId:%d commit over, %s", REPO_ID(pTsdb), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed");
}
static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
memset(pCommith, 0, sizeof(*pCommith));
tsdbGetRtnSnap(pRepo, &(pCommith->rtn));
TSDB_FSET_SET_CLOSED(TSDB_COMMIT_WRITE_FSET(pCommith));
// Init read handle
if (tsdbInitReadH(&(pCommith->readh), pRepo) < 0) {
return -1;
}
// Init file iterator
tsdbFSIterInit(&(pCommith->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);
if (tsdbCreateCommitIters(pCommith) < 0) {
tsdbDestroyCommitH(pCommith);
return -1;
}
pCommith->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
if (pCommith->aBlkIdx == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCommitH(pCommith);
return -1;
}
pCommith->aSupBlk = taosArrayInit(1024, sizeof(SBlock));
if (pCommith->aSupBlk == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCommitH(pCommith);
return -1;
}
pCommith->aSubBlk = taosArrayInit(1024, sizeof(SBlock));
if (pCommith->aSubBlk == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCommitH(pCommith);
return -1;
}
pCommith->pDataCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
if (pCommith->pDataCols == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCommitH(pCommith);
return -1;
}
return 0;
}
// Skip all keys until key (not included)
static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
for (int i = 0; i < pCommith->niters; i++) {
SCommitIter *pIter = pCommith->iters + i;
if (pIter->pTable == NULL || pIter->pIter == NULL) continue;
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key - 1, INT32_MAX, NULL, NULL, 0, true, NULL);
}
}
static int tsdbNextCommitFid(SCommitH *pCommith) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
int fid = TSDB_IVLD_FID;
for (int i = 0; i < pCommith->niters; i++) {
SCommitIter *pIter = pCommith->iters + i;
// if (pIter->pTable == NULL || pIter->pIter == NULL) continue;
TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
if (nextKey == TSDB_DATA_TIMESTAMP_NULL) {
continue;
} else {
int tfid = (int)(TSDB_KEY_FID(nextKey, pCfg->daysPerFile, pCfg->precision));
if (fid == TSDB_IVLD_FID || fid > tfid) {
fid = tfid;
}
}
}
return fid;
}
static void tsdbDestroyCommitH(SCommitH *pCommith) {
pCommith->pDataCols = tdFreeDataCols(pCommith->pDataCols);
pCommith->aSubBlk = taosArrayDestroy(pCommith->aSubBlk);
pCommith->aSupBlk = taosArrayDestroy(pCommith->aSupBlk);
pCommith->aBlkIdx = taosArrayDestroy(pCommith->aBlkIdx);
tsdbDestroyCommitIters(pCommith);
tsdbDestroyReadH(&(pCommith->readh));
tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
}
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
ASSERT(pSet == NULL || pSet->fid == fid);
tsdbResetCommitFile(pCommith);
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey));
// Set and open files
if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) {
return -1;
}
// Loop to commit each table data
for (int tid = 0; tid < pCommith->niters; tid++) {
SCommitIter *pIter = pCommith->iters + tid;
if (pIter->pTable == NULL) continue;
if (tsdbCommitToTable(pCommith, tid) < 0) {
tsdbCloseCommitFile(pCommith, true);
// revert the file change
tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
return -1;
}
}
if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) <
0) {
tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
tsdbCloseCommitFile(pCommith, true);
// revert the file change
tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
return -1;
}
if (tsdbUpdateDFileSetHeader(&(pCommith->wSet)) < 0) {
tsdbError("vgId:%d failed to update FSET %d header since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
tsdbCloseCommitFile(pCommith, true);
// revert the file change
tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
return -1;
}
// Close commit file
tsdbCloseCommitFile(pCommith, false);
if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) {
return -1;
}
return 0;
}
static int tsdbCreateCommitIters(SCommitH *pCommith) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbMemTable *pMem = pRepo->imem;
SSkipListIterator *pSlIter;
SCommitIter *pCommitIter;
SSkipListNode *pNode;
STbData *pTbData;
pCommith->niters = SL_SIZE(pMem->pSlIdx);
pCommith->iters = (SCommitIter *)calloc(pCommith->niters, sizeof(SCommitIter));
if (pCommith->iters == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
// Loop to create iters for each skiplist
pSlIter = tSkipListCreateIter(pMem->pSlIdx);
if (pSlIter == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
for (int i = 0; i < pCommith->niters; i++) {
tSkipListIterNext(pSlIter);
pNode = tSkipListIterGet(pSlIter);
pTbData = (STbData *)pNode->pData;
pCommitIter = pCommith->iters + i;
pCommitIter->pIter = tSkipListCreateIter(pTbData->pData);
tSkipListIterNext(pCommitIter->pIter);
pCommitIter->pTable = (STable *)malloc(sizeof(STable));
pCommitIter->pTable->uid = pTbData->uid;
pCommitIter->pTable->tid = pTbData->uid;
pCommitIter->pTable->pSchema = metaGetTbTSchema(pRepo->pMeta, pTbData->uid, 0);
}
return 0;
}
static void tsdbDestroyCommitIters(SCommitH *pCommith) {
if (pCommith->iters == NULL) return;
for (int i = 1; i < pCommith->niters; i++) {
tSkipListDestroyIter(pCommith->iters[i].pIter);
tdFreeSchema(pCommith->iters[i].pTable->pSchema);
free(pCommith->iters[i].pTable);
}
free(pCommith->iters);
pCommith->iters = NULL;
pCommith->niters = 0;
}
static void tsdbResetCommitFile(SCommitH *pCommith) {
pCommith->isRFileSet = false;
pCommith->isDFileSame = false;
pCommith->isLFileSame = false;
taosArrayClear(pCommith->aBlkIdx);
}
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
SDiskID did;
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
if (tfsAllocDisk(pRepo->pTfs, tsdbGetFidLevel(fid, &(pCommith->rtn)), &did) < 0) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1;
}
// Open read FSET
if (pSet) {
if (tsdbSetAndOpenReadFSet(&(pCommith->readh), pSet) < 0) {
return -1;
}
pCommith->isRFileSet = true;
if (tsdbLoadBlockIdx(&(pCommith->readh)) < 0) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
tsdbDebug("vgId:%d FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo), TSDB_FSET_FID(pSet),
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
} else {
pCommith->isRFileSet = false;
}
// Set and open commit FSET
if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) {
// Create a new FSET to write data
tsdbInitDFileSet(pRepo, pWSet, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)));
if (tsdbCreateDFileSet(pRepo, pWSet, true) < 0) {
tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo),
TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet), tstrerror(terrno));
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
}
return -1;
}
pCommith->isDFileSame = false;
pCommith->isLFileSame = false;
tsdbDebug("vgId:%d FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet),
TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet));
} else {
did.level = TSDB_FSET_LEVEL(pSet);
did.id = TSDB_FSET_ID(pSet);
pCommith->wSet.fid = fid;
pCommith->wSet.state = 0;
// TSDB_FILE_HEAD
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
tsdbInitDFile(pRepo, pWHeadf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
if (tsdbCreateDFile(pRepo, pWHeadf, true) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf),
tstrerror(terrno));
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
// TSDB_FILE_DATA
SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh));
SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith);
tsdbInitDFileEx(pWDataf, pRDataf);
if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWDataf),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
pCommith->isDFileSame = true;
// TSDB_FILE_LAST
SDFile *pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh));
SDFile *pWLastf = TSDB_COMMIT_LAST_FILE(pCommith);
if (pRLastf->info.size < 32 * 1024) {
tsdbInitDFileEx(pWLastf, pRLastf);
pCommith->isLFileSame = true;
if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
} else {
tsdbInitDFile(pRepo, pWLastf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
pCommith->isLFileSame = false;
if (tsdbCreateDFile(pRepo, pWLastf, true) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
(void)tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
}
}
return 0;
}
// extern int32_t tsTsdbMetaCompactRatio;
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf,
SBlockIdx *pIdx) {
size_t nSupBlocks;
size_t nSubBlocks;
uint32_t tlen;
SBlockInfo *pBlkInfo;
int64_t offset;
SBlock *pBlock;
memset(pIdx, 0, sizeof(*pIdx));
nSupBlocks = taosArrayGetSize(pSupA);
nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA);
if (nSupBlocks <= 0) {
// No data (data all deleted)
return 0;
}
tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM));
if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1;
pBlkInfo = *ppBuf;
pBlkInfo->delimiter = TSDB_FILE_DELIMITER;
pBlkInfo->tid = TABLE_TID(pTable);
pBlkInfo->uid = TABLE_UID(pTable);
memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock));
if (nSubBlocks > 0) {
memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock));
for (int i = 0; i < nSupBlocks; i++) {
pBlock = pBlkInfo->blocks + i;
if (pBlock->numOfSubBlocks > 1) {
pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks);
}
}
}
taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen);
if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) {
return -1;
}
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM)));
// Set pIdx
pBlock = taosArrayGetLast(pSupA);
pIdx->tid = TABLE_TID(pTable);
pIdx->uid = TABLE_UID(pTable);
pIdx->hasLast = pBlock->last ? 1 : 0;
pIdx->maxKey = pBlock->keyLast;
pIdx->numOfBlocks = (uint32_t)nSupBlocks;
pIdx->len = tlen;
pIdx->offset = (uint32_t)offset;
return 0;
}
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
SBlockIdx *pBlkIdx;
size_t nidx = taosArrayGetSize(pIdxA);
int tlen = 0, size;
int64_t offset;
if (nidx <= 0) {
// All data are deleted
pHeadf->info.offset = 0;
pHeadf->info.len = 0;
return 0;
}
for (size_t i = 0; i < nidx; i++) {
pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i);
size = tsdbEncodeSBlockIdx(NULL, pBlkIdx);
if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1;
void *ptr = POINTER_SHIFT(*ppBuf, tlen);
tsdbEncodeSBlockIdx(&ptr, pBlkIdx);
tlen += size;
}
tlen += sizeof(TSCKSUM);
if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1;
taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen);
if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) {
return -1;
}
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM)));
pHeadf->info.offset = (uint32_t)offset;
pHeadf->info.len = tlen;
return 0;
}
// // =================== Commit Meta Data
// static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile* pMf, bool open) {
// STsdbFS * pfs = REPO_FS(pRepo);
// SMFile * pOMFile = pfs->cstatus->pmf;
// SDiskID did;
// // Create/Open a meta file or open the existing file
// if (pOMFile == NULL) {
// // Create a new meta file
// did.level = TFS_PRIMARY_LEVEL;
// did.id = TFS_PRIMARY_ID;
// tsdbInitMFile(pMf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)));
// if (open && tsdbCreateMFile(pMf, true) < 0) {
// tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno));
// return -1;
// }
// tsdbInfo("vgId:%d meta file %s is created to commit", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMf));
// } else {
// tsdbInitMFileEx(pMf, pOMFile);
// if (open && tsdbOpenMFile(pMf, O_WRONLY) < 0) {
// tsdbError("vgId:%d failed to open META file since %s", REPO_ID(pRepo), tstrerror(terrno));
// return -1;
// }
// }
// return 0;
// }
// static int tsdbCommitMeta(STsdbRepo *pRepo) {
// STsdbFS * pfs = REPO_FS(pRepo);
// SMemTable *pMem = pRepo->imem;
// SMFile * pOMFile = pfs->cstatus->pmf;
// SMFile mf;
// SActObj * pAct = NULL;
// SActCont * pCont = NULL;
// SListNode *pNode = NULL;
// ASSERT(pOMFile != NULL || listNEles(pMem->actList) > 0);
// if (listNEles(pMem->actList) <= 0) {
// // no meta data to commit, just keep the old meta file
// tsdbUpdateMFile(pfs, pOMFile);
// if (tsTsdbMetaCompactRatio > 0) {
// if (tsdbInitCommitMetaFile(pRepo, &mf, false) < 0) {
// return -1;
// }
// int ret = tsdbCompactMetaFile(pRepo, pfs, &mf);
// if (ret < 0) tsdbError("compact meta file error");
// return ret;
// }
// return 0;
// } else {
// if (tsdbInitCommitMetaFile(pRepo, &mf, true) < 0) {
// return -1;
// }
// }
// // Loop to write
// while ((pNode = tdListPopHead(pMem->actList)) != NULL) {
// pAct = (SActObj *)pNode->data;
// if (pAct->act == TSDB_UPDATE_META) {
// pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
// if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len, false) < 0) {
// tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
// tstrerror(terrno));
// tsdbCloseMFile(&mf);
// (void)tsdbApplyMFileChange(&mf, pOMFile);
// // TODO: need to reload metaCache
// return -1;
// }
// } else if (pAct->act == TSDB_DROP_META) {
// if (tsdbDropMetaRecord(pfs, &mf, pAct->uid) < 0) {
// tsdbError("vgId:%d failed to drop META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
// tstrerror(terrno));
// tsdbCloseMFile(&mf);
// tsdbApplyMFileChange(&mf, pOMFile);
// // TODO: need to reload metaCache
// return -1;
// }
// } else {
// ASSERT(false);
// }
// }
// if (tsdbUpdateMFileHeader(&mf) < 0) {
// tsdbError("vgId:%d failed to update META file header since %s, revert it", REPO_ID(pRepo), tstrerror(terrno));
// tsdbApplyMFileChange(&mf, pOMFile);
// // TODO: need to reload metaCache
// return -1;
// }
// TSDB_FILE_FSYNC(&mf);
// tsdbCloseMFile(&mf);
// tsdbUpdateMFile(pfs, &mf);
// if (tsTsdbMetaCompactRatio > 0 && tsdbCompactMetaFile(pRepo, pfs, &mf) < 0) {
// tsdbError("compact meta file error");
// }
// return 0;
// }
// int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord) {
// int tlen = 0;
// tlen += taosEncodeFixedU64(buf, pRecord->uid);
// tlen += taosEncodeFixedI64(buf, pRecord->offset);
// tlen += taosEncodeFixedI64(buf, pRecord->size);
// return tlen;
// }
// void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord) {
// buf = taosDecodeFixedU64(buf, &(pRecord->uid));
// buf = taosDecodeFixedI64(buf, &(pRecord->offset));
// buf = taosDecodeFixedI64(buf, &(pRecord->size));
// return buf;
// }
// static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact) {
// char buf[64] = "\0";
// void * pBuf = buf;
// SKVRecord rInfo;
// int64_t offset;
// // Seek to end of meta file
// offset = tsdbSeekMFile(pMFile, 0, SEEK_END);
// if (offset < 0) {
// return -1;
// }
// rInfo.offset = offset;
// rInfo.uid = uid;
// rInfo.size = contLen;
// int tlen = tsdbEncodeKVRecord((void **)(&pBuf), &rInfo);
// if (tsdbAppendMFile(pMFile, buf, tlen, NULL) < tlen) {
// return -1;
// }
// if (tsdbAppendMFile(pMFile, cont, contLen, NULL) < contLen) {
// return -1;
// }
// tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)));
// SHashObj* cache = compact ? pfs->metaCacheComp : pfs->metaCache;
// pMFile->info.nRecords++;
// SKVRecord *pRecord = taosHashGet(cache, (void *)&uid, sizeof(uid));
// if (pRecord != NULL) {
// pMFile->info.tombSize += (pRecord->size + sizeof(SKVRecord));
// } else {
// pMFile->info.nRecords++;
// }
// taosHashPut(cache, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo));
// return 0;
// }
// static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) {
// SKVRecord rInfo = {0};
// char buf[128] = "\0";
// SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)(&uid), sizeof(uid));
// if (pRecord == NULL) {
// tsdbError("failed to drop META record with key %" PRIu64 " since not find", uid);
// return -1;
// }
// rInfo.offset = -pRecord->offset;
// rInfo.uid = pRecord->uid;
// rInfo.size = pRecord->size;
// void *pBuf = buf;
// tsdbEncodeKVRecord(&pBuf, &rInfo);
// if (tsdbAppendMFile(pMFile, buf, sizeof(SKVRecord), NULL) < 0) {
// return -1;
// }
// pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t *)buf, sizeof(SKVRecord));
// pMFile->info.nDels++;
// pMFile->info.nRecords--;
// pMFile->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
// taosHashRemove(pfs->metaCache, (void *)(&uid), sizeof(uid));
// return 0;
// }
// static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) {
// float delPercent = (float)(pMFile->info.nDels) / (float)(pMFile->info.nRecords);
// float tombPercent = (float)(pMFile->info.tombSize) / (float)(pMFile->info.size);
// float compactRatio = (float)(tsTsdbMetaCompactRatio)/100;
// if (delPercent < compactRatio && tombPercent < compactRatio) {
// return 0;
// }
// if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) {
// tsdbError("open meta file %s compact fail", pMFile->f.rname);
// return -1;
// }
// tsdbInfo("begin compact tsdb meta file, ratio:%d, nDels:%" PRId64 ",nRecords:%" PRId64 ",tombSize:%" PRId64
// ",size:%" PRId64,
// tsTsdbMetaCompactRatio, pMFile->info.nDels,pMFile->info.nRecords,pMFile->info.tombSize,pMFile->info.size);
// SMFile mf;
// SDiskID did;
// // first create tmp meta file
// did.level = TFS_PRIMARY_LEVEL;
// did.id = TFS_PRIMARY_ID;
// tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)) + 1);
// if (tsdbCreateMFile(&mf, true) < 0) {
// tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno));
// return -1;
// }
// tsdbInfo("vgId:%d meta file %s is created to compact meta data", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf));
// // second iterator metaCache
// int code = -1;
// int64_t maxBufSize = 1024;
// SKVRecord *pRecord;
// void *pBuf = NULL;
// pBuf = malloc((size_t)maxBufSize);
// if (pBuf == NULL) {
// goto _err;
// }
// // init Comp
// assert(pfs->metaCacheComp == NULL);
// pfs->metaCacheComp = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
// if (pfs->metaCacheComp == NULL) {
// goto _err;
// }
// pRecord = taosHashIterate(pfs->metaCache, NULL);
// while (pRecord) {
// if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) {
// tsdbError("vgId:%d failed to seek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile),
// tstrerror(terrno));
// goto _err;
// }
// if (pRecord->size > maxBufSize) {
// maxBufSize = pRecord->size;
// void* tmp = realloc(pBuf, (size_t)maxBufSize);
// if (tmp == NULL) {
// goto _err;
// }
// pBuf = tmp;
// }
// int nread = (int)tsdbReadMFile(pMFile, pBuf, pRecord->size);
// if (nread < 0) {
// tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile),
// tstrerror(terrno));
// goto _err;
// }
// if (nread < pRecord->size) {
// tsdbError("vgId:%d failed to read file %s since file corrupted, expected read:%" PRId64 " actual read:%d",
// REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), pRecord->size, nread);
// goto _err;
// }
// if (tsdbUpdateMetaRecord(pfs, &mf, pRecord->uid, pBuf, (int)pRecord->size, true) < 0) {
// tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pRecord->uid,
// tstrerror(terrno));
// goto _err;
// }
// pRecord = taosHashIterate(pfs->metaCache, pRecord);
// }
// code = 0;
// _err:
// if (code == 0) TSDB_FILE_FSYNC(&mf);
// tsdbCloseMFile(&mf);
// tsdbCloseMFile(pMFile);
// if (code == 0) {
// // rename meta.tmp -> meta
// tsdbInfo("vgId:%d meta file rename %s -> %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf),
// TSDB_FILE_FULL_NAME(pMFile)); taosRename(mf.f.aname,pMFile->f.aname); tstrncpy(mf.f.aname, pMFile->f.aname,
// TSDB_FILENAME_LEN); tstrncpy(mf.f.rname, pMFile->f.rname, TSDB_FILENAME_LEN);
// // update current meta file info
// pfs->nstatus->pmf = NULL;
// tsdbUpdateMFile(pfs, &mf);
// taosHashCleanup(pfs->metaCache);
// pfs->metaCache = pfs->metaCacheComp;
// pfs->metaCacheComp = NULL;
// } else {
// // remove meta.tmp file
// remove(mf.f.aname);
// taosHashCleanup(pfs->metaCacheComp);
// pfs->metaCacheComp = NULL;
// }
// tfree(pBuf);
// ASSERT(mf.info.nDels == 0);
// ASSERT(mf.info.tombSize == 0);
// tsdbInfo("end compact tsdb meta file,code:%d,nRecords:%" PRId64 ",size:%" PRId64,
// code,mf.info.nRecords,mf.info.size);
// return code;
// }
// // =================== Commit Time-Series Data
// #if 0
// static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
// for (int i = 0; i < nIters; i++) {
// TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter);
// if (nextKey != TSDB_DATA_TIMESTAMP_NULL && (nextKey >= minKey && nextKey <= maxKey)) return true;
// }
// return false;
// }
// #endif
static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
SCommitIter *pIter = pCommith->iters + tid;
TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
tsdbResetCommitTable(pCommith);
// Set commit table
if (tsdbSetCommitTable(pCommith, pIter->pTable) < 0) {
return -1;
}
// No disk data and no memory data, just return
if (pCommith->readh.pBlkIdx == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) {
return 0;
}
// Must has disk data or has memory data
int nBlocks;
int bidx = 0;
SBlock *pBlock;
if (pCommith->readh.pBlkIdx) {
if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) {
return -1;
}
nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
} else {
nBlocks = 0;
}
if (bidx < nBlocks) {
pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
} else {
pBlock = NULL;
}
while (true) {
if (pBlock == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) break;
if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) ||
(pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) {
if (tsdbMoveBlock(pCommith, bidx) < 0) {
return -1;
}
bidx++;
if (bidx < nBlocks) {
pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
} else {
pBlock = NULL;
}
} else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) {
// merge pBlock data and memory data
if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) {
return -1;
}
bidx++;
if (bidx < nBlocks) {
pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
} else {
pBlock = NULL;
}
nextKey = tsdbNextIterKey(pIter->pIter);
} else {
// Only commit memory data
if (pBlock == NULL) {
if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) {
return -1;
}
} else {
if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) {
return -1;
}
}
nextKey = tsdbNextIterKey(pIter->pIter);
}
}
if (tsdbWriteBlockInfo(pCommith) < 0) {
tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
return -1;
}
return 0;
}
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) {
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
pCommith->pTable = pTable;
if (tdInitDataCols(pCommith->pDataCols, pSchema) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if (pCommith->isRFileSet) {
if (tsdbSetReadTable(&(pCommith->readh), pTable) < 0) {
return -1;
}
} else {
pCommith->readh.pBlkIdx = NULL;
}
return 0;
}
static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
TSKEY key = *(TSKEY *)arg1;
SBlock *pBlock = (SBlock *)arg2;
if (key < pBlock->keyFirst) {
return -1;
} else if (key > pBlock->keyLast) {
return 1;
} else {
return 0;
}
}
int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isSuper, void **ppBuf, void **ppCBuf) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
SBlockData *pBlockData;
int64_t offset = 0;
int rowsToWrite = pDataCols->numOfRows;
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock);
// Make buffer space
if (tsdbMakeRoom(ppBuf, TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) {
return -1;
}
pBlockData = (SBlockData *)(*ppBuf);
// Get # of cols not all NULL(not including key column)
int nColsNotAllNull = 0;
for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column
SDataCol *pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull;
if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it
continue;
}
memset(pBlockCol, 0, sizeof(*pBlockCol));
pBlockCol->colId = pDataCol->colId;
pBlockCol->type = pDataCol->type;
if (tDataTypes[pDataCol->type].statisFunc) {
(*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max),
&(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex),
&(pBlockCol->numOfNull));
if (pBlockCol->numOfNull == 0) {
TD_SET_COL_ROWS_NORM(pBlockCol);
} else {
TD_SET_COL_ROWS_MISC(pBlockCol);
}
} else {
TD_SET_COL_ROWS_MISC(pBlockCol);
}
nColsNotAllNull++;
}
ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols);
// Compress the data if neccessary
int tcol = 0; // counter of not all NULL and written columns
uint32_t toffset = 0;
int32_t tsize = TSDB_BLOCK_STATIS_SIZE(nColsNotAllNull);
int32_t lsize = tsize;
int32_t keyLen = 0;
int32_t nBitmaps = (int32_t)TD_BITMAP_BYTES(rowsToWrite);
int32_t tBitmaps = 0;
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
// All not NULL columns finish
if (ncol != 0 && tcol >= nColsNotAllNull) break;
SDataCol *pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = pBlockData->cols + tcol;
if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue;
int32_t flen; // final length
int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite);
#ifdef TD_SUPPORT_BITMAP
int32_t tBitmaps = 0;
if ((ncol != 0) && !TD_COL_ROWS_NORM(pBlockCol)) {
if (IS_VAR_DATA_TYPE(pDataCol->type)) {
tBitmaps = nBitmaps;
tlen += tBitmaps;
} else {
tBitmaps = (int32_t)ceil((double)nBitmaps / TYPE_BYTES[pDataCol->type]);
tlen += tBitmaps * TYPE_BYTES[pDataCol->type];
}
// move bitmap parts ahead
// TODO: put bitmap part to the 1st location(pBitmap points to pData) to avoid the memmove
memcpy(POINTER_SHIFT(pDataCol->pData, pDataCol->len), pDataCol->pBitmap, nBitmaps);
}
#endif
void *tptr;
// Make room
if (tsdbMakeRoom(ppBuf, lsize + tlen + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) {
return -1;
}
pBlockData = (SBlockData *)(*ppBuf);
pBlockCol = pBlockData->cols + tcol;
tptr = POINTER_SHIFT(pBlockData, lsize);
if (pCfg->compression == TWO_STAGE_COMP && tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) {
return -1;
}
// Compress or just copy
if (pCfg->compression) {
flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite + tBitmaps, tptr,
tlen + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf,
tlen + COMP_OVERFLOW_BYTES);
} else {
flen = tlen;
memcpy(tptr, pDataCol->pData, flen);
}
// Add checksum
ASSERT(flen > 0);
flen += sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)tptr, flen);
tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)));
if (ncol != 0) {
tsdbSetBlockColOffset(pBlockCol, toffset);
pBlockCol->len = flen;
tcol++;
} else {
keyLen = flen;
}
toffset += flen;
lsize += flen;
}
pBlockData->delimiter = TSDB_FILE_DELIMITER;
pBlockData->uid = TABLE_UID(pTable);
pBlockData->numOfCols = nColsNotAllNull;
taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize);
tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM)));
// Write the whole block to file
if (tsdbAppendDFile(pDFile, (void *)pBlockData, lsize, &offset) < lsize) {
return -1;
}
// Update pBlock membership vairables
pBlock->last = isLast;
pBlock->offset = offset;
pBlock->algorithm = pCfg->compression;
pBlock->numOfRows = rowsToWrite;
pBlock->len = lsize;
pBlock->keyLen = keyLen;
pBlock->numOfSubBlocks = isSuper ? 1 : 0;
pBlock->numOfCols = nColsNotAllNull;
pBlock->keyFirst = dataColsKeyFirst(pDataCols);
pBlock->keyLast = dataColsKeyLast(pDataCols);
tsdbDebug("vgId:%d uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast);
return 0;
}
static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isSuper) {
return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, pDataCols, pBlock, isLast,
isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))),
(void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))));
}
static int tsdbWriteBlockInfo(SCommitH *pCommih) {
SDFile *pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
SBlockIdx blkIdx;
STable *pTable = TSDB_COMMIT_TABLE(pCommih);
if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void **)(&(TSDB_COMMIT_BUF(pCommih))),
&blkIdx) < 0) {
return -1;
}
if (blkIdx.numOfBlocks == 0) {
return 0;
}
if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
SMergeInfo mInfo;
int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
SDFile *pDFile;
bool isLast;
SBlock block;
while (true) {
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, defaultRows, pCommith->pDataCols, NULL, 0,
pCfg->update, &mInfo);
if (pCommith->pDataCols->numOfRows <= 0) break;
if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) {
pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
isLast = false;
} else {
pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
isLast = true;
}
if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1;
if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) {
return -1;
}
}
return 0;
}
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
TSKEY keyLimit;
int16_t colId = PRIMARYKEY_TIMESTAMP_COL_ID;
SMergeInfo mInfo;
SBlock subBlocks[TSDB_MAX_SUBBLOCKS];
SBlock block, supBlock;
SDFile *pDFile;
if (bidx == nBlocks - 1) {
keyLimit = pCommith->maxKey;
} else {
keyLimit = pBlock[1].keyFirst - 1;
}
SSkipListIterator titer = *(pIter->pIter);
if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1) < 0) return -1;
tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, pCommith->readh.pDCols[0]->cols[0].pData,
pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo);
if (mInfo.nOperations == 0) {
// no new data to insert (all updates denied)
if (tsdbMoveBlock(pCommith, bidx) < 0) {
return -1;
}
*(pIter->pIter) = titer;
} else if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) {
// Ignore the block
ASSERT(0);
*(pIter->pIter) = titer;
} else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) {
// Add a sub-block
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols,
pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update,
&mInfo);
if (pBlock->last) {
pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
} else {
pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
}
if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, pBlock->last, false) < 0) return -1;
if (pBlock->numOfSubBlocks == 1) {
subBlocks[0] = *pBlock;
subBlocks[0].numOfSubBlocks = 0;
} else {
memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset),
sizeof(SBlock) * pBlock->numOfSubBlocks);
}
subBlocks[pBlock->numOfSubBlocks] = block;
supBlock = *pBlock;
supBlock.keyFirst = mInfo.keyFirst;
supBlock.keyLast = mInfo.keyLast;
supBlock.numOfSubBlocks++;
supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed;
supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock);
if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1;
} else {
if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1;
if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return -1;
}
return 0;
}
static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
SDFile *pDFile;
SBlock block;
bool isSameFile;
ASSERT(pBlock->numOfSubBlocks > 0);
if (pBlock->last) {
pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
isSameFile = pCommith->isLFileSame;
} else {
pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
isSameFile = pCommith->isDFileSame;
}
if (isSameFile) {
if (pBlock->numOfSubBlocks == 1) {
if (tsdbCommitAddBlock(pCommith, pBlock, NULL, 0) < 0) {
return -1;
}
} else {
block = *pBlock;
block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSubBlk);
if (tsdbCommitAddBlock(pCommith, &block, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset),
pBlock->numOfSubBlocks) < 0) {
return -1;
}
}
} else {
if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1;
if (tsdbWriteBlock(pCommith, pDFile, pCommith->readh.pDCols[0], &block, pBlock->last, true) < 0) return -1;
if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1;
}
return 0;
}
static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) {
if (taosArrayPush(pCommith->aSupBlk, pSupBlock) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if (pSubBlocks && taosArrayAddBatch(pCommith->aSubBlk, pSubBlocks, nSubBlocks) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
bool isLastOneBlock) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
SBlock block;
SDFile *pDFile;
bool isLast;
int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
int biter = 0;
while (true) {
tsdbLoadAndMergeFromCache(pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows,
pCfg->update);
if (pCommith->pDataCols->numOfRows == 0) break;
if (isLastOneBlock) {
if (pCommith->pDataCols->numOfRows < pCfg->minRowsPerFileBlock) {
pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
isLast = true;
} else {
pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
isLast = false;
}
} else {
pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
isLast = false;
}
if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1;
if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1;
}
return 0;
}
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows, int8_t update) {
TSKEY key1 = INT64_MAX;
TSKEY key2 = INT64_MAX;
STSchema *pSchema = NULL;
ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey);
tdResetDataCols(pTarget);
while (true) {
key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
STSRow *row = tsdbNextIterRow(pCommitIter->pIter);
if (row == NULL || TD_ROW_KEY(row) > maxKey) {
key2 = INT64_MAX;
} else {
key2 = TD_ROW_KEY(row);
}
if (key1 == INT64_MAX && key2 == INT64_MAX) break;
if (key1 < key2) {
for (int i = 0; i < pDataCols->numOfCols; i++) {
// TODO: dataColAppendVal may fail
SCellVal sVal = {0};
if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter) < 0) {
TASSERT(0);
}
tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints);
}
pTarget->numOfRows++;
(*iter)++;
} else if (key1 > key2) {
if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, TD_ROW_SVER(row));
ASSERT(pSchema != NULL);
}
tdAppendSTSRowToDataCol(row, pSchema, pTarget, true);
tSkipListIterNext(pCommitIter->pIter);
} else {
if (update != TD_ROW_OVERWRITE_UPDATE) {
// copy disk data
for (int i = 0; i < pDataCols->numOfCols; i++) {
// TODO: dataColAppendVal may fail
SCellVal sVal = {0};
if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter) < 0) {
TASSERT(0);
}
tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints);
}
if (update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++;
}
if (update != TD_ROW_DISCARD_UPDATE) {
// copy mem data
if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, TD_ROW_SVER(row));
ASSERT(pSchema != NULL);
}
tdAppendSTSRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE);
}
(*iter)++;
tSkipListIterNext(pCommitIter->pIter);
}
if (pTarget->numOfRows >= maxRows) break;
}
}
static void tsdbResetCommitTable(SCommitH *pCommith) {
taosArrayClear(pCommith->aSubBlk);
taosArrayClear(pCommith->aSupBlk);
pCommith->pTable = NULL;
}
static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) {
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
}
if (!hasError) {
TSDB_FSET_FSYNC(TSDB_COMMIT_WRITE_FSET(pCommith));
}
tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
}
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed;
ASSERT(mergeRows > 0);
if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRowsPerFileBlock) {
if (pBlock->last) {
if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true;
} else {
if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true;
}
}
return false;
}
// int tsdbApplyRtn(STsdbRepo *pRepo) {
// SRtn rtn;
// SFSIter fsiter;
// STsdbFS * pfs = REPO_FS(pRepo);
// SDFileSet *pSet;
// // Get retention snapshot
// tsdbGetRtnSnap(pRepo, &rtn);
// tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD);
// while ((pSet = tsdbFSIterNext(&fsiter))) {
// if (pSet->fid < rtn.minFid) {
// tsdbInfo("vgId:%d FSET %d at level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid,
// TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
// continue;
// }
// if (tsdbApplyRtnOnFSet(pRepo, pSet, &rtn) < 0) {
// return -1;
// }
// }
// return 0;
// }
#endif
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbWrite.c
浏览文件 @
d4123972
...
...
@@ -15,6 +15,14 @@
#include "tsdbDef.h"
/**
* @brief insert TS data
*
* @param pTsdb
* @param pMsg
* @param pRsp
* @return int
*/
int
tsdbInsertData
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
)
{
// Check if mem is there. If not, create one.
if
(
pTsdb
->
mem
==
NULL
)
{
...
...
@@ -24,4 +32,37 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
}
}
return
tsdbMemTableInsert
(
pTsdb
,
pTsdb
->
mem
,
pMsg
,
NULL
);
}
/**
* @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine
*
* @param pTsdb
* @param param
* @param pData
* @return int32_t
* TODO: Who is responsible for resource allocate and release?
*/
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbInsertTSmaDataImpl
(
pTsdb
,
param
,
pData
))
<
0
)
{
tsdbWarn
(
"vgId:%d insert tSma data failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
return
code
;
}
/**
* @brief Insert Time-range-wise Rollup Sma(RSma) data
*
* @param pTsdb
* @param param
* @param pData
* @return int32_t
*/
int32_t
tsdbInsertRSmaData
(
STsdb
*
pTsdb
,
SRSma
*
param
,
STSmaData
*
pData
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbInsertRSmaDataImpl
(
pTsdb
,
param
,
pData
))
<
0
)
{
tsdbWarn
(
"vgId:%d insert rSma data failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
return
code
;
}
\ No newline at end of file
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
d4123972
...
...
@@ -132,6 +132,15 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
if
(
tqProcessRebReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
}
}
break
;
case
TDMT_VND_CREATE_SMA
:
{
// timeRangeSMA
// 1. tdCreateSmaMeta(pVnode->pMeta,...);
// 2. tdCreateSmaDataInit();
// 3. tdCreateSmaData
}
break
;
case
TDMT_VND_CANCEL_SMA
:
{
// timeRangeSMA
}
break
;
case
TDMT_VND_DROP_SMA
:
{
// timeRangeSMA
}
break
;
default:
ASSERT
(
0
);
break
;
...
...
source/dnode/vnode/test/tsdbSmaTest.cpp
浏览文件 @
d4123972
...
...
@@ -95,7 +95,7 @@ TEST(testCase, tSmaEncodeDecodeTest) {
// resource release
tdDestroyTSma
(
&
tSma
,
false
);
tdDestroyTSmaWrapper
(
&
dstTSmaWrapper
);
tdDestroyTSmaWrapper
(
&
dstTSmaWrapper
,
false
);
}
TEST
(
testCase
,
tSma_DB_Put_Get_Del_Test
)
{
...
...
@@ -161,7 +161,7 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
EXPECT_EQ
(
qSmaCfg
->
interval
,
tSma
.
interval
);
tdDestroyTSma
(
qSmaCfg
,
true
);
// get
valu
e by table uid
// get
index nam
e by table uid
SMSmaCursor
*
pSmaCur
=
metaOpenSmaCursor
(
pMeta
,
tbUid
);
assert
(
pSmaCur
!=
NULL
);
uint32_t
indexCnt
=
0
;
...
...
@@ -176,6 +176,15 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
EXPECT_EQ
(
indexCnt
,
2
);
metaCloseSmaCurosr
(
pSmaCur
);
// get wrapper by table uid
STSmaWrapper
*
pSW
=
metaGetSmaInfoByUid
(
pMeta
,
tbUid
);
assert
(
pSW
!=
NULL
);
EXPECT_EQ
(
pSW
->
number
,
2
);
EXPECT_STRCASEEQ
(
pSW
->
tSma
->
indexName
,
smaIndexName1
);
EXPECT_EQ
(
pSW
->
tSma
->
tableUid
,
tSma
.
tableUid
);
EXPECT_STRCASEEQ
((
pSW
->
tSma
+
1
)
->
indexName
,
smaIndexName2
);
EXPECT_EQ
((
pSW
->
tSma
+
1
)
->
tableUid
,
tSma
.
tableUid
);
// resource release
metaRemoveSmaFromDb
(
pMeta
,
smaIndexName1
);
metaRemoveSmaFromDb
(
pMeta
,
smaIndexName2
);
...
...
@@ -197,15 +206,15 @@ TEST(testCase, tSmaInsertTest) {
int32_t blockSize = tSma.numOfFuncIds * sizeof(int64_t);
int32_t numOfColIds = 3;
int32_t numOf
Sma
Blocks = 10;
int32_t numOfBlocks = 10;
int32_t dataLen = numOfColIds * numOf
Sma
Blocks * blockSize;
int32_t dataLen = numOfColIds * numOfBlocks * blockSize;
pSmaData = (STSmaData*)malloc(sizeof(STSmaData) + dataLen);
ASSERT_EQ(pSmaData != NULL, true);
pSmaData->tableUid = 3232329230;
pSmaData->numOfColIds = numOfColIds;
pSmaData->numOf
SmaBlocks = numOfSma
Blocks;
pSmaData->numOf
Blocks = numOf
Blocks;
pSmaData->dataLen = dataLen;
pSmaData->tsWindow.skey = 1640000000;
pSmaData->tsWindow.ekey = 1645788649;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录