Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
313887b7
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
313887b7
编写于
4月 19, 2020
作者:
H
hzcheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
change name style in tsdb
上级
c060d18f
变更
9
显示空白变更内容
内联
并排
Showing
9 changed file
with
93 addition
and
92 deletion
+93
-92
src/inc/tsdb.h
src/inc/tsdb.h
+42
-41
src/query/src/queryExecutor.c
src/query/src/queryExecutor.c
+3
-3
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+7
-7
src/tsdb/src/tsdbCache.c
src/tsdb/src/tsdbCache.c
+1
-1
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+19
-19
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+1
-1
src/tsdb/src/tsdbRWHelper.c
src/tsdb/src/tsdbRWHelper.c
+3
-3
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+15
-15
src/tsdb/tests/tsdbTests.cpp
src/tsdb/tests/tsdbTests.cpp
+2
-2
未找到文件。
src/inc/tsdb.h
浏览文件 @
313887b7
...
...
@@ -61,13 +61,13 @@ STsdbCfg *tsdbCreateDefaultCfg();
void
tsdbFreeCfg
(
STsdbCfg
*
pCfg
);
// --------- TSDB REPOSITORY DEFINITION
typedef
void
tsdb_repo_t
;
// use void to hide implementation details from outside
typedef
void
TsdbRepoT
;
// use void to hide implementation details from outside
int
tsdbCreateRepo
(
char
*
rootDir
,
STsdbCfg
*
pCfg
,
void
*
limiter
);
int32_t
tsdbDropRepo
(
tsdb_repo_t
*
repo
);
tsdb_repo_t
*
tsdbOpenRepo
(
char
*
tsdbDir
,
STsdbAppH
*
pAppH
);
int32_t
tsdbCloseRepo
(
tsdb_repo_t
*
repo
);
int32_t
tsdbConfigRepo
(
tsdb_repo_t
*
repo
,
STsdbCfg
*
pCfg
);
int32_t
tsdbDropRepo
(
TsdbRepoT
*
repo
);
TsdbRepoT
*
tsdbOpenRepo
(
char
*
tsdbDir
,
STsdbAppH
*
pAppH
);
int32_t
tsdbCloseRepo
(
TsdbRepoT
*
repo
);
int32_t
tsdbConfigRepo
(
TsdbRepoT
*
repo
,
STsdbCfg
*
pCfg
);
// --------- TSDB TABLE DEFINITION
typedef
struct
{
...
...
@@ -97,11 +97,11 @@ int tsdbTableSetName(STableCfg *config, char *name, bool dup);
int
tsdbTableSetSName
(
STableCfg
*
config
,
char
*
sname
,
bool
dup
);
void
tsdbClearTableCfg
(
STableCfg
*
config
);
int32_t
tsdbGetTableTagVal
(
tsdb_repo_t
*
repo
,
STableId
id
,
int32_t
col
,
int16_t
*
type
,
int16_t
*
bytes
,
char
**
val
);
int32_t
tsdbGetTableTagVal
(
TsdbRepoT
*
repo
,
STableId
id
,
int32_t
col
,
int16_t
*
type
,
int16_t
*
bytes
,
char
**
val
);
int
tsdbCreateTable
(
tsdb_repo_t
*
repo
,
STableCfg
*
pCfg
);
int
tsdbDropTable
(
tsdb_repo_t
*
pRepo
,
STableId
tableId
);
int
tsdbAlterTable
(
tsdb_repo_t
*
repo
,
STableCfg
*
pCfg
);
int
tsdbCreateTable
(
TsdbRepoT
*
repo
,
STableCfg
*
pCfg
);
int
tsdbDropTable
(
TsdbRepoT
*
pRepo
,
STableId
tableId
);
int
tsdbAlterTable
(
TsdbRepoT
*
repo
,
STableCfg
*
pCfg
);
// the TSDB repository info
typedef
struct
STsdbRepoInfo
{
...
...
@@ -111,7 +111,7 @@ typedef struct STsdbRepoInfo {
int64_t
tsdbTotalDiskSize
;
// the total disk size taken by this TSDB repository
// TODO: Other informations to add
}
STsdbRepoInfo
;
STsdbRepoInfo
*
tsdbGetStatus
(
tsdb_repo_t
*
pRepo
);
STsdbRepoInfo
*
tsdbGetStatus
(
TsdbRepoT
*
pRepo
);
// the meter information report structure
typedef
struct
{
...
...
@@ -120,7 +120,7 @@ typedef struct {
int64_t
tableTotalDataSize
;
// In bytes
int64_t
tableTotalDiskSize
;
// In bytes
}
STableInfo
;
STableInfo
*
tsdbGetTableInfo
(
tsdb_repo_t
*
pRepo
,
STableId
tid
);
STableInfo
*
tsdbGetTableInfo
(
TsdbRepoT
*
pRepo
,
STableId
tid
);
// -- FOR INSERT DATA
/**
...
...
@@ -130,11 +130,11 @@ STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tid);
*
* @return the number of points inserted, -1 for failure and the error number is set
*/
int32_t
tsdbInsertData
(
tsdb_repo_t
*
pRepo
,
SSubmitMsg
*
pMsg
);
int32_t
tsdbInsertData
(
TsdbRepoT
*
pRepo
,
SSubmitMsg
*
pMsg
);
// -- FOR QUERY TIME SERIES DATA
typedef
void
*
tsdb_query_handle_t
;
// Use void to hide implementation details
typedef
void
*
TsdbQueryHandleT
;
// Use void to hide implementation details
typedef
struct
STableGroupList
{
// qualified table object list in group
SArray
*
pGroupList
;
...
...
@@ -167,7 +167,7 @@ typedef struct SDataBlockInfo {
typedef
struct
{
size_t
numOfTables
;
SArray
*
pGroupList
;
SArray
*
pGroupList
;
}
STableGroupInfo
;
typedef
struct
{
...
...
@@ -181,7 +181,7 @@ typedef struct SQueryRowCond {
TSKEY
ts
;
}
SQueryRowCond
;
typedef
void
*
tsdbpos_t
;
typedef
void
*
TsdbPosT
;
/**
* Get the data block iterator, starting from position according to the query condition
...
...
@@ -189,14 +189,15 @@ typedef void *tsdbpos_t;
* @param pTableList table sid list
* @return
*/
tsdb_query_handle_t
*
tsdbQueryTables
(
tsdb_repo_t
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupInfo
,
SArray
*
pColumnInfo
);
TsdbQueryHandleT
*
tsdbQueryTables
(
TsdbRepoT
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupInfo
,
SArray
*
pColumnInfo
);
/**
* move to next block
* @param pQueryHandle
* @return
*/
bool
tsdbNextDataBlock
(
tsdb_query_handle_t
*
pQueryHandle
);
bool
tsdbNextDataBlock
(
TsdbQueryHandleT
*
pQueryHandle
);
/**
* Get current data block information
...
...
@@ -204,7 +205,7 @@ bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle);
* @param pQueryHandle
* @return
*/
SDataBlockInfo
tsdbRetrieveDataBlockInfo
(
tsdb_query_handle_t
*
pQueryHandle
);
SDataBlockInfo
tsdbRetrieveDataBlockInfo
(
TsdbQueryHandleT
*
pQueryHandle
);
/**
*
...
...
@@ -216,7 +217,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle);
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return
*/
int32_t
tsdbRetrieveDataBlockStatisInfo
(
tsdb_query_handle_t
*
pQueryHandle
,
SDataStatis
**
pBlockStatis
);
int32_t
tsdbRetrieveDataBlockStatisInfo
(
TsdbQueryHandleT
*
pQueryHandle
,
SDataStatis
**
pBlockStatis
);
/**
* The query condition with primary timestamp is passed to iterator during its constructor function,
...
...
@@ -226,7 +227,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SData
* @param pQueryHandle
* @return
*/
SArray
*
tsdbRetrieveDataBlock
(
tsdb_query_handle_t
*
pQueryHandle
,
SArray
*
pIdList
);
SArray
*
tsdbRetrieveDataBlock
(
TsdbQueryHandleT
*
pQueryHandle
,
SArray
*
pIdList
);
/**
* todo remove the parameter of position, and order type
...
...
@@ -238,21 +239,21 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList
* @param order ascending order or descending order
* @return
*/
int32_t
tsdbResetQuery
(
tsdb_query_handle_t
*
pQueryHandle
,
STimeWindow
*
window
,
tsdbpos_t
position
,
int16_t
order
);
int32_t
tsdbResetQuery
(
TsdbQueryHandleT
*
pQueryHandle
,
STimeWindow
*
window
,
TsdbPosT
position
,
int16_t
order
);
/**
* return the access position of current query handle
* @param pQueryHandle
* @return
*/
int32_t
tsdbDataBlockSeek
(
tsdb_query_handle_t
*
pQueryHandle
,
tsdbpos_t
pos
);
int32_t
tsdbDataBlockSeek
(
TsdbQueryHandleT
*
pQueryHandle
,
TsdbPosT
pos
);
/**
* todo remove this function later
* @param pQueryHandle
* @return
*/
tsdbpos_t
tsdbDataBlockTell
(
tsdb_query_handle_t
*
pQueryHandle
);
TsdbPosT
tsdbDataBlockTell
(
TsdbQueryHandleT
*
pQueryHandle
);
/**
* todo remove this function later
...
...
@@ -260,7 +261,7 @@ tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t *pQueryHandle);
* @param pIdList
* @return
*/
SArray
*
tsdbRetrieveDataRow
(
tsdb_query_handle_t
*
pQueryHandle
,
SArray
*
pIdList
,
SQueryRowCond
*
pCond
);
SArray
*
tsdbRetrieveDataRow
(
TsdbQueryHandleT
*
pQueryHandle
,
SArray
*
pIdList
,
SQueryRowCond
*
pCond
);
/**
* Get iterator for super tables, of which tags values satisfy the tag filter info
...
...
@@ -273,7 +274,7 @@ SArray *tsdbRetrieveDataRow(tsdb_query_handle_t *pQueryHandle, SArray *pIdList,
* @param pTagFilterStr tag filter info
* @return
*/
tsdb_query_handle_t
*
tsdbQueryFromTagConds
(
STsdbQueryCond
*
pCond
,
int16_t
stableId
,
const
char
*
pTagFilterStr
);
TsdbQueryHandleT
*
tsdbQueryFromTagConds
(
STsdbQueryCond
*
pCond
,
int16_t
stableId
,
const
char
*
pTagFilterStr
);
/**
* Get the qualified tables for (super) table query.
...
...
@@ -283,7 +284,7 @@ tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stable
* @param pQueryHandle
* @return table sid list. the invoker is responsible for the release of this the sid list.
*/
SArray
*
tsdbGetTableList
(
tsdb_query_handle_t
*
pQueryHandle
);
SArray
*
tsdbGetTableList
(
TsdbQueryHandleT
*
pQueryHandle
);
/**
* Get the qualified table id for a super table according to the tag query expression.
...
...
@@ -291,16 +292,16 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle);
* @param pTagCond. tag query condition
*
*/
int32_t
tsdbQueryTags
(
tsdb_repo_t
*
tsdb
,
int64_t
uid
,
const
char
*
pTagCond
,
size_t
len
,
STableGroupInfo
*
pGroupList
,
SColIndex
*
pColIndex
,
int32_t
numOfCols
);
int32_t
tsdbQueryTags
(
TsdbRepoT
*
tsdb
,
int64_t
uid
,
const
char
*
pTagCond
,
size_t
len
,
STableGroupInfo
*
pGroupList
,
SColIndex
*
pColIndex
,
int32_t
numOfCols
);
int32_t
tsdbGetOneTableGroup
(
tsdb_repo_t
*
tsdb
,
int64_t
uid
,
STableGroupInfo
*
pGroupInfo
);
int32_t
tsdbGetOneTableGroup
(
TsdbRepoT
*
tsdb
,
int64_t
uid
,
STableGroupInfo
*
pGroupInfo
);
/**
* clean up the query handle
* @param queryHandle
*/
void
tsdbCleanupQueryHandle
(
tsdb_query_handle_t
queryHandle
);
void
tsdbCleanupQueryHandle
(
TsdbQueryHandleT
queryHandle
);
#ifdef __cplusplus
}
...
...
src/query/src/queryExecutor.c
浏览文件 @
313887b7
...
...
@@ -2569,7 +2569,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
dTrace
(
"QInfo:%p query start, qrange:%"
PRId64
"-%"
PRId64
", lastkey:%"
PRId64
", order:%d"
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
lastKey
,
pQuery
->
order
.
order
);
tsdb_query_handle_t
pQueryHandle
=
pRuntimeEnv
->
scanFlag
==
MASTER_SCAN
?
pRuntimeEnv
->
pQueryHandle
:
pRuntimeEnv
->
pSecQueryHandle
;
TsdbQueryHandleT
pQueryHandle
=
pRuntimeEnv
->
scanFlag
==
MASTER_SCAN
?
pRuntimeEnv
->
pQueryHandle
:
pRuntimeEnv
->
pSecQueryHandle
;
while
(
tsdbNextDataBlock
(
pQueryHandle
))
{
if
(
isQueryKilled
(
GET_QINFO_ADDR
(
pRuntimeEnv
)))
{
...
...
@@ -3443,7 +3443,7 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) {
STimeWindow
w
=
{.
skey
=
pQuery
->
window
.
skey
,
.
ekey
=
pQuery
->
window
.
ekey
};
// reverse scan from current position
tsdbpos_t
current
=
tsdbDataBlockTell
(
pRuntimeEnv
->
pQueryHandle
);
TsdbPosT
current
=
tsdbDataBlockTell
(
pRuntimeEnv
->
pQueryHandle
);
tsdbResetQuery
(
pRuntimeEnv
->
pQueryHandle
,
&
w
,
current
,
pQuery
->
order
.
order
);
doScanAllDataBlocks
(
pRuntimeEnv
);
...
...
@@ -4329,7 +4329,7 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
int64_t
st
=
taosGetTimestampMs
();
tsdb_query_handle_t
*
pQueryHandle
=
pRuntimeEnv
->
pQueryHandle
;
TsdbQueryHandleT
*
pQueryHandle
=
pRuntimeEnv
->
pQueryHandle
;
while
(
tsdbNextDataBlock
(
pQueryHandle
))
{
if
(
isQueryKilled
(
pQInfo
))
{
break
;
...
...
src/tsdb/inc/tsdbMain.h
浏览文件 @
313887b7
...
...
@@ -120,7 +120,7 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable);
#define TSDB_TABLE_OF_ID(pHandle, id) ((pHandle)->pTables)[id]
#define TSDB_GET_TABLE_OF_NAME(pHandle, name)
/* TODO */
STsdbMeta
*
tsdbGetMeta
(
tsdb_repo_t
*
pRepo
);
STsdbMeta
*
tsdbGetMeta
(
TsdbRepoT
*
pRepo
);
int32_t
tsdbCreateTableImpl
(
STsdbMeta
*
pMeta
,
STableCfg
*
pCfg
);
int32_t
tsdbDropTableImpl
(
STsdbMeta
*
pMeta
,
STableId
tableId
);
...
...
@@ -160,10 +160,10 @@ typedef struct {
STsdbCacheBlock
*
curBlock
;
SCacheMem
*
mem
;
SCacheMem
*
imem
;
tsdb_repo_t
*
pRepo
;
TsdbRepoT
*
pRepo
;
}
STsdbCache
;
STsdbCache
*
tsdbInitCache
(
int
maxBytes
,
int
cacheBlockSize
,
tsdb_repo_t
*
pRepo
);
STsdbCache
*
tsdbInitCache
(
int
maxBytes
,
int
cacheBlockSize
,
TsdbRepoT
*
pRepo
);
void
tsdbFreeCache
(
STsdbCache
*
pCache
);
void
*
tsdbAllocFromCache
(
STsdbCache
*
pCache
,
int
bytes
,
TSKEY
key
);
...
...
@@ -310,7 +310,7 @@ typedef struct {
SCompCol
cols
[];
}
SCompData
;
STsdbFileH
*
tsdbGetFile
(
tsdb_repo_t
*
pRepo
);
STsdbFileH
*
tsdbGetFile
(
TsdbRepoT
*
pRepo
);
int
tsdbCopyBlockDataInFile
(
SFile
*
pOutFile
,
SFile
*
pInFile
,
SCompInfo
*
pCompInfo
,
int
idx
,
int
isLast
,
SDataCols
*
pCols
);
...
...
@@ -370,9 +370,9 @@ typedef struct {
int
tsdbInitSubmitMsgIter
(
SSubmitMsg
*
pMsg
,
SSubmitMsgIter
*
pIter
);
SSubmitBlk
*
tsdbGetSubmitMsgNext
(
SSubmitMsgIter
*
pIter
);
int32_t
tsdbTriggerCommit
(
tsdb_repo_t
*
repo
);
int32_t
tsdbLockRepo
(
tsdb_repo_t
*
repo
);
int32_t
tsdbUnLockRepo
(
tsdb_repo_t
*
repo
);
int32_t
tsdbTriggerCommit
(
TsdbRepoT
*
repo
);
int32_t
tsdbLockRepo
(
TsdbRepoT
*
repo
);
int32_t
tsdbUnLockRepo
(
TsdbRepoT
*
repo
);
typedef
enum
{
TSDB_WRITE_HELPER
,
TSDB_READ_HELPER
}
tsdb_rw_helper_t
;
...
...
src/tsdb/src/tsdbCache.c
浏览文件 @
313887b7
...
...
@@ -21,7 +21,7 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache);
static
void
tsdbFreeBlockList
(
SList
*
list
);
static
void
tsdbFreeCacheMem
(
SCacheMem
*
mem
);
STsdbCache
*
tsdbInitCache
(
int
maxBytes
,
int
cacheBlockSize
,
tsdb_repo_t
*
pRepo
)
{
STsdbCache
*
tsdbInitCache
(
int
maxBytes
,
int
cacheBlockSize
,
TsdbRepoT
*
pRepo
)
{
STsdbCache
*
pCache
=
(
STsdbCache
*
)
calloc
(
1
,
sizeof
(
STsdbCache
));
if
(
pCache
==
NULL
)
return
NULL
;
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
313887b7
...
...
@@ -54,7 +54,7 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static
int32_t
tsdbSetRepoEnv
(
STsdbRepo
*
pRepo
);
static
int32_t
tsdbDestroyRepoEnv
(
STsdbRepo
*
pRepo
);
// static int tsdbOpenMetaFile(char *tsdbDir);
static
int32_t
tsdbInsertDataToTable
(
tsdb_repo_t
*
repo
,
SSubmitBlk
*
pBlock
);
static
int32_t
tsdbInsertDataToTable
(
TsdbRepoT
*
repo
,
SSubmitBlk
*
pBlock
);
static
int32_t
tsdbRestoreCfg
(
STsdbRepo
*
pRepo
,
STsdbCfg
*
pCfg
);
static
int32_t
tsdbGetDataDirName
(
STsdbRepo
*
pRepo
,
char
*
fname
);
static
void
*
tsdbCommitData
(
void
*
arg
);
...
...
@@ -156,7 +156,7 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO */)
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t
tsdbDropRepo
(
tsdb_repo_t
*
repo
)
{
int32_t
tsdbDropRepo
(
TsdbRepoT
*
repo
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
pRepo
->
state
=
TSDB_REPO_STATE_CLOSED
;
...
...
@@ -182,7 +182,7 @@ int32_t tsdbDropRepo(tsdb_repo_t *repo) {
*
* @return a TSDB repository handle on success, NULL for failure and the error number is set
*/
tsdb_repo_t
*
tsdbOpenRepo
(
char
*
tsdbDir
,
STsdbAppH
*
pAppH
)
{
TsdbRepoT
*
tsdbOpenRepo
(
char
*
tsdbDir
,
STsdbAppH
*
pAppH
)
{
char
dataDir
[
128
]
=
"
\0
"
;
if
(
access
(
tsdbDir
,
F_OK
|
W_OK
|
R_OK
)
<
0
)
{
return
NULL
;
...
...
@@ -205,7 +205,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
return
NULL
;
}
pRepo
->
tsdbCache
=
tsdbInitCache
(
pRepo
->
config
.
maxCacheSize
,
-
1
,
(
tsdb_repo_t
*
)
pRepo
);
pRepo
->
tsdbCache
=
tsdbInitCache
(
pRepo
->
config
.
maxCacheSize
,
-
1
,
(
TsdbRepoT
*
)
pRepo
);
if
(
pRepo
->
tsdbCache
==
NULL
)
{
tsdbFreeMeta
(
pRepo
->
tsdbMeta
);
free
(
pRepo
->
rootDir
);
...
...
@@ -225,7 +225,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
pRepo
->
state
=
TSDB_REPO_STATE_ACTIVE
;
return
(
tsdb_repo_t
*
)
pRepo
;
return
(
TsdbRepoT
*
)
pRepo
;
}
// static int32_t tsdbFlushCache(STsdbRepo *pRepo) {
...
...
@@ -240,7 +240,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t
tsdbCloseRepo
(
tsdb_repo_t
*
repo
)
{
int32_t
tsdbCloseRepo
(
TsdbRepoT
*
repo
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
if
(
pRepo
==
NULL
)
return
0
;
...
...
@@ -285,7 +285,7 @@ int32_t tsdbCloseRepo(tsdb_repo_t *repo) {
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t
tsdbConfigRepo
(
tsdb_repo_t
*
repo
,
STsdbCfg
*
pCfg
)
{
int32_t
tsdbConfigRepo
(
TsdbRepoT
*
repo
,
STsdbCfg
*
pCfg
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
pRepo
->
config
=
*
pCfg
;
...
...
@@ -293,7 +293,7 @@ int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg) {
return
0
;
}
int32_t
tsdbTriggerCommit
(
tsdb_repo_t
*
repo
)
{
int32_t
tsdbTriggerCommit
(
TsdbRepoT
*
repo
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
tsdbLockRepo
(
repo
);
...
...
@@ -325,12 +325,12 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) {
return
0
;
}
int32_t
tsdbLockRepo
(
tsdb_repo_t
*
repo
)
{
int32_t
tsdbLockRepo
(
TsdbRepoT
*
repo
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
return
pthread_mutex_lock
(
&
(
pRepo
->
mutex
));
}
int32_t
tsdbUnLockRepo
(
tsdb_repo_t
*
repo
)
{
int32_t
tsdbUnLockRepo
(
TsdbRepoT
*
repo
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
return
pthread_mutex_unlock
(
&
(
pRepo
->
mutex
));
}
...
...
@@ -343,35 +343,35 @@ int32_t tsdbUnLockRepo(tsdb_repo_t *repo) {
* @return a info struct handle on success, NULL for failure and the error number is set. The upper
* layers should free the info handle themselves or memory leak will occur
*/
STsdbRepoInfo
*
tsdbGetStatus
(
tsdb_repo_t
*
pRepo
)
{
STsdbRepoInfo
*
tsdbGetStatus
(
TsdbRepoT
*
pRepo
)
{
// TODO
return
NULL
;
}
int
tsdbCreateTable
(
tsdb_repo_t
*
repo
,
STableCfg
*
pCfg
)
{
int
tsdbCreateTable
(
TsdbRepoT
*
repo
,
STableCfg
*
pCfg
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
return
tsdbCreateTableImpl
(
pRepo
->
tsdbMeta
,
pCfg
);
}
int
tsdbAlterTable
(
tsdb_repo_t
*
pRepo
,
STableCfg
*
pCfg
)
{
int
tsdbAlterTable
(
TsdbRepoT
*
pRepo
,
STableCfg
*
pCfg
)
{
// TODO
return
0
;
}
int
tsdbDropTable
(
tsdb_repo_t
*
repo
,
STableId
tableId
)
{
int
tsdbDropTable
(
TsdbRepoT
*
repo
,
STableId
tableId
)
{
if
(
repo
==
NULL
)
return
-
1
;
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
return
tsdbDropTableImpl
(
pRepo
->
tsdbMeta
,
tableId
);
}
STableInfo
*
tsdbGetTableInfo
(
tsdb_repo_t
*
pRepo
,
STableId
tableId
)
{
STableInfo
*
tsdbGetTableInfo
(
TsdbRepoT
*
pRepo
,
STableId
tableId
)
{
// TODO
return
NULL
;
}
// TODO: need to return the number of data inserted
int32_t
tsdbInsertData
(
tsdb_repo_t
*
repo
,
SSubmitMsg
*
pMsg
)
{
int32_t
tsdbInsertData
(
TsdbRepoT
*
repo
,
SSubmitMsg
*
pMsg
)
{
SSubmitMsgIter
msgIter
;
tsdbInitSubmitMsgIter
(
pMsg
,
&
msgIter
);
...
...
@@ -556,12 +556,12 @@ SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
return
pBlock
;
}
STsdbMeta
*
tsdbGetMeta
(
tsdb_repo_t
*
pRepo
)
{
STsdbMeta
*
tsdbGetMeta
(
TsdbRepoT
*
pRepo
)
{
STsdbRepo
*
tsdb
=
(
STsdbRepo
*
)
pRepo
;
return
tsdb
->
tsdbMeta
;
}
STsdbFileH
*
tsdbGetFile
(
tsdb_repo_t
*
pRepo
)
{
STsdbFileH
*
tsdbGetFile
(
TsdbRepoT
*
pRepo
)
{
STsdbRepo
*
tsdb
=
(
STsdbRepo
*
)
pRepo
;
return
tsdb
->
tsdbFileH
;
}
...
...
@@ -772,7 +772,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
return
0
;
}
static
int32_t
tsdbInsertDataToTable
(
tsdb_repo_t
*
repo
,
SSubmitBlk
*
pBlock
)
{
static
int32_t
tsdbInsertDataToTable
(
TsdbRepoT
*
repo
,
SSubmitBlk
*
pBlock
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
STableId
tableId
=
{.
uid
=
pBlock
->
uid
,
.
tid
=
pBlock
->
tid
};
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
313887b7
...
...
@@ -225,7 +225,7 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
}
}
int32_t
tsdbGetTableTagVal
(
tsdb_repo_t
*
repo
,
STableId
id
,
int32_t
colId
,
int16_t
*
type
,
int16_t
*
bytes
,
char
**
val
)
{
int32_t
tsdbGetTableTagVal
(
TsdbRepoT
*
repo
,
STableId
id
,
int32_t
colId
,
int16_t
*
type
,
int16_t
*
bytes
,
char
**
val
)
{
STsdbMeta
*
pMeta
=
tsdbGetMeta
(
repo
);
STable
*
pTable
=
tsdbGetTableByUid
(
pMeta
,
id
.
uid
);
...
...
src/tsdb/src/tsdbRWHelper.c
浏览文件 @
313887b7
...
...
@@ -765,9 +765,9 @@ static int compareKeyBlock(const void *arg1, const void *arg2) {
return
0
;
}
static
FORCE_INLINE
int
compKeyFunc
(
const
void
*
arg1
,
const
void
*
arg2
)
{
return
((
*
(
TSKEY
*
)
arg1
)
-
(
*
(
TSKEY
*
)
arg2
));
}
//
static FORCE_INLINE int compKeyFunc(const void *arg1, const void *arg2) {
//
return ((*(TSKEY *)arg1) - (*(TSKEY *)arg2));
//
}
// Merge the data with a block in file
static
int
tsdbMergeDataWithBlock
(
SRWHelper
*
pHelper
,
int
blkIdx
,
SDataCols
*
pDataCols
)
{
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
313887b7
...
...
@@ -135,7 +135,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
pCompBlockLoadInfo
->
fileListIndex
=
-
1
;
}
tsdb_query_handle_t
*
tsdbQueryTables
(
tsdb_repo_t
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
SArray
*
pColumnInfo
)
{
TsdbQueryHandleT
*
tsdbQueryTables
(
TsdbRepoT
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
SArray
*
pColumnInfo
)
{
// todo 1. filter not exist table
// todo 2. add the reference count for each table that is involved in query
...
...
@@ -199,7 +199,7 @@ tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, S
tsdbInitDataBlockLoadInfo
(
&
pQueryHandle
->
dataBlockLoadInfo
);
tsdbInitCompBlockLoadInfo
(
&
pQueryHandle
->
compBlockLoadInfo
);
return
(
tsdb_query_handle_t
)
pQueryHandle
;
return
(
TsdbQueryHandleT
)
pQueryHandle
;
}
static
bool
hasMoreDataInCache
(
STsdbQueryHandle
*
pHandle
)
{
...
...
@@ -914,7 +914,7 @@ static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
}
// handle data in cache situation
bool
tsdbNextDataBlock
(
tsdb_query_handle_t
*
pqHandle
)
{
bool
tsdbNextDataBlock
(
TsdbQueryHandleT
*
pqHandle
)
{
STsdbQueryHandle
*
pQueryHandle
=
(
STsdbQueryHandle
*
)
pqHandle
;
size_t
numOfTables
=
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
);
...
...
@@ -1014,7 +1014,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max
}
// copy data from cache into data block
SDataBlockInfo
tsdbRetrieveDataBlockInfo
(
tsdb_query_handle_t
*
pQueryHandle
)
{
SDataBlockInfo
tsdbRetrieveDataBlockInfo
(
TsdbQueryHandleT
*
pQueryHandle
)
{
STsdbQueryHandle
*
pHandle
=
(
STsdbQueryHandle
*
)
pQueryHandle
;
STable
*
pTable
=
NULL
;
...
...
@@ -1072,12 +1072,12 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t* pQueryHandle) {
}
// return null for data block in cache
int32_t
tsdbRetrieveDataBlockStatisInfo
(
tsdb_query_handle_t
*
pQueryHandle
,
SDataStatis
**
pBlockStatis
)
{
int32_t
tsdbRetrieveDataBlockStatisInfo
(
TsdbQueryHandleT
*
pQueryHandle
,
SDataStatis
**
pBlockStatis
)
{
*
pBlockStatis
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
SArray
*
tsdbRetrieveDataBlock
(
tsdb_query_handle_t
*
pQueryHandle
,
SArray
*
pIdList
)
{
SArray
*
tsdbRetrieveDataBlock
(
TsdbQueryHandleT
*
pQueryHandle
,
SArray
*
pIdList
)
{
/**
* In the following two cases, the data has been loaded to SColumnInfoData.
* 1. data is from cache, 2. data block is not completed qualified to query time range
...
...
@@ -1114,21 +1114,21 @@ SArray* tsdbRetrieveDataBlock(tsdb_query_handle_t* pQueryHandle, SArray* pIdList
}
}
int32_t
tsdbResetQuery
(
tsdb_query_handle_t
*
pQueryHandle
,
STimeWindow
*
window
,
tsdbpos_t
position
,
int16_t
order
)
{
int32_t
tsdbResetQuery
(
TsdbQueryHandleT
*
pQueryHandle
,
STimeWindow
*
window
,
TsdbPosT
position
,
int16_t
order
)
{
return
0
;
}
int32_t
tsdbDataBlockSeek
(
tsdb_query_handle_t
*
pQueryHandle
,
tsdbpos_t
pos
)
{
return
0
;
}
int32_t
tsdbDataBlockSeek
(
TsdbQueryHandleT
*
pQueryHandle
,
TsdbPosT
pos
)
{
return
0
;
}
tsdbpos_t
tsdbDataBlockTell
(
tsdb_query_handle_t
*
pQueryHandle
)
{
return
NULL
;
}
TsdbPosT
tsdbDataBlockTell
(
TsdbQueryHandleT
*
pQueryHandle
)
{
return
NULL
;
}
SArray
*
tsdbRetrieveDataRow
(
tsdb_query_handle_t
*
pQueryHandle
,
SArray
*
pIdList
,
SQueryRowCond
*
pCond
)
{
return
NULL
;
}
SArray
*
tsdbRetrieveDataRow
(
TsdbQueryHandleT
*
pQueryHandle
,
SArray
*
pIdList
,
SQueryRowCond
*
pCond
)
{
return
NULL
;
}
tsdb_query_handle_t
*
tsdbQueryFromTagConds
(
STsdbQueryCond
*
pCond
,
int16_t
stableId
,
const
char
*
pTagFilterStr
)
{
TsdbQueryHandleT
*
tsdbQueryFromTagConds
(
STsdbQueryCond
*
pCond
,
int16_t
stableId
,
const
char
*
pTagFilterStr
)
{
return
NULL
;
}
SArray
*
tsdbGetTableList
(
tsdb_query_handle_t
*
pQueryHandle
)
{
return
NULL
;
}
SArray
*
tsdbGetTableList
(
TsdbQueryHandleT
*
pQueryHandle
)
{
return
NULL
;
}
static
int32_t
getAllTableIdList
(
STsdbRepo
*
tsdb
,
int64_t
uid
,
SArray
*
list
)
{
STable
*
pTable
=
tsdbGetTableByUid
(
tsdbGetMeta
(
tsdb
),
uid
);
...
...
@@ -1439,7 +1439,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbQueryTags
(
tsdb_repo_t
*
tsdb
,
int64_t
uid
,
const
char
*
pTagCond
,
size_t
len
,
STableGroupInfo
*
pGroupInfo
,
int32_t
tsdbQueryTags
(
TsdbRepoT
*
tsdb
,
int64_t
uid
,
const
char
*
pTagCond
,
size_t
len
,
STableGroupInfo
*
pGroupInfo
,
SColIndex
*
pColIndex
,
int32_t
numOfCols
)
{
STable
*
pSTable
=
tsdbGetTableByUid
(
tsdbGetMeta
(
tsdb
),
uid
);
...
...
@@ -1481,7 +1481,7 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size
return
ret
;
}
int32_t
tsdbGetOneTableGroup
(
tsdb_repo_t
*
tsdb
,
int64_t
uid
,
STableGroupInfo
*
pGroupInfo
)
{
int32_t
tsdbGetOneTableGroup
(
TsdbRepoT
*
tsdb
,
int64_t
uid
,
STableGroupInfo
*
pGroupInfo
)
{
STable
*
pTable
=
tsdbGetTableByUid
(
tsdbGetMeta
(
tsdb
),
uid
);
if
(
pTable
==
NULL
)
{
return
TSDB_CODE_INVALID_TABLE_ID
;
...
...
@@ -1498,7 +1498,7 @@ int32_t tsdbGetOneTableGroup(tsdb_repo_t* tsdb, int64_t uid, STableGroupInfo* pG
return
TSDB_CODE_SUCCESS
;
}
void
tsdbCleanupQueryHandle
(
tsdb_query_handle_t
queryHandle
)
{
void
tsdbCleanupQueryHandle
(
TsdbQueryHandleT
queryHandle
)
{
STsdbQueryHandle
*
pQueryHandle
=
(
STsdbQueryHandle
*
)
queryHandle
;
if
(
pQueryHandle
==
NULL
)
{
return
;
...
...
src/tsdb/tests/tsdbTests.cpp
浏览文件 @
313887b7
...
...
@@ -12,7 +12,7 @@ static double getCurTime() {
}
typedef
struct
{
tsdb_repo_t
*
pRepo
;
TsdbRepoT
*
pRepo
;
int
tid
;
int64_t
uid
;
int
sversion
;
...
...
@@ -129,7 +129,7 @@ TEST(TsdbTest, createRepo) {
tsdbSetDefaultCfg
(
&
config
);
ASSERT_EQ
(
tsdbCreateRepo
(
"/home/ubuntu/work/ttest/vnode0"
,
&
config
,
NULL
),
0
);
tsdb_repo_t
*
pRepo
=
tsdbOpenRepo
(
"/home/ubuntu/work/ttest/vnode0"
,
NULL
);
TsdbRepoT
*
pRepo
=
tsdbOpenRepo
(
"/home/ubuntu/work/ttest/vnode0"
,
NULL
);
ASSERT_NE
(
pRepo
,
nullptr
);
// 2. Create a normal table
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录