Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
53cc3945
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1185
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
53cc3945
编写于
6月 11, 2020
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-353
上级
e6985f8e
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
68 addition
and
68 deletion
+68
-68
src/inc/tsdb.h
src/inc/tsdb.h
+23
-23
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+7
-7
src/tsdb/src/tsdbCache.c
src/tsdb/src/tsdbCache.c
+4
-4
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+22
-22
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+4
-4
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+6
-6
src/tsdb/tests/tsdbTests.cpp
src/tsdb/tests/tsdbTests.cpp
+2
-2
未找到文件。
src/inc/tsdb.h
浏览文件 @
53cc3945
...
...
@@ -72,19 +72,19 @@ typedef struct {
int64_t
pointsWritten
;
// total data points written
}
STsdbStat
;
typedef
void
T
sdbRepo
T
;
// use void to hide implementation details from outside
typedef
void
T
SDB_REPO_
T
;
// use void to hide implementation details from outside
void
tsdbSetDefaultCfg
(
STsdbCfg
*
pCfg
);
STsdbCfg
*
tsdbCreateDefaultCfg
();
void
tsdbFreeCfg
(
STsdbCfg
*
pCfg
);
STsdbCfg
*
tsdbGetCfg
(
const
T
sdbRepo
T
*
repo
);
STsdbCfg
*
tsdbGetCfg
(
const
T
SDB_REPO_
T
*
repo
);
// --------- TSDB REPOSITORY DEFINITION
int
tsdbCreateRepo
(
char
*
rootDir
,
STsdbCfg
*
pCfg
,
void
*
limiter
);
int32_t
tsdbDropRepo
(
T
sdbRepo
T
*
repo
);
T
sdbRepo
T
*
tsdbOpenRepo
(
char
*
rootDir
,
STsdbAppH
*
pAppH
);
int32_t
tsdbCloseRepo
(
T
sdbRepo
T
*
repo
,
int
toCommit
);
int32_t
tsdbConfigRepo
(
T
sdbRepo
T
*
repo
,
STsdbCfg
*
pCfg
);
int32_t
tsdbDropRepo
(
T
SDB_REPO_
T
*
repo
);
T
SDB_REPO_
T
*
tsdbOpenRepo
(
char
*
rootDir
,
STsdbAppH
*
pAppH
);
int32_t
tsdbCloseRepo
(
T
SDB_REPO_
T
*
repo
,
int
toCommit
);
int32_t
tsdbConfigRepo
(
T
SDB_REPO_
T
*
repo
,
STsdbCfg
*
pCfg
);
// --------- TSDB TABLE DEFINITION
typedef
struct
{
...
...
@@ -116,18 +116,18 @@ int tsdbTableSetSName(STableCfg *config, char *sname, bool dup);
int
tsdbTableSetStreamSql
(
STableCfg
*
config
,
char
*
sql
,
bool
dup
);
void
tsdbClearTableCfg
(
STableCfg
*
config
);
void
*
tsdbGetTableTagVal
(
T
sdbRepo
T
*
repo
,
const
STableId
*
id
,
int32_t
colId
,
int16_t
type
,
int16_t
bytes
);
char
*
tsdbGetTableName
(
T
sdbRepo
T
*
repo
,
const
STableId
*
id
);
void
*
tsdbGetTableTagVal
(
T
SDB_REPO_
T
*
repo
,
const
STableId
*
id
,
int32_t
colId
,
int16_t
type
,
int16_t
bytes
);
char
*
tsdbGetTableName
(
T
SDB_REPO_
T
*
repo
,
const
STableId
*
id
);
STableCfg
*
tsdbCreateTableCfgFromMsg
(
SMDCreateTableMsg
*
pMsg
);
int
tsdbCreateTable
(
T
sdbRepo
T
*
repo
,
STableCfg
*
pCfg
);
int
tsdbDropTable
(
T
sdbRepo
T
*
pRepo
,
STableId
tableId
);
int
tsdbAlterTable
(
T
sdbRepo
T
*
repo
,
STableCfg
*
pCfg
);
int
tsdbUpdateTagValue
(
T
sdbRepo
T
*
repo
,
SUpdateTableTagValMsg
*
pMsg
);
TSKEY
tsdbGetTableLastKey
(
T
sdbRepo
T
*
repo
,
uint64_t
uid
);
void
tsdbStartStream
(
T
sdbRepo
T
*
repo
);
int
tsdbCreateTable
(
T
SDB_REPO_
T
*
repo
,
STableCfg
*
pCfg
);
int
tsdbDropTable
(
T
SDB_REPO_
T
*
pRepo
,
STableId
tableId
);
int
tsdbAlterTable
(
T
SDB_REPO_
T
*
repo
,
STableCfg
*
pCfg
);
int
tsdbUpdateTagValue
(
T
SDB_REPO_
T
*
repo
,
SUpdateTableTagValMsg
*
pMsg
);
TSKEY
tsdbGetTableLastKey
(
T
SDB_REPO_
T
*
repo
,
uint64_t
uid
);
void
tsdbStartStream
(
T
SDB_REPO_
T
*
repo
);
uint32_t
tsdbGetFileInfo
(
T
sdbRepo
T
*
repo
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int32_t
*
size
);
uint32_t
tsdbGetFileInfo
(
T
SDB_REPO_
T
*
repo
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int32_t
*
size
);
// the TSDB repository info
typedef
struct
STsdbRepoInfo
{
...
...
@@ -137,7 +137,7 @@ typedef struct STsdbRepoInfo {
int64_t
tsdbTotalDiskSize
;
// the total disk size taken by this TSDB repository
// TODO: Other informations to add
}
STsdbRepoInfo
;
STsdbRepoInfo
*
tsdbGetStatus
(
T
sdbRepo
T
*
pRepo
);
STsdbRepoInfo
*
tsdbGetStatus
(
T
SDB_REPO_
T
*
pRepo
);
// the meter information report structure
typedef
struct
{
...
...
@@ -146,7 +146,7 @@ typedef struct {
int64_t
tableTotalDataSize
;
// In bytes
int64_t
tableTotalDiskSize
;
// In bytes
}
STableInfo
;
STableInfo
*
tsdbGetTableInfo
(
T
sdbRepo
T
*
pRepo
,
STableId
tid
);
STableInfo
*
tsdbGetTableInfo
(
T
SDB_REPO_
T
*
pRepo
,
STableId
tid
);
// -- FOR INSERT DATA
/**
...
...
@@ -156,7 +156,7 @@ STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tid);
*
* @return the number of points inserted, -1 for failure and the error number is set
*/
int32_t
tsdbInsertData
(
T
sdbRepo
T
*
repo
,
SSubmitMsg
*
pMsg
,
SShellSubmitRspMsg
*
pRsp
)
;
int32_t
tsdbInsertData
(
T
SDB_REPO_
T
*
repo
,
SSubmitMsg
*
pMsg
,
SShellSubmitRspMsg
*
pRsp
)
;
// -- FOR QUERY TIME SERIES DATA
...
...
@@ -199,7 +199,7 @@ typedef void *TsdbPosT;
* @param qinfo query info handle from query processor
* @return
*/
TsdbQueryHandleT
*
tsdbQueryTables
(
T
sdbRepo
T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupInfo
,
void
*
qinfo
);
TsdbQueryHandleT
*
tsdbQueryTables
(
T
SDB_REPO_
T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupInfo
,
void
*
qinfo
);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
...
...
@@ -211,11 +211,11 @@ TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable
* @param groupInfo tableId list.
* @return
*/
TsdbQueryHandleT
tsdbQueryLastRow
(
T
sdbRepo
T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupInfo
,
void
*
qinfo
);
TsdbQueryHandleT
tsdbQueryLastRow
(
T
SDB_REPO_
T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupInfo
,
void
*
qinfo
);
SArray
*
tsdbGetQueriedTableIdList
(
TsdbQueryHandleT
*
pHandle
);
TsdbQueryHandleT
tsdbQueryRowsInExternalWindow
(
T
sdbRepo
T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
void
*
qinfo
);
TsdbQueryHandleT
tsdbQueryRowsInExternalWindow
(
T
SDB_REPO_
T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
void
*
qinfo
);
/**
* move to next block if exists
...
...
@@ -293,7 +293,7 @@ SArray *tsdbGetTableList(TsdbQueryHandleT *pQueryHandle);
* @param stableid. super table sid
* @param pTagCond. tag query condition
*/
int32_t
tsdbQuerySTableByTagCond
(
T
sdbRepo
T
*
tsdb
,
uint64_t
uid
,
const
char
*
pTagCond
,
size_t
len
,
int32_t
tsdbQuerySTableByTagCond
(
T
SDB_REPO_
T
*
tsdb
,
uint64_t
uid
,
const
char
*
pTagCond
,
size_t
len
,
int16_t
tagNameRelType
,
const
char
*
tbnameCond
,
STableGroupInfo
*
pGroupList
,
SColIndex
*
pColIndex
,
int32_t
numOfCols
);
...
...
@@ -305,7 +305,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, uint64_t uid, const char *pTag
* @param pGroupInfo the generated result
* @return
*/
int32_t
tsdbGetOneTableGroup
(
T
sdbRepo
T
*
tsdb
,
uint64_t
uid
,
STableGroupInfo
*
pGroupInfo
);
int32_t
tsdbGetOneTableGroup
(
T
SDB_REPO_
T
*
tsdb
,
uint64_t
uid
,
STableGroupInfo
*
pGroupInfo
);
/**
* clean up the query handle
...
...
src/tsdb/inc/tsdbMain.h
浏览文件 @
53cc3945
...
...
@@ -142,7 +142,7 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable);
#define TSDB_TABLE_OF_ID(pHandle, id) ((pHandle)->pTables)[id]
#define TSDB_GET_TABLE_OF_NAME(pHandle, name)
/* TODO */
STsdbMeta
*
tsdbGetMeta
(
T
sdbRepo
T
*
pRepo
);
STsdbMeta
*
tsdbGetMeta
(
T
SDB_REPO_
T
*
pRepo
);
STable
*
tsdbIsValidTableToInsert
(
STsdbMeta
*
pMeta
,
STableId
tableId
);
// int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable);
...
...
@@ -177,10 +177,10 @@ typedef struct {
STsdbCacheBlock
*
curBlock
;
SCacheMem
*
mem
;
SCacheMem
*
imem
;
T
sdbRepo
T
*
pRepo
;
T
SDB_REPO_
T
*
pRepo
;
}
STsdbCache
;
STsdbCache
*
tsdbInitCache
(
int
cacheBlockSize
,
int
totalBlocks
,
T
sdbRepo
T
*
pRepo
);
STsdbCache
*
tsdbInitCache
(
int
cacheBlockSize
,
int
totalBlocks
,
T
SDB_REPO_
T
*
pRepo
);
void
tsdbFreeCache
(
STsdbCache
*
pCache
);
void
*
tsdbAllocFromCache
(
STsdbCache
*
pCache
,
int
bytes
,
TSKEY
key
);
...
...
@@ -342,7 +342,7 @@ typedef struct {
SCompCol
cols
[];
}
SCompData
;
STsdbFileH
*
tsdbGetFile
(
T
sdbRepo
T
*
pRepo
);
STsdbFileH
*
tsdbGetFile
(
T
SDB_REPO_
T
*
pRepo
);
int
tsdbCopyBlockDataInFile
(
SFile
*
pOutFile
,
SFile
*
pInFile
,
SCompInfo
*
pCompInfo
,
int
idx
,
int
isLast
,
SDataCols
*
pCols
);
...
...
@@ -404,9 +404,9 @@ typedef struct {
int
tsdbInitSubmitMsgIter
(
SSubmitMsg
*
pMsg
,
SSubmitMsgIter
*
pIter
);
SSubmitBlk
*
tsdbGetSubmitMsgNext
(
SSubmitMsgIter
*
pIter
);
int32_t
tsdbTriggerCommit
(
T
sdbRepo
T
*
repo
);
int32_t
tsdbLockRepo
(
T
sdbRepo
T
*
repo
);
int32_t
tsdbUnLockRepo
(
T
sdbRepo
T
*
repo
);
int32_t
tsdbTriggerCommit
(
T
SDB_REPO_
T
*
repo
);
int32_t
tsdbLockRepo
(
T
SDB_REPO_
T
*
repo
);
int32_t
tsdbUnLockRepo
(
T
SDB_REPO_
T
*
repo
);
typedef
enum
{
TSDB_WRITE_HELPER
,
TSDB_READ_HELPER
}
tsdb_rw_helper_t
;
...
...
src/tsdb/src/tsdbCache.c
浏览文件 @
53cc3945
...
...
@@ -22,7 +22,7 @@ static void tsdbFreeBlockList(SList *list);
static
void
tsdbFreeCacheMem
(
SCacheMem
*
mem
);
static
int
tsdbAddCacheBlockToPool
(
STsdbCache
*
pCache
);
STsdbCache
*
tsdbInitCache
(
int
cacheBlockSize
,
int
totalBlocks
,
T
sdbRepo
T
*
pRepo
)
{
STsdbCache
*
tsdbInitCache
(
int
cacheBlockSize
,
int
totalBlocks
,
T
SDB_REPO_
T
*
pRepo
)
{
STsdbCache
*
pCache
=
(
STsdbCache
*
)
calloc
(
1
,
sizeof
(
STsdbCache
));
if
(
pCache
==
NULL
)
return
NULL
;
...
...
@@ -143,7 +143,7 @@ int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
STsdbCache
*
pCache
=
pRepo
->
tsdbCache
;
int
oldNumOfBlocks
=
pCache
->
totalCacheBlocks
;
tsdbLockRepo
((
T
sdbRepo
T
*
)
pRepo
);
tsdbLockRepo
((
T
SDB_REPO_
T
*
)
pRepo
);
ASSERT
(
pCache
->
totalCacheBlocks
!=
totalBlocks
);
...
...
@@ -153,7 +153,7 @@ int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
pCache
->
totalCacheBlocks
=
totalBlocks
;
for
(
int
i
=
0
;
i
<
blocksToAdd
;
i
++
)
{
if
(
tsdbAddCacheBlockToPool
(
pCache
)
<
0
)
{
tsdbUnLockRepo
((
T
sdbRepo
T
*
)
pRepo
);
tsdbUnLockRepo
((
T
SDB_REPO_
T
*
)
pRepo
);
tsdbError
(
"tsdbId:%d, failed to add cache block to cache pool"
,
pRepo
->
config
.
tsdbId
);
return
-
1
;
}
...
...
@@ -164,7 +164,7 @@ int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
}
pRepo
->
config
.
totalBlocks
=
totalBlocks
;
tsdbUnLockRepo
((
T
sdbRepo
T
*
)
pRepo
);
tsdbUnLockRepo
((
T
SDB_REPO_
T
*
)
pRepo
);
tsdbTrace
(
"vgId:%d, tsdb total cache blocks changed from %d to %d"
,
pRepo
->
config
.
tsdbId
,
oldNumOfBlocks
,
totalBlocks
);
return
0
;
}
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
53cc3945
...
...
@@ -26,7 +26,7 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static
int32_t
tsdbSetRepoEnv
(
STsdbRepo
*
pRepo
);
static
int32_t
tsdbDestroyRepoEnv
(
STsdbRepo
*
pRepo
);
// static int tsdbOpenMetaFile(char *tsdbDir);
static
int32_t
tsdbInsertDataToTable
(
T
sdbRepo
T
*
repo
,
SSubmitBlk
*
pBlock
,
TSKEY
now
,
int
*
affectedrows
);
static
int32_t
tsdbInsertDataToTable
(
T
SDB_REPO_
T
*
repo
,
SSubmitBlk
*
pBlock
,
TSKEY
now
,
int
*
affectedrows
);
static
int32_t
tsdbRestoreCfg
(
STsdbRepo
*
pRepo
,
STsdbCfg
*
pCfg
);
static
int32_t
tsdbGetDataDirName
(
STsdbRepo
*
pRepo
,
char
*
fname
);
static
void
*
tsdbCommitData
(
void
*
arg
);
...
...
@@ -78,7 +78,7 @@ void tsdbFreeCfg(STsdbCfg *pCfg) {
if
(
pCfg
!=
NULL
)
free
(
pCfg
);
}
STsdbCfg
*
tsdbGetCfg
(
const
T
sdbRepo
T
*
repo
)
{
STsdbCfg
*
tsdbGetCfg
(
const
T
SDB_REPO_
T
*
repo
)
{
assert
(
repo
!=
NULL
);
return
&
((
STsdbRepo
*
)
repo
)
->
config
;
}
...
...
@@ -134,7 +134,7 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO */)
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t
tsdbDropRepo
(
T
sdbRepo
T
*
repo
)
{
int32_t
tsdbDropRepo
(
T
SDB_REPO_
T
*
repo
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
int
id
=
pRepo
->
config
.
tsdbId
;
...
...
@@ -192,7 +192,7 @@ _err:
*
* @return a TSDB repository handle on success, NULL for failure and the error number is set
*/
T
sdbRepo
T
*
tsdbOpenRepo
(
char
*
rootDir
,
STsdbAppH
*
pAppH
)
{
T
SDB_REPO_
T
*
tsdbOpenRepo
(
char
*
rootDir
,
STsdbAppH
*
pAppH
)
{
char
dataDir
[
128
]
=
"
\0
"
;
if
(
access
(
rootDir
,
F_OK
|
W_OK
|
R_OK
)
<
0
)
{
return
NULL
;
...
...
@@ -215,7 +215,7 @@ TsdbRepoT *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) {
return
NULL
;
}
pRepo
->
tsdbCache
=
tsdbInitCache
(
pRepo
->
config
.
cacheBlockSize
,
pRepo
->
config
.
totalBlocks
,
(
T
sdbRepo
T
*
)
pRepo
);
pRepo
->
tsdbCache
=
tsdbInitCache
(
pRepo
->
config
.
cacheBlockSize
,
pRepo
->
config
.
totalBlocks
,
(
T
SDB_REPO_
T
*
)
pRepo
);
if
(
pRepo
->
tsdbCache
==
NULL
)
{
tsdbFreeMeta
(
pRepo
->
tsdbMeta
);
free
(
pRepo
->
rootDir
);
...
...
@@ -246,7 +246,7 @@ TsdbRepoT *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) {
pRepo
->
state
=
TSDB_REPO_STATE_ACTIVE
;
tsdbTrace
(
"vgId:%d, open tsdb repository successfully!"
,
pRepo
->
config
.
tsdbId
);
return
(
T
sdbRepo
T
*
)
pRepo
;
return
(
T
SDB_REPO_
T
*
)
pRepo
;
}
// static int32_t tsdbFlushCache(STsdbRepo *pRepo) {
...
...
@@ -261,7 +261,7 @@ TsdbRepoT *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) {
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t
tsdbCloseRepo
(
T
sdbRepo
T
*
repo
,
int
toCommit
)
{
int32_t
tsdbCloseRepo
(
T
SDB_REPO_
T
*
repo
,
int
toCommit
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
if
(
pRepo
==
NULL
)
return
0
;
int
id
=
pRepo
->
config
.
tsdbId
;
...
...
@@ -310,7 +310,7 @@ int32_t tsdbCloseRepo(TsdbRepoT *repo, int toCommit) {
*
* @return 0 for success, -1 for failure and the error number is set
*/
int32_t
tsdbConfigRepo
(
T
sdbRepo
T
*
repo
,
STsdbCfg
*
pCfg
)
{
int32_t
tsdbConfigRepo
(
T
SDB_REPO_
T
*
repo
,
STsdbCfg
*
pCfg
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
STsdbCfg
*
pRCfg
=
&
pRepo
->
config
;
...
...
@@ -346,7 +346,7 @@ int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbTriggerCommit
(
T
sdbRepo
T
*
repo
)
{
int32_t
tsdbTriggerCommit
(
T
SDB_REPO_
T
*
repo
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
if
(
pRepo
->
appH
.
notifyStatus
)
pRepo
->
appH
.
notifyStatus
(
pRepo
->
appH
.
appH
,
TSDB_STATUS_COMMIT_START
);
...
...
@@ -381,12 +381,12 @@ int32_t tsdbTriggerCommit(TsdbRepoT *repo) {
return
0
;
}
int32_t
tsdbLockRepo
(
T
sdbRepo
T
*
repo
)
{
int32_t
tsdbLockRepo
(
T
SDB_REPO_
T
*
repo
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
return
pthread_mutex_lock
(
&
(
pRepo
->
mutex
));
}
int32_t
tsdbUnLockRepo
(
T
sdbRepo
T
*
repo
)
{
int32_t
tsdbUnLockRepo
(
T
SDB_REPO_
T
*
repo
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
return
pthread_mutex_unlock
(
&
(
pRepo
->
mutex
));
}
...
...
@@ -399,17 +399,17 @@ int32_t tsdbUnLockRepo(TsdbRepoT *repo) {
* @return a info struct handle on success, NULL for failure and the error number is set. The upper
* layers should free the info handle themselves or memory leak will occur
*/
STsdbRepoInfo
*
tsdbGetStatus
(
T
sdbRepo
T
*
pRepo
)
{
STsdbRepoInfo
*
tsdbGetStatus
(
T
SDB_REPO_
T
*
pRepo
)
{
// TODO
return
NULL
;
}
int
tsdbAlterTable
(
T
sdbRepo
T
*
pRepo
,
STableCfg
*
pCfg
)
{
int
tsdbAlterTable
(
T
SDB_REPO_
T
*
pRepo
,
STableCfg
*
pCfg
)
{
// TODO
return
0
;
}
int
tsdbUpdateTagValue
(
T
sdbRepo
T
*
repo
,
SUpdateTableTagValMsg
*
pMsg
)
{
int
tsdbUpdateTagValue
(
T
SDB_REPO_
T
*
repo
,
SUpdateTableTagValMsg
*
pMsg
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
int16_t
tversion
=
htons
(
pMsg
->
tversion
);
...
...
@@ -464,7 +464,7 @@ int tsdbUpdateTagValue(TsdbRepoT *repo, SUpdateTableTagValMsg *pMsg) {
return
TSDB_CODE_SUCCESS
;
}
TSKEY
tsdbGetTableLastKey
(
T
sdbRepo
T
*
repo
,
uint64_t
uid
)
{
TSKEY
tsdbGetTableLastKey
(
T
SDB_REPO_
T
*
repo
,
uint64_t
uid
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
STable
*
pTable
=
tsdbGetTableByUid
(
pRepo
->
tsdbMeta
,
uid
);
...
...
@@ -473,7 +473,7 @@ TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid) {
return
TSDB_GET_TABLE_LAST_KEY
(
pTable
);
}
void
tsdbStartStream
(
T
sdbRepo
T
*
repo
)
{
void
tsdbStartStream
(
T
SDB_REPO_
T
*
repo
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
...
...
@@ -485,13 +485,13 @@ void tsdbStartStream(TsdbRepoT *repo) {
}
}
STableInfo
*
tsdbGetTableInfo
(
T
sdbRepo
T
*
pRepo
,
STableId
tableId
)
{
STableInfo
*
tsdbGetTableInfo
(
T
SDB_REPO_
T
*
pRepo
,
STableId
tableId
)
{
// TODO
return
NULL
;
}
// TODO: need to return the number of data inserted
int32_t
tsdbInsertData
(
T
sdbRepo
T
*
repo
,
SSubmitMsg
*
pMsg
,
SShellSubmitRspMsg
*
pRsp
)
{
int32_t
tsdbInsertData
(
T
SDB_REPO_
T
*
repo
,
SSubmitMsg
*
pMsg
,
SShellSubmitRspMsg
*
pRsp
)
{
SSubmitMsgIter
msgIter
;
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
...
...
@@ -698,12 +698,12 @@ SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
return
pBlock
;
}
STsdbMeta
*
tsdbGetMeta
(
T
sdbRepo
T
*
pRepo
)
{
STsdbMeta
*
tsdbGetMeta
(
T
SDB_REPO_
T
*
pRepo
)
{
STsdbRepo
*
tsdb
=
(
STsdbRepo
*
)
pRepo
;
return
tsdb
->
tsdbMeta
;
}
STsdbFileH
*
tsdbGetFile
(
T
sdbRepo
T
*
pRepo
)
{
STsdbFileH
*
tsdbGetFile
(
T
SDB_REPO_
T
*
pRepo
)
{
STsdbRepo
*
tsdb
=
(
STsdbRepo
*
)
pRepo
;
return
tsdb
->
tsdbFileH
;
}
...
...
@@ -950,7 +950,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
return
0
;
}
static
int32_t
tsdbInsertDataToTable
(
T
sdbRepo
T
*
repo
,
SSubmitBlk
*
pBlock
,
TSKEY
now
,
int32_t
*
affectedrows
)
{
static
int32_t
tsdbInsertDataToTable
(
T
SDB_REPO_
T
*
repo
,
SSubmitBlk
*
pBlock
,
TSKEY
now
,
int32_t
*
affectedrows
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
int64_t
points
=
0
;
...
...
@@ -1318,7 +1318,7 @@ static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) {
}
#define TSDB_META_FILE_INDEX 10000000
uint32_t
tsdbGetFileInfo
(
T
sdbRepo
T
*
repo
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int32_t
*
size
)
{
uint32_t
tsdbGetFileInfo
(
T
SDB_REPO_
T
*
repo
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int32_t
*
size
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
// STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
53cc3945
...
...
@@ -280,7 +280,7 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
}
}
void
*
tsdbGetTableTagVal
(
T
sdbRepo
T
*
repo
,
const
STableId
*
id
,
int32_t
colId
,
int16_t
type
,
int16_t
bytes
)
{
void
*
tsdbGetTableTagVal
(
T
SDB_REPO_
T
*
repo
,
const
STableId
*
id
,
int32_t
colId
,
int16_t
type
,
int16_t
bytes
)
{
STsdbMeta
*
pMeta
=
tsdbGetMeta
(
repo
);
STable
*
pTable
=
tsdbGetTableByUid
(
pMeta
,
id
->
uid
);
...
...
@@ -300,7 +300,7 @@ void* tsdbGetTableTagVal(TsdbRepoT* repo, const STableId* id, int32_t colId, int
return
val
;
}
char
*
tsdbGetTableName
(
T
sdbRepo
T
*
repo
,
const
STableId
*
id
)
{
char
*
tsdbGetTableName
(
T
SDB_REPO_
T
*
repo
,
const
STableId
*
id
)
{
STsdbMeta
*
pMeta
=
tsdbGetMeta
(
repo
);
STable
*
pTable
=
tsdbGetTableByUid
(
pMeta
,
id
->
uid
);
...
...
@@ -438,7 +438,7 @@ int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg) {
return
TSDB_CODE_SUCCESS
;
}
int
tsdbCreateTable
(
T
sdbRepo
T
*
repo
,
STableCfg
*
pCfg
)
{
int
tsdbCreateTable
(
T
SDB_REPO_
T
*
repo
,
STableCfg
*
pCfg
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
...
...
@@ -581,7 +581,7 @@ _err:
}
// int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) {
int
tsdbDropTable
(
T
sdbRepo
T
*
repo
,
STableId
tableId
)
{
int
tsdbDropTable
(
T
SDB_REPO_
T
*
repo
,
STableId
tableId
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
if
(
pRepo
==
NULL
)
return
-
1
;
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
53cc3945
...
...
@@ -135,7 +135,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
pCompBlockLoadInfo
->
fileId
=
-
1
;
}
TsdbQueryHandleT
*
tsdbQueryTables
(
T
sdbRepo
T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
void
*
qinfo
)
{
TsdbQueryHandleT
*
tsdbQueryTables
(
T
SDB_REPO_
T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
void
*
qinfo
)
{
// todo 1. filter not exist table
// todo 2. add the reference count for each table that is involved in query
...
...
@@ -203,7 +203,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
return
(
TsdbQueryHandleT
)
pQueryHandle
;
}
TsdbQueryHandleT
tsdbQueryLastRow
(
T
sdbRepo
T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
void
*
qinfo
)
{
TsdbQueryHandleT
tsdbQueryLastRow
(
T
SDB_REPO_
T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
void
*
qinfo
)
{
STsdbQueryHandle
*
pQueryHandle
=
(
STsdbQueryHandle
*
)
tsdbQueryTables
(
tsdb
,
pCond
,
groupList
,
qinfo
);
pQueryHandle
->
type
=
TSDB_QUERY_TYPE_LAST
;
...
...
@@ -229,7 +229,7 @@ SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle) {
return
res
;
}
TsdbQueryHandleT
tsdbQueryRowsInExternalWindow
(
T
sdbRepo
T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
void
*
qinfo
)
{
TsdbQueryHandleT
tsdbQueryRowsInExternalWindow
(
T
SDB_REPO_
T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
void
*
qinfo
)
{
STsdbQueryHandle
*
pQueryHandle
=
(
STsdbQueryHandle
*
)
tsdbQueryTables
(
tsdb
,
pCond
,
groupList
,
qinfo
);
pQueryHandle
->
type
=
TSDB_QUERY_TYPE_EXTERNAL
;
...
...
@@ -1990,7 +1990,7 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableIdList, size_t numOfTab
}
SArray
*
createTableGroup
(
SArray
*
pTableList
,
STSchema
*
pTagSchema
,
SColIndex
*
pCols
,
int32_t
numOfOrderCols
,
T
sdbRepo
T
*
tsdb
)
{
T
SDB_REPO_
T
*
tsdb
)
{
assert
(
pTableList
!=
NULL
);
SArray
*
pTableGroup
=
taosArrayInit
(
1
,
POINTER_BYTES
);
...
...
@@ -2102,7 +2102,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbQuerySTableByTagCond
(
T
sdbRepo
T
*
tsdb
,
uint64_t
uid
,
const
char
*
pTagCond
,
size_t
len
,
int32_t
tsdbQuerySTableByTagCond
(
T
SDB_REPO_
T
*
tsdb
,
uint64_t
uid
,
const
char
*
pTagCond
,
size_t
len
,
int16_t
tagNameRelType
,
const
char
*
tbnameCond
,
STableGroupInfo
*
pGroupInfo
,
SColIndex
*
pColIndex
,
int32_t
numOfCols
)
{
STable
*
pTable
=
tsdbGetTableByUid
(
tsdbGetMeta
(
tsdb
),
uid
);
...
...
@@ -2176,7 +2176,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT* tsdb, uint64_t uid, const char* pTag
return
ret
;
}
int32_t
tsdbGetOneTableGroup
(
T
sdbRepo
T
*
tsdb
,
uint64_t
uid
,
STableGroupInfo
*
pGroupInfo
)
{
int32_t
tsdbGetOneTableGroup
(
T
SDB_REPO_
T
*
tsdb
,
uint64_t
uid
,
STableGroupInfo
*
pGroupInfo
)
{
STable
*
pTable
=
tsdbGetTableByUid
(
tsdbGetMeta
(
tsdb
),
uid
);
if
(
pTable
==
NULL
)
{
return
TSDB_CODE_TDB_INVALID_TABLE_ID
;
...
...
src/tsdb/tests/tsdbTests.cpp
浏览文件 @
53cc3945
...
...
@@ -13,7 +13,7 @@ static double getCurTime() {
}
typedef
struct
{
T
sdbRepo
T
*
pRepo
;
T
SDB_REPO_
T
*
pRepo
;
bool
isAscend
;
int
tid
;
uint64_t
uid
;
...
...
@@ -136,7 +136,7 @@ TEST(TsdbTest, createRepo) {
tsdbSetDefaultCfg
(
&
config
);
ASSERT_EQ
(
tsdbCreateRepo
(
"/home/ubuntu/work/ttest/vnode0"
,
&
config
,
NULL
),
0
);
T
sdbRepo
T
*
pRepo
=
tsdbOpenRepo
(
"/home/ubuntu/work/ttest/vnode0"
,
NULL
);
T
SDB_REPO_
T
*
pRepo
=
tsdbOpenRepo
(
"/home/ubuntu/work/ttest/vnode0"
,
NULL
);
ASSERT_NE
(
pRepo
,
nullptr
);
// 2. Create a normal table
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录