Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4a03b48f
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
4a03b48f
编写于
4月 08, 2022
作者:
H
Hongze Cheng
提交者:
GitHub
4月 08, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11312 from taosdata/feature/vnode
Feature/vnode
上级
fdbab777
742b2918
变更
61
展开全部
隐藏空白更改
内联
并排
Showing
61 changed file
with
1506 addition
and
1699 deletion
+1506
-1699
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+56
-34
source/dnode/vnode/inc/meta.h
source/dnode/vnode/inc/meta.h
+0
-86
source/dnode/vnode/inc/tsdb.h
source/dnode/vnode/inc/tsdb.h
+0
-288
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+318
-2
source/dnode/vnode/src/inc/meta.h
source/dnode/vnode/src/inc/meta.h
+3
-7
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+87
-8
source/dnode/vnode/src/inc/tqCommit.h
source/dnode/vnode/src/inc/tqCommit.h
+0
-14
source/dnode/vnode/src/inc/tqMetaStore.h
source/dnode/vnode/src/inc/tqMetaStore.h
+0
-51
source/dnode/vnode/src/inc/tqOffset.h
source/dnode/vnode/src/inc/tqOffset.h
+0
-40
source/dnode/vnode/src/inc/tqPush.h
source/dnode/vnode/src/inc/tqPush.h
+0
-80
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+936
-0
source/dnode/vnode/src/inc/tsdbCommit.h
source/dnode/vnode/src/inc/tsdbCommit.h
+0
-77
source/dnode/vnode/src/inc/tsdbCompact.h
source/dnode/vnode/src/inc/tsdbCompact.h
+0
-32
source/dnode/vnode/src/inc/tsdbDBDef.h
source/dnode/vnode/src/inc/tsdbDBDef.h
+0
-45
source/dnode/vnode/src/inc/tsdbDef.h
source/dnode/vnode/src/inc/tsdbDef.h
+0
-84
source/dnode/vnode/src/inc/tsdbFS.h
source/dnode/vnode/src/inc/tsdbFS.h
+0
-141
source/dnode/vnode/src/inc/tsdbLog.h
source/dnode/vnode/src/inc/tsdbLog.h
+0
-38
source/dnode/vnode/src/inc/tsdbMemTable.h
source/dnode/vnode/src/inc/tsdbMemTable.h
+0
-81
source/dnode/vnode/src/inc/tsdbMemory.h
source/dnode/vnode/src/inc/tsdbMemory.h
+0
-81
source/dnode/vnode/src/inc/tsdbOptions.h
source/dnode/vnode/src/inc/tsdbOptions.h
+0
-32
source/dnode/vnode/src/inc/tsdbReadImpl.h
source/dnode/vnode/src/inc/tsdbReadImpl.h
+0
-256
source/dnode/vnode/src/inc/tsdbSma.h
source/dnode/vnode/src/inc/tsdbSma.h
+0
-89
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+21
-6
source/dnode/vnode/src/inc/vnodeQuery.h
source/dnode/vnode/src/inc/vnodeQuery.h
+0
-35
source/dnode/vnode/src/meta/metaCache.c
source/dnode/vnode/src/meta/metaCache.c
+1
-2
source/dnode/vnode/src/meta/metaCfg.c
source/dnode/vnode/src/meta/metaCfg.c
+1
-1
source/dnode/vnode/src/meta/metaIdx.c
source/dnode/vnode/src/meta/metaIdx.c
+1
-1
source/dnode/vnode/src/meta/metaMain.c
source/dnode/vnode/src/meta/metaMain.c
+1
-1
source/dnode/vnode/src/meta/metaTDBImpl.c
source/dnode/vnode/src/meta/metaTDBImpl.c
+1
-1
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+1
-1
source/dnode/vnode/src/meta/metaTbCfg.c
source/dnode/vnode/src/meta/metaTbCfg.c
+1
-1
source/dnode/vnode/src/meta/metaTbUid.c
source/dnode/vnode/src/meta/metaTbUid.c
+1
-2
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+23
-17
source/dnode/vnode/src/tq/tqMetaStore.c
source/dnode/vnode/src/tq/tqMetaStore.c
+1
-1
source/dnode/vnode/src/tq/tqOffset.c
source/dnode/vnode/src/tq/tqOffset.c
+1
-2
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbBDBImpl.c
source/dnode/vnode/src/tsdb/tsdbBDBImpl.c
+0
-2
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbFile.c
source/dnode/vnode/src/tsdb/tsdbFile.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbMain.c
source/dnode/vnode/src/tsdb/tsdbMain.c
+4
-4
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbOptions.c
source/dnode/vnode/src/tsdb/tsdbOptions.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+2
-9
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbWrite.c
source/dnode/vnode/src/tsdb/tsdbWrite.c
+1
-1
source/dnode/vnode/src/vnd/vnodeArenaMAImpl.c
source/dnode/vnode/src/vnd/vnodeArenaMAImpl.c
+1
-1
source/dnode/vnode/src/vnd/vnodeBufferPool.c
source/dnode/vnode/src/vnd/vnodeBufferPool.c
+1
-1
source/dnode/vnode/src/vnd/vnodeCfg.c
source/dnode/vnode/src/vnd/vnodeCfg.c
+1
-1
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+1
-1
source/dnode/vnode/src/vnd/vnodeInt.c
source/dnode/vnode/src/vnd/vnodeInt.c
+1
-1
source/dnode/vnode/src/vnd/vnodeMain.c
source/dnode/vnode/src/vnd/vnodeMain.c
+1
-1
source/dnode/vnode/src/vnd/vnodeMgr.c
source/dnode/vnode/src/vnd/vnodeMgr.c
+1
-1
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+1
-2
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+13
-13
source/dnode/vnode/test/CMakeLists.txt
source/dnode/vnode/test/CMakeLists.txt
+11
-11
source/dnode/vnode/test/tsdbSmaTest.cpp
source/dnode/vnode/test/tsdbSmaTest.cpp
+2
-2
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+1
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+1
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-1
未找到文件。
source/dnode/vnode/CMakeLists.txt
浏览文件 @
4a03b48f
set
(
META_DB_IMPL_LIST
"BDB"
"TDB"
)
set
(
META_DB_IMPL
"TDB"
CACHE STRING
"Use BDB as the default META implementation"
)
set_property
(
CACHE META_DB_IMPL PROPERTY STRINGS
${
META_DB_IMPL_LIST
}
)
if
(
META_DB_IMPL IN_LIST META_DB_IMPL_LIST
)
message
(
STATUS
"META DB Impl:
${
META_DB_IMPL
}
=============="
)
else
()
message
(
FATAL_ERROR
"Invalid META DB IMPL:
${
META_DB_IMPL
}
=============="
)
endif
()
# vnode
add_library
(
vnode STATIC
""
)
target_sources
(
vnode
PRIVATE
# vnode
"src/vnd/vnodeArenaMAImpl.c"
"src/vnd/vnodeBufferPool.c"
"src/vnd/vnodeCfg.c"
"src/vnd/vnodeCommit.c"
"src/vnd/vnodeInt.c"
"src/vnd/vnodeMain.c"
"src/vnd/vnodeMgr.c"
"src/vnd/vnodeQuery.c"
"src/vnd/vnodeStateMgr.c"
"src/vnd/vnodeWrite.c"
aux_source_directory
(
src/meta META_SRC
)
if
(
${
META_DB_IMPL
}
STREQUAL
"BDB"
)
list
(
REMOVE_ITEM META_SRC
"src/meta/metaTDBImpl.c"
)
elseif
(
${
META_DB_IMPL
}
STREQUAL
"TDB"
)
list
(
REMOVE_ITEM META_SRC
"src/meta/metaBDBImpl.c"
)
endif
()
# meta
# "src/meta/metaBDBImpl.c"
"src/meta/metaCache.c"
"src/meta/metaCfg.c"
"src/meta/metaIdx.c"
"src/meta/metaMain.c"
"src/meta/metaQuery.c"
"src/meta/metaTable.c"
"src/meta/metaTbCfg.c"
"src/meta/metaTbTag.c"
"src/meta/metaTbUid.c"
"src/meta/metaTDBImpl.c"
aux_source_directory
(
src/tq TQ_SRC
)
aux_source_directory
(
src/tsdb TSDB_SRC
)
aux_source_directory
(
src/vnd VND_SRC
)
# tsdb
# "src/tsdb/tsdbBDBImpl.c"
"src/tsdb/tsdbCommit.c"
"src/tsdb/tsdbCompact.c"
"src/tsdb/tsdbFile.c"
"src/tsdb/tsdbFS.c"
"src/tsdb/tsdbMain.c"
"src/tsdb/tsdbMemTable.c"
"src/tsdb/tsdbOptions.c"
"src/tsdb/tsdbRead.c"
"src/tsdb/tsdbReadImpl.c"
"src/tsdb/tsdbScan.c"
# "src/tsdb/tsdbSma.c"
"src/tsdb/tsdbWrite.c"
list
(
APPEND
VNODE_SRC
${
META_SRC
}
${
TQ_SRC
}
${
TSDB_SRC
}
${
VND_SRC
}
# tq
"src/tq/tq.c"
"src/tq/tqCommit.c"
"src/tq/tqMetaStore.c"
"src/tq/tqOffset.c"
"src/tq/tqPush.c"
"src/tq/tqRead.c"
)
add_library
(
vnode STATIC
${
VNODE_SRC
}
)
target_include_directories
(
vnode
PUBLIC
inc
PRIVATE
src/inc
PUBLIC
"inc"
PRIVATE
"src/inc"
)
target_link_libraries
(
vnode
PUBLIC os
PUBLIC util
PUBLIC common
PUBLIC transport
PUBLIC tfs
PUBLIC wal
PUBLIC scheduler
PUBLIC executor
PUBLIC stream
PUBLIC qworker
PUBLIC sync
# TODO: get rid of BDB
PUBLIC
bdb
PUBLIC executor
PUBLIC
scheduler
PUBLIC tdb
PUBLIC bdb
PUBLIC transport
PUBLIC stream
)
if
(
${
BUILD_TEST
}
)
...
...
source/dnode/vnode/inc/meta.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_META_H_
#define _TD_META_H_
#include "tmallocator.h"
#include "tmsg.h"
#include "trow.h"
#ifdef __cplusplus
extern
"C"
{
#endif
#define META_SUPER_TABLE TD_SUPER_TABLE
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
// Types exported
typedef
struct
SMeta
SMeta
;
typedef
struct
SMetaCfg
{
/// LRU cache size
uint64_t
lruSize
;
}
SMetaCfg
;
typedef
struct
SMTbCursor
SMTbCursor
;
typedef
struct
SMCtbCursor
SMCtbCursor
;
typedef
struct
SMSmaCursor
SMSmaCursor
;
typedef
SVCreateTbReq
STbCfg
;
typedef
SVCreateTSmaReq
SSmaCfg
;
// SMeta operations
SMeta
*
metaOpen
(
const
char
*
path
,
const
SMetaCfg
*
pMetaCfg
,
SMemAllocatorFactory
*
pMAF
);
void
metaClose
(
SMeta
*
pMeta
);
void
metaRemove
(
const
char
*
path
);
int
metaCreateTable
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
);
int
metaDropTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
int
metaCommit
(
SMeta
*
pMeta
);
int32_t
metaCreateTSma
(
SMeta
*
pMeta
,
SSmaCfg
*
pCfg
);
int32_t
metaDropTSma
(
SMeta
*
pMeta
,
int64_t
indexUid
);
// For Query
STbCfg
*
metaGetTbInfoByUid
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
STbCfg
*
metaGetTbInfoByName
(
SMeta
*
pMeta
,
char
*
tbname
,
tb_uid_t
*
uid
);
SSchemaWrapper
*
metaGetTableSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
bool
isinline
);
STSchema
*
metaGetTbTSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
);
STSma
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
);
STSmaWrapper
*
metaGetSmaInfoByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SArray
*
metaGetSmaTbUids
(
SMeta
*
pMeta
,
bool
isDup
);
int
metaGetTbNum
(
SMeta
*
pMeta
);
SMTbCursor
*
metaOpenTbCursor
(
SMeta
*
pMeta
);
void
metaCloseTbCursor
(
SMTbCursor
*
pTbCur
);
char
*
metaTbCursorNext
(
SMTbCursor
*
pTbCur
);
SMCtbCursor
*
metaOpenCtbCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseCtbCurosr
(
SMCtbCursor
*
pCtbCur
);
tb_uid_t
metaCtbCursorNext
(
SMCtbCursor
*
pCtbCur
);
SMSmaCursor
*
metaOpenSmaCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseSmaCurosr
(
SMSmaCursor
*
pSmaCur
);
const
char
*
metaSmaCursorNext
(
SMSmaCursor
*
pSmaCur
);
// Options
void
metaOptionsInit
(
SMetaCfg
*
pMetaCfg
);
void
metaOptionsClear
(
SMetaCfg
*
pMetaCfg
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_META_H_*/
source/dnode/vnode/inc/tsdb.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_H_
#define _TD_TSDB_H_
#include "tmallocator.h"
#include "meta.h"
#include "tcommon.h"
#include "tfs.h"
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
SDataStatis
{
int16_t
colId
;
int16_t
maxIndex
;
int16_t
minIndex
;
int16_t
numOfNull
;
int64_t
sum
;
int64_t
max
;
int64_t
min
;
}
SDataStatis
;
typedef
struct
STable
{
uint64_t
tid
;
uint64_t
uid
;
STSchema
*
pSchema
;
}
STable
;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
// TYPES EXPOSED
typedef
struct
STsdb
STsdb
;
typedef
struct
STsdbCfg
{
int8_t
precision
;
int8_t
update
;
int8_t
compression
;
int32_t
daysPerFile
;
int32_t
minRowsPerFileBlock
;
int32_t
maxRowsPerFileBlock
;
int32_t
keep
;
int32_t
keep1
;
int32_t
keep2
;
uint64_t
lruCacheSize
;
SArray
*
retentions
;
}
STsdbCfg
;
// query condition to build multi-table data block iterator
typedef
struct
STsdbQueryCond
{
STimeWindow
twindow
;
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
numOfCols
;
SColumnInfo
*
colList
;
bool
loadExternalRows
;
// load external rows or not
int32_t
type
;
// data block load type:
}
STsdbQueryCond
;
typedef
struct
{
TSKEY
lastKey
;
uint64_t
uid
;
}
STableKeyInfo
;
// STsdb
STsdb
*
tsdbOpen
(
const
char
*
path
,
int32_t
vgId
,
const
STsdbCfg
*
pTsdbCfg
,
SMemAllocatorFactory
*
pMAF
,
SMeta
*
pMeta
,
STfs
*
pTfs
);
void
tsdbClose
(
STsdb
*
);
void
tsdbRemove
(
const
char
*
path
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
int
tsdbPrepareCommit
(
STsdb
*
pTsdb
);
int
tsdbCommit
(
STsdb
*
pTsdb
);
int32_t
tsdbInitSma
(
STsdb
*
pTsdb
);
int32_t
tsdbCreateTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
);
int32_t
tsdbDropTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
);
/**
* @brief When submit msg received, update the relative expired window synchronously.
*
* @param pTsdb
* @param msg
* @return int32_t
*/
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
);
/**
* @brief Insert tSma(Time-range-wise SMA) data from stream computing engine
*
* @param pTsdb
* @param indexUid
* @param msg
* @return int32_t
*/
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
);
/**
* @brief Drop tSma data and local cache.
*
* @param pTsdb
* @param indexUid
* @return int32_t
*/
int32_t
tsdbDropTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
);
/**
* @brief Insert RSma(Rollup SMA) data.
*
* @param pTsdb
* @param msg
* @return int32_t
*/
int32_t
tsdbInsertRSmaData
(
STsdb
*
pTsdb
,
char
*
msg
);
// TODO: This is the basic params, and should wrap the params to a queryHandle.
/**
* @brief Get tSma(Time-range-wise SMA) data.
*
* @param pTsdb
* @param pData
* @param indexUid
* @param querySKey
* @param nMaxResult
* @return int32_t
*/
int32_t
tsdbGetTSmaData
(
STsdb
*
pTsdb
,
char
*
pData
,
int64_t
indexUid
,
TSKEY
querySKey
,
int32_t
nMaxResult
);
// STsdbCfg
int
tsdbOptionsInit
(
STsdbCfg
*
);
void
tsdbOptionsClear
(
STsdbCfg
*
);
typedef
void
*
tsdbReaderT
;
/**
* Get the data block iterator, starting from position according to the query condition
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfoGroup table object list in the form of set, grouped into different sets according to the
* group by condition
* @param qinfo query info handle from query processor
* @return
*/
tsdbReaderT
*
tsdbQueryTables
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
tableInfoGroup
,
uint64_t
qId
,
uint64_t
taskId
);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
* Note that only one data block with only row will be returned while invoking retrieve data block function for
* all tables in this group.
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfo table list.
* @return
*/
//tsdbReaderT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
// SMemRef *pRef);
tsdbReaderT
tsdbQueryCacheLast
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
void
*
pMemRef
);
int32_t
tsdbGetFileBlocksDistInfo
(
tsdbReaderT
*
pReader
,
STableBlockDistInfo
*
pTableBlockInfo
);
bool
isTsdbCacheLastRow
(
tsdbReaderT
*
pReader
);
/**
*
* @param tsdb
* @param uid
* @param skey
* @param pTagCond
* @param len
* @param tagNameRelType
* @param tbnameCond
* @param pGroupInfo
* @param pColIndex
* @param numOfCols
* @param reqId
* @return
*/
int32_t
tsdbQuerySTableByTagCond
(
void
*
pMeta
,
uint64_t
uid
,
TSKEY
skey
,
const
char
*
pTagCond
,
size_t
len
,
int16_t
tagNameRelType
,
const
char
*
tbnameCond
,
STableGroupInfo
*
pGroupInfo
,
SColIndex
*
pColIndex
,
int32_t
numOfCols
,
uint64_t
reqId
,
uint64_t
taskId
);
/**
* get num of rows in mem table
*
* @param pHandle
* @return row size
*/
int64_t
tsdbGetNumOfRowsInMemTable
(
tsdbReaderT
*
pHandle
);
/**
* move to next block if exists
*
* @param pTsdbReadHandle
* @return
*/
bool
tsdbNextDataBlock
(
tsdbReaderT
pTsdbReadHandle
);
/**
* Get current data block information
*
* @param pTsdbReadHandle
* @param pBlockInfo
* @return
*/
void
tsdbRetrieveDataBlockInfo
(
tsdbReaderT
*
pTsdbReadHandle
,
SDataBlockInfo
*
pBlockInfo
);
/**
*
* Get the pre-calculated information w.r.t. current data block.
*
* In case of data block in cache, the pBlockStatis will always be NULL.
* If a block is not completed loaded from disk, the pBlockStatis will be NULL.
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return
*/
int32_t
tsdbRetrieveDataBlockStatisInfo
(
tsdbReaderT
*
pTsdbReadHandle
,
SDataStatis
**
pBlockStatis
);
/**
*
* The query condition with primary timestamp is passed to iterator during its constructor function,
* the returned data block must be satisfied with the time window condition in any cases,
* which means the SData data block is not actually the completed disk data blocks.
*
* @param pTsdbReadHandle query handle
* @param pColumnIdList required data columns id list
* @return
*/
SArray
*
tsdbRetrieveDataBlock
(
tsdbReaderT
*
pTsdbReadHandle
,
SArray
*
pColumnIdList
);
/**
* destroy the created table group list, which is generated by tag query
* @param pGroupList
*/
void
tsdbDestroyTableGroup
(
STableGroupInfo
*
pGroupList
);
/**
* create the table group result including only one table, used to handle the normal table query
*
* @param tsdb tsdbHandle
* @param uid table uid
* @param pGroupInfo the generated result
* @return
*/
int32_t
tsdbGetOneTableGroup
(
void
*
pMeta
,
uint64_t
uid
,
TSKEY
startKey
,
STableGroupInfo
*
pGroupInfo
);
/**
*
* @param tsdb
* @param pTableIdList
* @param pGroupInfo
* @return
*/
int32_t
tsdbGetTableGroupFromIdList
(
STsdb
*
tsdb
,
SArray
*
pTableIdList
,
STableGroupInfo
*
pGroupInfo
);
/**
* clean up the query handle
* @param queryHandle
*/
void
tsdbCleanupReadHandle
(
tsdbReaderT
queryHandle
);
int32_t
tdScanAndConvertSubmitMsg
(
SSubmitReq
*
pMsg
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_TSDB_H_*/
source/dnode/vnode/inc/vnode.h
浏览文件 @
4a03b48f
...
...
@@ -21,12 +21,17 @@
#include "tqueue.h"
#include "trpc.h"
#include "meta.h"
#include "tarray.h"
#include "tfs.h"
#include "tsdb.h"
#include "wal.h"
#include "tmallocator.h"
#include "tmsg.h"
#include "trow.h"
#include "tmallocator.h"
#include "tcommon.h"
#include "tfs.h"
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
@@ -34,6 +39,82 @@ extern "C" {
/* ------------------------ TYPES EXPOSED ------------------------ */
typedef
struct
SMgmtWrapper
SMgmtWrapper
;
typedef
struct
SVnode
SVnode
;
#define META_SUPER_TABLE TD_SUPER_TABLE
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
// Types exported
typedef
struct
SMeta
SMeta
;
typedef
struct
SMetaCfg
{
/// LRU cache size
uint64_t
lruSize
;
}
SMetaCfg
;
typedef
struct
SMTbCursor
SMTbCursor
;
typedef
struct
SMCtbCursor
SMCtbCursor
;
typedef
struct
SMSmaCursor
SMSmaCursor
;
typedef
SVCreateTbReq
STbCfg
;
typedef
SVCreateTSmaReq
SSmaCfg
;
typedef
struct
SDataStatis
{
int16_t
colId
;
int16_t
maxIndex
;
int16_t
minIndex
;
int16_t
numOfNull
;
int64_t
sum
;
int64_t
max
;
int64_t
min
;
}
SDataStatis
;
typedef
struct
STsdbQueryCond
{
STimeWindow
twindow
;
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
numOfCols
;
SColumnInfo
*
colList
;
bool
loadExternalRows
;
// load external rows or not
int32_t
type
;
// data block load type:
}
STsdbQueryCond
;
typedef
struct
{
TSKEY
lastKey
;
uint64_t
uid
;
}
STableKeyInfo
;
typedef
struct
STable
{
uint64_t
tid
;
uint64_t
uid
;
STSchema
*
pSchema
;
}
STable
;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
// TYPES EXPOSED
typedef
struct
STsdb
STsdb
;
typedef
struct
STsdbCfg
{
int8_t
precision
;
int8_t
update
;
int8_t
compression
;
int32_t
daysPerFile
;
int32_t
minRowsPerFileBlock
;
int32_t
maxRowsPerFileBlock
;
int32_t
keep
;
int32_t
keep1
;
int32_t
keep2
;
uint64_t
lruCacheSize
;
SArray
*
retentions
;
}
STsdbCfg
;
typedef
struct
{
// TODO
int32_t
reserved
;
...
...
@@ -263,6 +344,241 @@ int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockIn
// return SArray<SColumnInfoData>
SArray
*
tqRetrieveDataBlock
(
STqReadHandle
*
pHandle
);
// meta.h
SMeta
*
metaOpen
(
const
char
*
path
,
const
SMetaCfg
*
pMetaCfg
,
SMemAllocatorFactory
*
pMAF
);
void
metaClose
(
SMeta
*
pMeta
);
void
metaRemove
(
const
char
*
path
);
int
metaCreateTable
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
);
int
metaDropTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
int
metaCommit
(
SMeta
*
pMeta
);
int32_t
metaCreateTSma
(
SMeta
*
pMeta
,
SSmaCfg
*
pCfg
);
int32_t
metaDropTSma
(
SMeta
*
pMeta
,
int64_t
indexUid
);
STbCfg
*
metaGetTbInfoByUid
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
STbCfg
*
metaGetTbInfoByName
(
SMeta
*
pMeta
,
char
*
tbname
,
tb_uid_t
*
uid
);
SSchemaWrapper
*
metaGetTableSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
bool
isinline
);
STSchema
*
metaGetTbTSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
);
STSma
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
);
STSmaWrapper
*
metaGetSmaInfoByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SArray
*
metaGetSmaTbUids
(
SMeta
*
pMeta
,
bool
isDup
);
int
metaGetTbNum
(
SMeta
*
pMeta
);
SMTbCursor
*
metaOpenTbCursor
(
SMeta
*
pMeta
);
void
metaCloseTbCursor
(
SMTbCursor
*
pTbCur
);
char
*
metaTbCursorNext
(
SMTbCursor
*
pTbCur
);
SMCtbCursor
*
metaOpenCtbCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseCtbCurosr
(
SMCtbCursor
*
pCtbCur
);
tb_uid_t
metaCtbCursorNext
(
SMCtbCursor
*
pCtbCur
);
SMSmaCursor
*
metaOpenSmaCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseSmaCurosr
(
SMSmaCursor
*
pSmaCur
);
const
char
*
metaSmaCursorNext
(
SMSmaCursor
*
pSmaCur
);
// Options
void
metaOptionsInit
(
SMetaCfg
*
pMetaCfg
);
void
metaOptionsClear
(
SMetaCfg
*
pMetaCfg
);
// query condition to build multi-table data block iterator
// STsdb
STsdb
*
tsdbOpen
(
const
char
*
path
,
int32_t
vgId
,
const
STsdbCfg
*
pTsdbCfg
,
SMemAllocatorFactory
*
pMAF
,
SMeta
*
pMeta
,
STfs
*
pTfs
);
void
tsdbClose
(
STsdb
*
);
void
tsdbRemove
(
const
char
*
path
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
int
tsdbPrepareCommit
(
STsdb
*
pTsdb
);
int
tsdbCommit
(
STsdb
*
pTsdb
);
int32_t
tsdbInitSma
(
STsdb
*
pTsdb
);
int32_t
tsdbCreateTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
);
int32_t
tsdbDropTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
);
/**
* @brief When submit msg received, update the relative expired window synchronously.
*
* @param pTsdb
* @param msg
* @return int32_t
*/
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
);
/**
* @brief Insert tSma(Time-range-wise SMA) data from stream computing engine
*
* @param pTsdb
* @param indexUid
* @param msg
* @return int32_t
*/
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
);
/**
* @brief Drop tSma data and local cache.
*
* @param pTsdb
* @param indexUid
* @return int32_t
*/
int32_t
tsdbDropTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
);
/**
* @brief Insert RSma(Rollup SMA) data.
*
* @param pTsdb
* @param msg
* @return int32_t
*/
int32_t
tsdbInsertRSmaData
(
STsdb
*
pTsdb
,
char
*
msg
);
// TODO: This is the basic params, and should wrap the params to a queryHandle.
/**
* @brief Get tSma(Time-range-wise SMA) data.
*
* @param pTsdb
* @param pData
* @param indexUid
* @param querySKey
* @param nMaxResult
* @return int32_t
*/
int32_t
tsdbGetTSmaData
(
STsdb
*
pTsdb
,
char
*
pData
,
int64_t
indexUid
,
TSKEY
querySKey
,
int32_t
nMaxResult
);
// STsdbCfg
int
tsdbOptionsInit
(
STsdbCfg
*
);
void
tsdbOptionsClear
(
STsdbCfg
*
);
typedef
void
*
tsdbReaderT
;
/**
* Get the data block iterator, starting from position according to the query condition
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfoGroup table object list in the form of set, grouped into different sets according to the
* group by condition
* @param qinfo query info handle from query processor
* @return
*/
tsdbReaderT
*
tsdbQueryTables
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
tableInfoGroup
,
uint64_t
qId
,
uint64_t
taskId
);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
* Note that only one data block with only row will be returned while invoking retrieve data block function for
* all tables in this group.
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfo table list.
* @return
*/
//tsdbReaderT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
// SMemRef *pRef);
tsdbReaderT
tsdbQueryCacheLast
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
void
*
pMemRef
);
int32_t
tsdbGetFileBlocksDistInfo
(
tsdbReaderT
*
pReader
,
STableBlockDistInfo
*
pTableBlockInfo
);
bool
isTsdbCacheLastRow
(
tsdbReaderT
*
pReader
);
/**
*
* @param tsdb
* @param uid
* @param skey
* @param pTagCond
* @param len
* @param tagNameRelType
* @param tbnameCond
* @param pGroupInfo
* @param pColIndex
* @param numOfCols
* @param reqId
* @return
*/
int32_t
tsdbQuerySTableByTagCond
(
void
*
pMeta
,
uint64_t
uid
,
TSKEY
skey
,
const
char
*
pTagCond
,
size_t
len
,
int16_t
tagNameRelType
,
const
char
*
tbnameCond
,
STableGroupInfo
*
pGroupInfo
,
SColIndex
*
pColIndex
,
int32_t
numOfCols
,
uint64_t
reqId
,
uint64_t
taskId
);
/**
* get num of rows in mem table
*
* @param pHandle
* @return row size
*/
int64_t
tsdbGetNumOfRowsInMemTable
(
tsdbReaderT
*
pHandle
);
/**
* move to next block if exists
*
* @param pTsdbReadHandle
* @return
*/
bool
tsdbNextDataBlock
(
tsdbReaderT
pTsdbReadHandle
);
/**
* Get current data block information
*
* @param pTsdbReadHandle
* @param pBlockInfo
* @return
*/
void
tsdbRetrieveDataBlockInfo
(
tsdbReaderT
*
pTsdbReadHandle
,
SDataBlockInfo
*
pBlockInfo
);
/**
*
* Get the pre-calculated information w.r.t. current data block.
*
* In case of data block in cache, the pBlockStatis will always be NULL.
* If a block is not completed loaded from disk, the pBlockStatis will be NULL.
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return
*/
int32_t
tsdbRetrieveDataBlockStatisInfo
(
tsdbReaderT
*
pTsdbReadHandle
,
SDataStatis
**
pBlockStatis
);
/**
*
* The query condition with primary timestamp is passed to iterator during its constructor function,
* the returned data block must be satisfied with the time window condition in any cases,
* which means the SData data block is not actually the completed disk data blocks.
*
* @param pTsdbReadHandle query handle
* @param pColumnIdList required data columns id list
* @return
*/
SArray
*
tsdbRetrieveDataBlock
(
tsdbReaderT
*
pTsdbReadHandle
,
SArray
*
pColumnIdList
);
/**
* destroy the created table group list, which is generated by tag query
* @param pGroupList
*/
void
tsdbDestroyTableGroup
(
STableGroupInfo
*
pGroupList
);
/**
* create the table group result including only one table, used to handle the normal table query
*
* @param tsdb tsdbHandle
* @param uid table uid
* @param pGroupInfo the generated result
* @return
*/
int32_t
tsdbGetOneTableGroup
(
void
*
pMeta
,
uint64_t
uid
,
TSKEY
startKey
,
STableGroupInfo
*
pGroupInfo
);
/**
*
* @param tsdb
* @param pTableIdList
* @param pGroupInfo
* @return
*/
int32_t
tsdbGetTableGroupFromIdList
(
STsdb
*
tsdb
,
SArray
*
pTableIdList
,
STableGroupInfo
*
pGroupInfo
);
/**
* clean up the query handle
* @param queryHandle
*/
void
tsdbCleanupReadHandle
(
tsdbReaderT
queryHandle
);
int32_t
tdScanAndConvertSubmitMsg
(
SSubmitReq
*
pMsg
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/inc/meta
Def
.h
→
source/dnode/vnode/src/inc/meta.h
浏览文件 @
4a03b48f
...
...
@@ -13,12 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_META_DEF_H_
#define _TD_META_DEF_H_
#include "tmallocator.h"
#include "meta.h"
#ifndef _TD_VNODE_META_H_
#define _TD_VNODE_META_H_
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -78,4 +74,4 @@ struct SMeta {
}
#endif
#endif
/*_TD_
META_DEF
_H_*/
#endif
/*_TD_
VNODE_META
_H_*/
\ No newline at end of file
source/dnode/vnode/src/inc/tq
Int
.h
→
source/dnode/vnode/src/inc/tq.h
浏览文件 @
4a03b48f
...
...
@@ -13,18 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TQ_INT_H_
#define _TD_TQ_INT_H_
#include "meta.h"
#include "tlog.h"
#include "tqPush.h"
#include "vnd.h"
#ifndef _TD_VNODE_TQ_H_
#define _TD_VNODE_TQ_H_
#ifdef __cplusplus
extern
"C"
{
#endif
// tqInt.h
#define tqFatal(...) \
{ \
if (tqDebugFlag & DEBUG_FATAL) { \
...
...
@@ -106,6 +102,9 @@ static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
typedef
enum
{
TQ_ITEM_READY
,
TQ_ITEM_PROCESS
,
TQ_ITEM_EMPTY
}
STqItemStatus
;
typedef
struct
STqOffsetCfg
STqOffsetCfg
;
typedef
struct
STqOffsetStore
STqOffsetStore
;
typedef
struct
{
int16_t
ver
;
int16_t
action
;
...
...
@@ -211,13 +210,93 @@ typedef struct {
SArray
*
topics
;
// SArray<STqTopic>
}
STqConsumer
;
enum
{
TQ_PUSHER_TYPE__CLIENT
=
1
,
TQ_PUSHER_TYPE__STREAM
,
};
typedef
struct
{
int8_t
type
;
int8_t
reserved
[
3
];
int32_t
ttl
;
int64_t
consumerId
;
SRpcMsg
*
pMsg
;
// SMqPollRsp* rsp;
}
STqClientPusher
;
typedef
struct
{
int8_t
type
;
int8_t
nodeType
;
int8_t
reserved
[
6
];
int64_t
streamId
;
qTaskInfo_t
task
;
// TODO sync function
}
STqStreamPusher
;
typedef
struct
{
int8_t
type
;
// mq or stream
}
STqPusher
;
typedef
struct
{
SHashObj
*
pHash
;
// <id, STqPush*>
}
STqPushMgr
;
typedef
struct
{
int8_t
inited
;
tmr_h
timer
;
}
STqPushMgmt
;
static
STqPushMgmt
tqPushMgmt
;
int32_t
tqSerializeConsumer
(
const
STqConsumer
*
,
STqSerializedHead
**
);
int32_t
tqDeserializeConsumer
(
STQ
*
,
const
STqSerializedHead
*
,
STqConsumer
**
);
static
int
FORCE_INLINE
tqQueryExecuting
(
int32_t
status
)
{
return
status
;
}
// tqMetaStore.h
STqMetaStore
*
tqStoreOpen
(
STQ
*
pTq
,
const
char
*
path
,
FTqSerialize
pSerializer
,
FTqDeserialize
pDeserializer
,
FTqDelete
pDeleter
,
int32_t
tqConfigFlag
);
int32_t
tqStoreClose
(
STqMetaStore
*
);
// int32_t tqStoreDelete(TqMetaStore*);
// int32_t tqStoreCommitAll(TqMetaStore*);
int32_t
tqStorePersist
(
STqMetaStore
*
);
// clean deleted idx and data from persistent file
int32_t
tqStoreCompact
(
STqMetaStore
*
);
void
*
tqHandleGet
(
STqMetaStore
*
,
int64_t
key
);
// make it unpersist
void
*
tqHandleTouchGet
(
STqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleMovePut
(
STqMetaStore
*
,
int64_t
key
,
void
*
value
);
int32_t
tqHandleCopyPut
(
STqMetaStore
*
,
int64_t
key
,
void
*
value
,
size_t
vsize
);
// delete committed kv pair
// notice that a delete action still needs to be committed
int32_t
tqHandleDel
(
STqMetaStore
*
,
int64_t
key
);
int32_t
tqHandlePurge
(
STqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleCommit
(
STqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleAbort
(
STqMetaStore
*
,
int64_t
key
);
// tqOffset
STqOffsetStore
*
STqOffsetOpen
(
STqOffsetCfg
*
);
void
STqOffsetClose
(
STqOffsetStore
*
);
int64_t
tqOffsetFetch
(
STqOffsetStore
*
pStore
,
const
char
*
subscribeKey
);
int32_t
tqOffsetCommit
(
STqOffsetStore
*
pStore
,
const
char
*
subscribeKey
,
int64_t
offset
);
int32_t
tqOffsetPersist
(
STqOffsetStore
*
pStore
,
const
char
*
subscribeKey
);
int32_t
tqOffsetPersistAll
(
STqOffsetStore
*
pStore
);
// tqPush
int32_t
tqPushMgrInit
();
void
tqPushMgrCleanUp
();
STqPushMgr
*
tqPushMgrOpen
();
void
tqPushMgrClose
(
STqPushMgr
*
pushMgr
);
STqClientPusher
*
tqAddClientPusher
(
STqPushMgr
*
pushMgr
,
SRpcMsg
*
pMsg
,
int64_t
consumerId
,
int64_t
ttl
);
STqStreamPusher
*
tqAddStreamPusher
(
STqPushMgr
*
pushMgr
,
int64_t
streamId
,
SEpSet
*
pEpSet
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_
TQ_INT
_H_*/
#endif
/*_TD_
VNODE_TQ
_H_*/
\ No newline at end of file
source/dnode/vnode/src/inc/tqCommit.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
source/dnode/vnode/src/inc/tqMetaStore.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TQ_META_STORE_H_
#define _TQ_META_STORE_H_
#include "os.h"
#include "tqInt.h"
#ifdef __cplusplus
extern
"C"
{
#endif
STqMetaStore
*
tqStoreOpen
(
STQ
*
pTq
,
const
char
*
path
,
FTqSerialize
pSerializer
,
FTqDeserialize
pDeserializer
,
FTqDelete
pDeleter
,
int32_t
tqConfigFlag
);
int32_t
tqStoreClose
(
STqMetaStore
*
);
// int32_t tqStoreDelete(TqMetaStore*);
// int32_t tqStoreCommitAll(TqMetaStore*);
int32_t
tqStorePersist
(
STqMetaStore
*
);
// clean deleted idx and data from persistent file
int32_t
tqStoreCompact
(
STqMetaStore
*
);
void
*
tqHandleGet
(
STqMetaStore
*
,
int64_t
key
);
// make it unpersist
void
*
tqHandleTouchGet
(
STqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleMovePut
(
STqMetaStore
*
,
int64_t
key
,
void
*
value
);
int32_t
tqHandleCopyPut
(
STqMetaStore
*
,
int64_t
key
,
void
*
value
,
size_t
vsize
);
// delete committed kv pair
// notice that a delete action still needs to be committed
int32_t
tqHandleDel
(
STqMetaStore
*
,
int64_t
key
);
int32_t
tqHandlePurge
(
STqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleCommit
(
STqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleAbort
(
STqMetaStore
*
,
int64_t
key
);
#ifdef __cplusplus
}
#endif
#endif
/* ifndef _TQ_META_STORE_H_ */
source/dnode/vnode/src/inc/tqOffset.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TQ_OFFSET_H_
#define _TD_TQ_OFFSET_H_
#include "tqInt.h"
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
STqOffsetCfg
STqOffsetCfg
;
typedef
struct
STqOffsetStore
STqOffsetStore
;
STqOffsetStore
*
STqOffsetOpen
(
STqOffsetCfg
*
);
void
STqOffsetClose
(
STqOffsetStore
*
);
int64_t
tqOffsetFetch
(
STqOffsetStore
*
pStore
,
const
char
*
subscribeKey
);
int32_t
tqOffsetCommit
(
STqOffsetStore
*
pStore
,
const
char
*
subscribeKey
,
int64_t
offset
);
int32_t
tqOffsetPersist
(
STqOffsetStore
*
pStore
,
const
char
*
subscribeKey
);
int32_t
tqOffsetPersistAll
(
STqOffsetStore
*
pStore
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_TQ_OFFSET_H_*/
source/dnode/vnode/src/inc/tqPush.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TQ_PUSH_H_
#define _TQ_PUSH_H_
#include "executor.h"
#include "thash.h"
#include "trpc.h"
#include "ttimer.h"
#include "vnode.h"
#ifdef __cplusplus
extern
"C"
{
#endif
enum
{
TQ_PUSHER_TYPE__CLIENT
=
1
,
TQ_PUSHER_TYPE__STREAM
,
};
typedef
struct
{
int8_t
type
;
int8_t
reserved
[
3
];
int32_t
ttl
;
int64_t
consumerId
;
SRpcMsg
*
pMsg
;
// SMqPollRsp* rsp;
}
STqClientPusher
;
typedef
struct
{
int8_t
type
;
int8_t
nodeType
;
int8_t
reserved
[
6
];
int64_t
streamId
;
qTaskInfo_t
task
;
// TODO sync function
}
STqStreamPusher
;
typedef
struct
{
int8_t
type
;
// mq or stream
}
STqPusher
;
typedef
struct
{
SHashObj
*
pHash
;
// <id, STqPush*>
}
STqPushMgr
;
typedef
struct
{
int8_t
inited
;
tmr_h
timer
;
}
STqPushMgmt
;
static
STqPushMgmt
tqPushMgmt
;
int32_t
tqPushMgrInit
();
void
tqPushMgrCleanUp
();
STqPushMgr
*
tqPushMgrOpen
();
void
tqPushMgrClose
(
STqPushMgr
*
pushMgr
);
STqClientPusher
*
tqAddClientPusher
(
STqPushMgr
*
pushMgr
,
SRpcMsg
*
pMsg
,
int64_t
consumerId
,
int64_t
ttl
);
STqStreamPusher
*
tqAddStreamPusher
(
STqPushMgr
*
pushMgr
,
int64_t
streamId
,
SEpSet
*
pEpSet
);
#ifdef __cplusplus
}
#endif
#endif
/*_TQ_PUSH_H_*/
source/dnode/vnode/src/inc/tsdb
File
.h
→
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
4a03b48f
此差异已折叠。
点击以展开。
source/dnode/vnode/src/inc/tsdbCommit.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_COMMIT_H_
#define _TD_TSDB_COMMIT_H_
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
{
int
minFid
;
int
midFid
;
int
maxFid
;
TSKEY
minKey
;
}
SRtn
;
typedef
struct
{
uint64_t
uid
;
int64_t
offset
;
int64_t
size
;
}
SKVRecord
;
void
tsdbGetRtnSnap
(
STsdb
*
pRepo
,
SRtn
*
pRtn
);
static
FORCE_INLINE
int
TSDB_KEY_FID
(
TSKEY
key
,
int32_t
days
,
int8_t
precision
)
{
if
(
key
<
0
)
{
return
(
int
)((
key
+
1
)
/
tsTickPerDay
[
precision
]
/
days
-
1
);
}
else
{
return
(
int
)((
key
/
tsTickPerDay
[
precision
]
/
days
));
}
}
static
FORCE_INLINE
int
tsdbGetFidLevel
(
int
fid
,
SRtn
*
pRtn
)
{
if
(
fid
>=
pRtn
->
maxFid
)
{
return
0
;
}
else
if
(
fid
>=
pRtn
->
midFid
)
{
return
1
;
}
else
if
(
fid
>=
pRtn
->
minFid
)
{
return
2
;
}
else
{
return
-
1
;
}
}
#if 0
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
void *tsdbCommitData(STsdbRepo *pRepo);
int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock,
bool isLast, bool isSuper, void **ppBuf, void **ppCBuf);
int tsdbApplyRtn(STsdbRepo *pRepo);
#endif
#ifdef __cplusplus
}
#endif
#endif
/* _TD_TSDB_COMMIT_H_ */
source/dnode/vnode/src/inc/tsdbCompact.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if 0
#ifndef _TD_TSDB_COMPACT_H_
#define _TD_TSDB_COMPACT_H_
#ifdef __cplusplus
extern "C" {
#endif
void *tsdbCompactImpl(STsdbRepo *pRepo);
#ifdef __cplusplus
}
#endif
#endif /* _TD_TSDB_COMPACT_H_ */
#endif
\ No newline at end of file
source/dnode/vnode/src/inc/tsdbDBDef.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_DB_DEF_H_
#define _TD_TSDB_DB_DEF_H_
#include "db.h"
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
SDBFile
SDBFile
;
typedef
DB_ENV
*
TDBEnv
;
struct
SDBFile
{
int32_t
fid
;
DB
*
pDB
;
char
*
path
;
};
int32_t
tsdbOpenDBF
(
TDBEnv
pEnv
,
SDBFile
*
pDBF
);
void
tsdbCloseDBF
(
SDBFile
*
pDBF
);
int32_t
tsdbOpenBDBEnv
(
DB_ENV
**
ppEnv
,
const
char
*
path
);
void
tsdbCloseBDBEnv
(
DB_ENV
*
pEnv
);
int32_t
tsdbSaveSmaToDB
(
SDBFile
*
pDBF
,
void
*
key
,
uint32_t
keySize
,
void
*
data
,
uint32_t
dataSize
);
void
*
tsdbGetSmaDataByKey
(
SDBFile
*
pDBF
,
void
*
key
,
uint32_t
keySize
,
uint32_t
*
valueSize
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_TSDB_DB_DEF_H_*/
source/dnode/vnode/src/inc/tsdbDef.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_DEF_H_
#define _TD_TSDB_DEF_H_
#include "tsdbDBDef.h"
#include "tmallocator.h"
#include "meta.h"
#include "tcompression.h"
#include "tglobal.h"
#include "thash.h"
#include "tlist.h"
#include "tmsg.h"
#include "tskiplist.h"
#include "ttime.h"
#include "tsdb.h"
#include "tsdbCommit.h"
#include "tsdbFS.h"
#include "tsdbFile.h"
#include "tsdbLog.h"
#include "tsdbMemTable.h"
#include "tsdbMemory.h"
#include "tsdbOptions.h"
#include "tsdbReadImpl.h"
#include "tsdbSma.h"
#ifdef __cplusplus
extern
"C"
{
#endif
struct
STsdb
{
int32_t
vgId
;
bool
repoLocked
;
TdThreadMutex
mutex
;
char
*
path
;
STsdbCfg
config
;
STsdbMemTable
*
mem
;
STsdbMemTable
*
imem
;
SRtn
rtn
;
SMemAllocatorFactory
*
pmaf
;
STsdbFS
*
fs
;
SMeta
*
pMeta
;
STfs
*
pTfs
;
SSmaEnvs
smaEnvs
;
};
#define REPO_ID(r) ((r)->vgId)
#define REPO_CFG(r) (&(r)->config)
#define REPO_FS(r) ((r)->fs)
#define REPO_META(r) ((r)->pMeta)
#define REPO_TFS(r) ((r)->pTfs)
#define IS_REPO_LOCKED(r) ((r)->repoLocked)
#define REPO_TSMA_NUM(r) ((r)->smaEnvs.nTSma)
#define REPO_RSMA_NUM(r) ((r)->smaEnvs.nRSma)
#define REPO_TSMA_ENV(r) ((r)->smaEnvs.pTSmaEnv)
#define REPO_RSMA_ENV(r) ((r)->smaEnvs.pRSmaEnv)
int
tsdbLockRepo
(
STsdb
*
pTsdb
);
int
tsdbUnlockRepo
(
STsdb
*
pTsdb
);
static
FORCE_INLINE
STSchema
*
tsdbGetTableSchemaImpl
(
STable
*
pTable
,
bool
lock
,
bool
copy
,
int32_t
version
)
{
return
pTable
->
pSchema
;
}
#ifdef __cplusplus
}
#endif
#endif
/*_TD_TSDB_DEF_H_*/
source/dnode/vnode/src/inc/tsdbFS.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_FS_H_
#define _TD_TSDB_FS_H_
#include "tsdbFile.h"
#ifdef __cplusplus
extern
"C"
{
#endif
// ================== TSDB global config
extern
bool
tsdbForceKeepFile
;
// ================== CURRENT file header info
typedef
struct
{
uint32_t
version
;
// Current file system version (relating to code)
uint32_t
len
;
// Encode content length (including checksum)
}
SFSHeader
;
// ================== TSDB File System Meta
typedef
struct
{
uint32_t
version
;
// Commit version from 0 to increase
int64_t
totalPoints
;
// total points
int64_t
totalStorage
;
// Uncompressed total storage
}
STsdbFSMeta
;
// ==================
typedef
struct
{
STsdbFSMeta
meta
;
// FS meta
SArray
*
df
;
// data file array
SArray
*
sf
;
// sma data file array v2f1900.index_name_1
}
SFSStatus
;
/**
* @brief Directory structure of .tsma data files.
*
* /vnode2/tsdb $ tree tsma/
* tsma/
* ├── v2f100.index_name_1
* ├── v2f101.index_name_1
* ├── v2f102.index_name_1
* ├── v2f1900.index_name_3
* ├── v2f1901.index_name_3
* ├── v2f1902.index_name_3
* ├── v2f200.index_name_2
* ├── v2f201.index_name_2
* └── v2f202.index_name_2
*
* 0 directories, 9 files
*/
typedef
struct
{
TdThreadRwlock
lock
;
SFSStatus
*
cstatus
;
// current status
SHashObj
*
metaCache
;
// meta cache
SHashObj
*
metaCacheComp
;
// meta cache for compact
bool
intxn
;
SFSStatus
*
nstatus
;
// new status
}
STsdbFS
;
#define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus)
#define FS_NEW_STATUS(pfs) ((pfs)->nstatus)
#define FS_IN_TXN(pfs) (pfs)->intxn
#define FS_VERSION(pfs) ((pfs)->cstatus->meta.version)
#define FS_TXN_VERSION(pfs) ((pfs)->nstatus->meta.version)
typedef
struct
{
int
direction
;
uint64_t
version
;
// current FS version
STsdbFS
*
pfs
;
int
index
;
// used to position next fset when version the same
int
fid
;
// used to seek when version is changed
SDFileSet
*
pSet
;
}
SFSIter
;
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
STsdbFS
*
tsdbNewFS
(
const
STsdbCfg
*
pCfg
);
void
*
tsdbFreeFS
(
STsdbFS
*
pfs
);
int
tsdbOpenFS
(
STsdb
*
pRepo
);
void
tsdbCloseFS
(
STsdb
*
pRepo
);
void
tsdbStartFSTxn
(
STsdb
*
pRepo
,
int64_t
pointsAdd
,
int64_t
storageAdd
);
int
tsdbEndFSTxn
(
STsdb
*
pRepo
);
int
tsdbEndFSTxnWithError
(
STsdbFS
*
pfs
);
void
tsdbUpdateFSTxnMeta
(
STsdbFS
*
pfs
,
STsdbFSMeta
*
pMeta
);
// void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile);
int
tsdbUpdateDFileSet
(
STsdbFS
*
pfs
,
const
SDFileSet
*
pSet
);
void
tsdbFSIterInit
(
SFSIter
*
pIter
,
STsdbFS
*
pfs
,
int
direction
);
void
tsdbFSIterSeek
(
SFSIter
*
pIter
,
int
fid
);
SDFileSet
*
tsdbFSIterNext
(
SFSIter
*
pIter
);
int
tsdbLoadMetaCache
(
STsdb
*
pRepo
,
bool
recoverMeta
);
static
FORCE_INLINE
int
tsdbRLockFS
(
STsdbFS
*
pFs
)
{
int
code
=
taosThreadRwlockRdlock
(
&
(
pFs
->
lock
));
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
return
0
;
}
static
FORCE_INLINE
int
tsdbWLockFS
(
STsdbFS
*
pFs
)
{
int
code
=
taosThreadRwlockWrlock
(
&
(
pFs
->
lock
));
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
return
0
;
}
static
FORCE_INLINE
int
tsdbUnLockFS
(
STsdbFS
*
pFs
)
{
int
code
=
taosThreadRwlockUnlock
(
&
(
pFs
->
lock
));
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
return
0
;
}
#ifdef __cplusplus
}
#endif
#endif
/* _TD_TSDB_FS_H_ */
source/dnode/vnode/src/inc/tsdbLog.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_LOG_H_
#define _TD_TSDB_LOG_H_
#include "tlog.h"
#ifdef __cplusplus
extern
"C"
{
#endif
extern
int32_t
tsdbDebugFlag
;
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#ifdef __cplusplus
}
#endif
#endif
/* _TD_TSDB_LOG_H_ */
\ No newline at end of file
source/dnode/vnode/src/inc/tsdbMemTable.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_MEM_TABLE_H_
#define _TD_TSDB_MEM_TABLE_H_
#include "tsdb.h"
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
{
int
rowsInserted
;
int
rowsUpdated
;
int
rowsDeleteSucceed
;
int
rowsDeleteFailed
;
int
nOperations
;
TSKEY
keyFirst
;
TSKEY
keyLast
;
}
SMergeInfo
;
typedef
struct
STbData
{
tb_uid_t
uid
;
TSKEY
keyMin
;
TSKEY
keyMax
;
int64_t
nrows
;
SSkipList
*
pData
;
}
STbData
;
typedef
struct
STsdbMemTable
{
T_REF_DECLARE
()
SRWLatch
latch
;
TSKEY
keyMin
;
TSKEY
keyMax
;
uint64_t
nRow
;
SMemAllocator
*
pMA
;
// Container
SSkipList
*
pSlIdx
;
// SSkiplist<STbData>
SHashObj
*
pHashIdx
;
}
STsdbMemTable
;
STsdbMemTable
*
tsdbNewMemTable
(
STsdb
*
pTsdb
);
void
tsdbFreeMemTable
(
STsdb
*
pTsdb
,
STsdbMemTable
*
pMemTable
);
int
tsdbMemTableInsert
(
STsdb
*
pTsdb
,
STsdbMemTable
*
pMemTable
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
int
tsdbLoadDataFromCache
(
STable
*
pTable
,
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
);
static
FORCE_INLINE
STSRow
*
tsdbNextIterRow
(
SSkipListIterator
*
pIter
)
{
if
(
pIter
==
NULL
)
return
NULL
;
SSkipListNode
*
node
=
tSkipListIterGet
(
pIter
);
if
(
node
==
NULL
)
return
NULL
;
return
(
STSRow
*
)
SL_GET_NODE_DATA
(
node
);
}
static
FORCE_INLINE
TSKEY
tsdbNextIterKey
(
SSkipListIterator
*
pIter
)
{
STSRow
*
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
)
return
TSDB_DATA_TIMESTAMP_NULL
;
return
TD_ROW_KEY
(
row
);
}
#ifdef __cplusplus
}
#endif
#endif
/*_TD_TSDB_MEM_TABLE_H_*/
\ No newline at end of file
source/dnode/vnode/src/inc/tsdbMemory.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_MEMORY_H_
#define _TD_TSDB_MEMORY_H_
#ifdef __cplusplus
extern
"C"
{
#endif
static
void
*
taosTMalloc
(
size_t
size
);
static
void
*
taosTCalloc
(
size_t
nmemb
,
size_t
size
);
static
void
*
taosTRealloc
(
void
*
ptr
,
size_t
size
);
static
void
*
taosTZfree
(
void
*
ptr
);
static
size_t
taosTSizeof
(
void
*
ptr
);
static
void
taosTMemset
(
void
*
ptr
,
int
c
);
static
FORCE_INLINE
void
*
taosTMalloc
(
size_t
size
)
{
if
(
size
<=
0
)
return
NULL
;
void
*
ret
=
taosMemoryMalloc
(
size
+
sizeof
(
size_t
));
if
(
ret
==
NULL
)
return
NULL
;
*
(
size_t
*
)
ret
=
size
;
return
(
void
*
)((
char
*
)
ret
+
sizeof
(
size_t
));
}
static
FORCE_INLINE
void
*
taosTCalloc
(
size_t
nmemb
,
size_t
size
)
{
size_t
tsize
=
nmemb
*
size
;
void
*
ret
=
taosTMalloc
(
tsize
);
if
(
ret
==
NULL
)
return
NULL
;
taosTMemset
(
ret
,
0
);
return
ret
;
}
static
FORCE_INLINE
size_t
taosTSizeof
(
void
*
ptr
)
{
return
(
ptr
)
?
(
*
(
size_t
*
)((
char
*
)
ptr
-
sizeof
(
size_t
)))
:
0
;
}
static
FORCE_INLINE
void
taosTMemset
(
void
*
ptr
,
int
c
)
{
memset
(
ptr
,
c
,
taosTSizeof
(
ptr
));
}
static
FORCE_INLINE
void
*
taosTRealloc
(
void
*
ptr
,
size_t
size
)
{
if
(
ptr
==
NULL
)
return
taosTMalloc
(
size
);
if
(
size
<=
taosTSizeof
(
ptr
))
return
ptr
;
void
*
tptr
=
(
void
*
)((
char
*
)
ptr
-
sizeof
(
size_t
));
size_t
tsize
=
size
+
sizeof
(
size_t
);
void
*
tptr1
=
taosMemoryRealloc
(
tptr
,
tsize
);
if
(
tptr1
==
NULL
)
return
NULL
;
tptr
=
tptr1
;
*
(
size_t
*
)
tptr
=
size
;
return
(
void
*
)((
char
*
)
tptr
+
sizeof
(
size_t
));
}
static
FORCE_INLINE
void
*
taosTZfree
(
void
*
ptr
)
{
if
(
ptr
)
{
taosMemoryFree
((
void
*
)((
char
*
)
ptr
-
sizeof
(
size_t
)));
}
return
NULL
;
}
#ifdef __cplusplus
}
#endif
#endif
/* _TD_TSDB_MEMORY_H_ */
\ No newline at end of file
source/dnode/vnode/src/inc/tsdbOptions.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_OPTIONS_H_
#define _TD_TSDB_OPTIONS_H_
#ifdef __cplusplus
extern
"C"
{
#endif
extern
const
STsdbCfg
defautlTsdbOptions
;
int
tsdbValidateOptions
(
const
STsdbCfg
*
);
void
tsdbOptionsCopy
(
STsdbCfg
*
pDest
,
const
STsdbCfg
*
pSrc
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_TSDB_OPTIONS_H_*/
\ No newline at end of file
source/dnode/vnode/src/inc/tsdbReadImpl.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_READ_IMPL_H_
#define _TD_TSDB_READ_IMPL_H_
#include "os.h"
#include "tcommon.h"
#include "tfs.h"
#include "tsdb.h"
#include "tsdbFile.h"
#include "tsdbMemory.h"
#include "tskiplist.h"
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
SReadH
SReadH
;
typedef
struct
{
uint32_t
len
;
uint32_t
offset
;
uint32_t
hasLast
:
2
;
uint32_t
numOfBlocks
:
30
;
uint64_t
uid
;
TSKEY
maxKey
;
}
SBlockIdx
;
#ifdef TD_REFACTOR_3
typedef
struct
{
int64_t
last
:
1
;
int64_t
offset
:
63
;
int32_t
algorithm
:
8
;
int32_t
numOfRows
:
24
;
int32_t
len
;
int32_t
keyLen
;
// key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
int16_t
numOfSubBlocks
;
int16_t
numOfCols
;
// not including timestamp column
TSKEY
keyFirst
;
TSKEY
keyLast
;
}
SBlock
;
#else
typedef
enum
{
TSDB_SBLK_VER_0
=
0
,
TSDB_SBLK_VER_MAX
,
}
ESBlockVer
;
#define SBlockVerLatest TSDB_SBLK_VER_0
typedef
struct
{
uint8_t
last
:
1
;
uint8_t
blkVer
:
7
;
uint8_t
numOfSubBlocks
;
col_id_t
numOfCols
;
// not including timestamp column
uint32_t
len
;
// data block length
uint32_t
keyLen
:
20
;
// key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
uint32_t
algorithm
:
4
;
uint32_t
reserve
:
8
;
col_id_t
numOfBSma
;
uint16_t
numOfRows
;
int64_t
offset
;
uint64_t
aggrStat
:
1
;
uint64_t
aggrOffset
:
63
;
TSKEY
keyFirst
;
TSKEY
keyLast
;
}
SBlockV0
;
#define SBlock SBlockV0 // latest SBlock definition
#endif
typedef
struct
{
int32_t
delimiter
;
// For recovery usage
int32_t
tid
;
uint64_t
uid
;
SBlock
blocks
[];
}
SBlockInfo
;
#ifdef TD_REFACTOR_3
typedef
struct
{
int16_t
colId
;
uint16_t
bitmap
:
1
;
// 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM
uint16_t
reserve
:
15
;
int32_t
len
;
uint32_t
type
:
8
;
uint32_t
offset
:
24
;
int64_t
sum
;
int64_t
max
;
int64_t
min
;
int16_t
maxIndex
;
int16_t
minIndex
;
int16_t
numOfNull
;
uint8_t
offsetH
;
char
padding
[
1
];
}
SBlockCol
;
#else
typedef
struct
{
int16_t
colId
;
uint16_t
type
:
6
;
uint16_t
blen
:
10
;
// bitmap length(TODO: full UT for the bitmap compress of various data input)
uint32_t
bitmap
:
1
;
// 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM
uint32_t
len
:
31
;
// data length + bitmap length
uint32_t
offset
;
}
SBlockColV0
;
#define SBlockCol SBlockColV0 // latest SBlockCol definition
typedef
struct
{
int16_t
colId
;
int16_t
maxIndex
;
int16_t
minIndex
;
int16_t
numOfNull
;
int64_t
sum
;
int64_t
max
;
int64_t
min
;
}
SAggrBlkColV0
;
#define SAggrBlkCol SAggrBlkColV0 // latest SAggrBlkCol definition
#endif
// Code here just for back-ward compatibility
static
FORCE_INLINE
void
tsdbSetBlockColOffset
(
SBlockCol
*
pBlockCol
,
uint32_t
offset
)
{
#ifdef TD_REFACTOR_3
pBlockCol
->
offset
=
offset
&
((((
uint32_t
)
1
)
<<
24
)
-
1
);
pBlockCol
->
offsetH
=
(
uint8_t
)(
offset
>>
24
);
#else
pBlockCol
->
offset
=
offset
;
#endif
}
static
FORCE_INLINE
uint32_t
tsdbGetBlockColOffset
(
SBlockCol
*
pBlockCol
)
{
#ifdef TD_REFACTOR_3
uint32_t
offset1
=
pBlockCol
->
offset
;
uint32_t
offset2
=
pBlockCol
->
offsetH
;
return
(
offset1
|
(
offset2
<<
24
));
#else
return
pBlockCol
->
offset
;
#endif
}
typedef
struct
{
int32_t
delimiter
;
// For recovery usage
int32_t
numOfCols
;
// For recovery usage
uint64_t
uid
;
// For recovery usage
SBlockCol
cols
[];
}
SBlockData
;
typedef
void
SAggrBlkData
;
// SBlockCol cols[];
struct
SReadH
{
STsdb
*
pRepo
;
SDFileSet
rSet
;
// FSET to read
SArray
*
aBlkIdx
;
// SBlockIdx array
STable
*
pTable
;
// table to read
SBlockIdx
*
pBlkIdx
;
// current reading table SBlockIdx
int
cidx
;
SBlockInfo
*
pBlkInfo
;
SBlockData
*
pBlkData
;
// Block info
SAggrBlkData
*
pAggrBlkData
;
// Aggregate Block info
SDataCols
*
pDCols
[
2
];
void
*
pBuf
;
// buffer
void
*
pCBuf
;
// compression buffer
void
*
pExBuf
;
// extra buffer
};
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
#define TSDB_READ_FSET(rh) (&((rh)->rSet))
#define TSDB_READ_TABLE(rh) ((rh)->pTable)
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
#define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD)
#define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL)
#define TSDB_READ_BUF(rh) ((rh)->pBuf)
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) \
(sizeof(SBlockData) + sizeof(SBlockColV##blkVer) * (ncols) + sizeof(TSCKSUM))
static
FORCE_INLINE
size_t
tsdbBlockStatisSize
(
int
nCols
,
uint32_t
blkVer
)
{
switch
(
blkVer
)
{
case
TSDB_SBLK_VER_0
:
default:
return
TSDB_BLOCK_STATIS_SIZE
(
nCols
,
0
);
}
}
#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkColV##blkVer) * (ncols) + sizeof(TSCKSUM))
static
FORCE_INLINE
size_t
tsdbBlockAggrSize
(
int
nCols
,
uint32_t
blkVer
)
{
switch
(
blkVer
)
{
case
TSDB_SBLK_VER_0
:
default:
return
TSDB_BLOCK_AGGR_SIZE
(
nCols
,
0
);
}
}
int
tsdbInitReadH
(
SReadH
*
pReadh
,
STsdb
*
pRepo
);
void
tsdbDestroyReadH
(
SReadH
*
pReadh
);
int
tsdbSetAndOpenReadFSet
(
SReadH
*
pReadh
,
SDFileSet
*
pSet
);
void
tsdbCloseAndUnsetFSet
(
SReadH
*
pReadh
);
int
tsdbLoadBlockIdx
(
SReadH
*
pReadh
);
int
tsdbSetReadTable
(
SReadH
*
pReadh
,
STable
*
pTable
);
int
tsdbLoadBlockInfo
(
SReadH
*
pReadh
,
void
*
pTarget
);
int
tsdbLoadBlockData
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlockInfo
);
int
tsdbLoadBlockDataCols
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlkInfo
,
const
int16_t
*
colIds
,
int
numOfColsIds
);
int
tsdbLoadBlockStatis
(
SReadH
*
pReadh
,
SBlock
*
pBlock
);
int
tsdbEncodeSBlockIdx
(
void
**
buf
,
SBlockIdx
*
pIdx
);
void
*
tsdbDecodeSBlockIdx
(
void
*
buf
,
SBlockIdx
*
pIdx
);
void
tsdbGetBlockStatis
(
SReadH
*
pReadh
,
SDataStatis
*
pStatis
,
int
numOfCols
,
SBlock
*
pBlock
);
static
FORCE_INLINE
int
tsdbMakeRoom
(
void
**
ppBuf
,
size_t
size
)
{
void
*
pBuf
=
*
ppBuf
;
size_t
tsize
=
taosTSizeof
(
pBuf
);
if
(
tsize
<
size
)
{
if
(
tsize
==
0
)
tsize
=
1024
;
while
(
tsize
<
size
)
{
tsize
*=
2
;
}
*
ppBuf
=
taosTRealloc
(
pBuf
,
tsize
);
if
(
*
ppBuf
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
}
return
0
;
}
#ifdef __cplusplus
}
#endif
#endif
/*_TD_TSDB_READ_IMPL_H_*/
source/dnode/vnode/src/inc/tsdbSma.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_SMA_H_
#define _TD_TSDB_SMA_H_
#define TSDB_SMA_TEST // remove after test finished
typedef
struct
SSmaStat
SSmaStat
;
typedef
struct
SSmaEnv
SSmaEnv
;
typedef
struct
SSmaEnvs
SSmaEnvs
;
struct
SSmaEnv
{
TdThreadRwlock
lock
;
SDiskID
did
;
TDBEnv
dbEnv
;
// TODO: If it's better to put it in smaIndex level?
char
*
path
;
// relative path
SSmaStat
*
pStat
;
};
#define SMA_ENV_LOCK(env) ((env)->lock)
#define SMA_ENV_DID(env) ((env)->did)
#define SMA_ENV_ENV(env) ((env)->dbEnv)
#define SMA_ENV_PATH(env) ((env)->path)
#define SMA_ENV_STAT(env) ((env)->pStat)
#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
struct
SSmaEnvs
{
int16_t
nTSma
;
int16_t
nRSma
;
SSmaEnv
*
pTSmaEnv
;
SSmaEnv
*
pRSmaEnv
;
};
void
tsdbDestroySmaEnv
(
SSmaEnv
*
pSmaEnv
);
void
*
tsdbFreeSmaEnv
(
SSmaEnv
*
pSmaEnv
);
#if 0
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result);
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin);
#endif
// internal func
static
FORCE_INLINE
int32_t
tsdbEncodeTSmaKey
(
int64_t
groupId
,
TSKEY
tsKey
,
void
**
pData
)
{
int32_t
len
=
0
;
len
+=
taosEncodeFixedI64
(
pData
,
tsKey
);
len
+=
taosEncodeFixedI64
(
pData
,
groupId
);
return
len
;
}
static
FORCE_INLINE
int32_t
tsdbRLockSma
(
SSmaEnv
*
pEnv
)
{
int
code
=
taosThreadRwlockRdlock
(
&
(
pEnv
->
lock
));
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
return
0
;
}
static
FORCE_INLINE
int32_t
tsdbWLockSma
(
SSmaEnv
*
pEnv
)
{
int
code
=
taosThreadRwlockWrlock
(
&
(
pEnv
->
lock
));
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
return
0
;
}
static
FORCE_INLINE
int32_t
tsdbUnLockSma
(
SSmaEnv
*
pEnv
)
{
int
code
=
taosThreadRwlockUnlock
(
&
(
pEnv
->
lock
));
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
return
0
;
}
#endif
/* _TD_TSDB_SMA_H_ */
\ No newline at end of file
source/dnode/vnode/src/inc/vn
d
.h
→
source/dnode/vnode/src/inc/vn
odeInt
.h
浏览文件 @
4a03b48f
...
...
@@ -16,17 +16,23 @@
#ifndef _TD_VNODE_DEF_H_
#define _TD_VNODE_DEF_H_
#include "
tmalloca
tor.h"
// #include "sync
.h"
#include "
execu
tor.h"
#include "tchecksum
.h"
#include "tcoding.h"
#include "tcompression.h"
#include "tdatablock.h"
#include "tfs.h"
#include "tglobal.h"
#include "tlist.h"
#include "tlockfree.h"
#include "tmacro.h"
#include "tmallocator.h"
#include "tskiplist.h"
#include "ttime.h"
#include "ttimer.h"
#include "vnode.h"
#include "vnodeQuery.h"
#include "wal.h"
#include "qworker.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -34,8 +40,9 @@ extern "C" {
typedef
struct
STQ
STQ
;
typedef
struct
SVState
SVState
;
typedef
struct
SVBufPool
SVBufPool
;
typedef
struct
SVState
SVState
;
typedef
struct
SVBufPool
SVBufPool
;
typedef
struct
SQWorkerMgmt
SQHandle
;
typedef
struct
SVnodeTask
{
TD_DLIST_NODE
(
SVnodeTask
);
...
...
@@ -95,7 +102,9 @@ struct SVnode {
STfs
*
pTfs
;
};
int
vnodeScheduleTask
(
SVnodeTask
*
task
);
int
vnodeScheduleTask
(
SVnodeTask
*
task
);
int
vnodeQueryOpen
(
SVnode
*
pVnode
);
void
vnodeQueryClose
(
SVnode
*
pVnode
);
#define vFatal(...) \
do { \
...
...
@@ -205,6 +214,12 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t wo
// sma
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
);
#include "meta.h"
#include "tsdb.h"
#include "tq.h"
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/inc/vnodeQuery.h
已删除
100644 → 0
浏览文件 @
fdbab777
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_READ_H_
#define _TD_VNODE_READ_H_
#ifdef __cplusplus
extern
"C"
{
#endif
#include "qworker.h"
#include "vnode.h"
typedef
struct
SQWorkerMgmt
SQHandle
;
int
vnodeQueryOpen
(
SVnode
*
pVnode
);
void
vnodeQueryClose
(
SVnode
*
pVnode
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_VNODE_READ_H_*/
source/dnode/vnode/src/meta/metaCache.c
浏览文件 @
4a03b48f
...
...
@@ -13,8 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
#include "metaDef.h"
#include "vnodeInt.h"
struct
SMetaCache
{
// TODO
...
...
source/dnode/vnode/src/meta/metaCfg.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
metaDef
.h"
#include "
vnodeInt
.h"
const
SMetaCfg
defaultMetaOptions
=
{.
lruSize
=
0
};
...
...
source/dnode/vnode/src/meta/metaIdx.c
浏览文件 @
4a03b48f
...
...
@@ -16,7 +16,7 @@
#ifdef USE_INVERTED_INDEX
#include "index.h"
#endif
#include "
metaDef
.h"
#include "
vnodeInt
.h"
struct
SMetaIdx
{
#ifdef USE_INVERTED_INDEX
...
...
source/dnode/vnode/src/meta/metaMain.c
浏览文件 @
4a03b48f
...
...
@@ -15,7 +15,7 @@
#include "tcoding.h"
#include "
metaDef
.h"
#include "
vnodeInt
.h"
static
SMeta
*
metaNew
(
const
char
*
path
,
const
SMetaCfg
*
pMetaCfg
,
SMemAllocatorFactory
*
pMAF
);
static
void
metaFree
(
SMeta
*
pMeta
);
...
...
source/dnode/vnode/src/meta/metaTDBImpl.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
metaDef
.h"
#include "
vnodeInt
.h"
#include "tdbInt.h"
typedef
struct
SPoolMem
{
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
metaDef
.h"
#include "
vnodeInt
.h"
int
metaCreateTable
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
)
{
// Validate the tbOptions
...
...
source/dnode/vnode/src/meta/metaTbCfg.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
metaDef
.h"
#include "
vnodeInt
.h"
#include "tcoding.h"
int
metaValidateTbCfg
(
SMeta
*
pMeta
,
const
STbCfg
*
pTbOptions
)
{
...
...
source/dnode/vnode/src/meta/metaTbUid.c
浏览文件 @
4a03b48f
...
...
@@ -13,8 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
#include "metaDef.h"
#include "vnodeInt.h"
int
metaOpenUidGnrt
(
SMeta
*
pMeta
)
{
// Init a generator
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
4a03b48f
...
...
@@ -15,9 +15,8 @@
#include "tcompare.h"
#include "tdatablock.h"
#include "tqInt.h"
#include "tqMetaStore.h"
#include "tstream.h"
#include "vnodeInt.h"
int32_t
tqInit
()
{
return
tqPushMgrInit
();
}
...
...
@@ -83,9 +82,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi
memcpy
(
data
,
msg
,
msgLen
);
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
tsdbUpdateSmaWindow
(
pTq
->
pVnode
->
pTsdb
,
msg
)
!=
0
)
{
return
-
1
;
}
//
if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg) != 0) {
//
return -1;
//
}
}
SRpcMsg
req
=
{
...
...
@@ -272,7 +271,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
fetchOffset
=
pReq
->
currentOffset
+
1
;
}
vDebug
(
"tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
pReq
->
currentOffset
,
fetchOffset
);
vDebug
(
"tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
pReq
->
currentOffset
,
fetchOffset
);
SMqPollRsp
rsp
=
{
/*.consumerId = consumerId,*/
...
...
@@ -296,10 +296,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
STqTopic
*
pTopic
=
NULL
;
int
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
int
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
STqTopic
*
topic
=
taosArrayGet
(
pConsumer
->
topics
,
i
);
//TODO race condition
//
TODO race condition
ASSERT
(
pConsumer
->
consumerId
==
consumerId
);
if
(
strcmp
(
topic
->
topicName
,
pReq
->
topic
)
==
0
)
{
pTopic
=
topic
;
...
...
@@ -307,7 +307,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
}
if
(
pTopic
==
NULL
)
{
vWarn
(
"tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d"
,
consumerId
,
pReq
->
epoch
,
pReq
->
topic
,
pTq
->
pVnode
->
vgId
);
vWarn
(
"tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d"
,
consumerId
,
pReq
->
epoch
,
pReq
->
topic
,
pTq
->
pVnode
->
vgId
);
pMsg
->
pCont
=
NULL
;
pMsg
->
contLen
=
0
;
pMsg
->
code
=
-
1
;
...
...
@@ -322,10 +323,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
while
(
1
)
{
/*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
//TODO
//
TODO
consumerEpoch
=
atomic_load_32
(
&
pConsumer
->
epoch
);
if
(
consumerEpoch
>
pReq
->
epoch
)
{
//TODO: return
//
TODO: return
break
;
}
SWalReadHead
*
pHead
;
...
...
@@ -333,10 +334,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// response to user
vDebug
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
);
vDebug
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
);
break
;
}
vDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
pHead
->
msgType
);
vDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
pHead
->
msgType
);
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/
if
(
pHead
->
msgType
==
TDMT_VND_SUBMIT
)
{
...
...
@@ -360,7 +363,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
if
(
taosArrayGetSize
(
pRes
)
==
0
)
{
vDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
);
vDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
);
fetchOffset
++
;
rsp
.
skipLogNum
++
;
taosArrayDestroy
(
pRes
);
...
...
@@ -389,7 +393,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg
->
pCont
=
buf
;
pMsg
->
contLen
=
tlen
;
pMsg
->
code
=
0
;
vDebug
(
"vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp"
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
pHead
->
msgType
,
consumerId
,
pReq
->
epoch
);
vDebug
(
"vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp"
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
pHead
->
msgType
,
consumerId
,
pReq
->
epoch
);
tmsgSendRsp
(
pMsg
);
taosMemoryFree
(
pHead
);
return
0
;
...
...
@@ -420,7 +425,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg
->
contLen
=
tlen
;
pMsg
->
code
=
0
;
tmsgSendRsp
(
pMsg
);
vDebug
(
"vg %d offset %ld from consumer %ld (epoch %d) not rsp"
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
consumerId
,
pReq
->
epoch
);
vDebug
(
"vg %d offset %ld from consumer %ld (epoch %d) not rsp"
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
consumerId
,
pReq
->
epoch
);
/*}*/
return
0
;
...
...
@@ -431,7 +437,7 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) {
terrno
=
TSDB_CODE_SUCCESS
;
tDecodeSMqMVRebReq
(
msg
,
&
req
);
vDebug
(
"vg %d set from consumer %ld to consumer %ld"
,
req
.
vgId
,
req
.
oldConsumerId
,
req
.
newConsumerId
);
vDebug
(
"vg %d set from consumer %ld to consumer %ld"
,
req
.
vgId
,
req
.
oldConsumerId
,
req
.
newConsumerId
);
STqConsumer
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
req
.
oldConsumerId
);
ASSERT
(
pConsumer
);
ASSERT
(
pConsumer
->
consumerId
==
req
.
oldConsumerId
);
...
...
source/dnode/vnode/src/tq/tqMetaStore.c
浏览文件 @
4a03b48f
...
...
@@ -12,7 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
tqMetaStore
.h"
#include "
vnodeInt
.h"
// TODO:replace by an abstract file layer
#include <fcntl.h>
#include <string.h>
...
...
source/dnode/vnode/src/tq/tqOffset.c
浏览文件 @
4a03b48f
...
...
@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "
tqOffse
t.h"
#include "
vnodeIn
t.h"
enum
ETqOffsetPersist
{
TQ_OFFSET_PERSIST__LAZY
=
1
,
...
...
@@ -39,4 +39,3 @@ STqOffsetStore* STqOffsetOpen(STqOffsetCfg* pCfg) {
pStore
->
pHash
=
taosHashInit
(
64
,
MurmurHash3_32
,
true
,
HASH_NO_LOCK
);
return
pStore
;
}
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
tqPush
.h"
#include "
vnodeInt
.h"
int32_t
tqPushMgrInit
()
{
//
...
...
source/dnode/vnode/src/tsdb/tsdbBDBImpl.c
浏览文件 @
4a03b48f
...
...
@@ -19,8 +19,6 @@
#include "taoserror.h"
#include "tcoding.h"
#include "thash.h"
#include "tsdbDBDef.h"
#include "tsdbLog.h"
#define IMPL_WITH_LOCK 1
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
tsdbDef
.h"
#include "
vnodeInt
.h"
#define TSDB_MAX_SUBBLOCKS 8
...
...
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
4a03b48f
...
...
@@ -14,7 +14,7 @@
*/
#include <regex.h>
#include "
tsdbDef
.h"
#include "
vnodeInt
.h"
#include "os.h"
typedef
enum
{
TSDB_TXN_TEMP_FILE
=
0
,
TSDB_TXN_CURR_FILE
}
TSDB_TXN_FILE_T
;
...
...
source/dnode/vnode/src/tsdb/tsdbFile.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
tsdbDef
.h"
#include "
vnodeInt
.h"
static
const
char
*
TSDB_FNAME_SUFFIX
[]
=
{
"head"
,
// TSDB_FILE_HEAD
...
...
source/dnode/vnode/src/tsdb/tsdbMain.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
tsdbDef
.h"
#include "
vnodeInt
.h"
static
STsdb
*
tsdbNew
(
const
char
*
path
,
int32_t
vgId
,
const
STsdbCfg
*
pTsdbCfg
,
SMemAllocatorFactory
*
pMAF
,
SMeta
*
pMeta
,
STfs
*
pTfs
);
...
...
@@ -87,8 +87,8 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg,
static
void
tsdbFree
(
STsdb
*
pTsdb
)
{
if
(
pTsdb
)
{
tsdbFreeSmaEnv
(
REPO_TSMA_ENV
(
pTsdb
));
tsdbFreeSmaEnv
(
REPO_RSMA_ENV
(
pTsdb
));
//
tsdbFreeSmaEnv(REPO_TSMA_ENV(pTsdb));
//
tsdbFreeSmaEnv(REPO_RSMA_ENV(pTsdb));
tsdbFreeFS
(
pTsdb
->
fs
);
taosMemoryFreeClear
(
pTsdb
->
path
);
taosMemoryFree
(
pTsdb
);
...
...
@@ -98,7 +98,7 @@ static void tsdbFree(STsdb *pTsdb) {
static
int
tsdbOpenImpl
(
STsdb
*
pTsdb
)
{
tsdbOpenFS
(
pTsdb
);
tsdbInitSma
(
pTsdb
);
//
tsdbInitSma(pTsdb);
// TODO
return
0
;
...
...
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
tsdbDef
.h"
#include "
vnodeInt
.h"
static
int
tsdbScanAndConvertSubmitMsg
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
);
static
int
tsdbMemTableInsertTbData
(
STsdb
*
pRepo
,
SSubmitBlk
*
pBlock
,
int32_t
*
pAffectedRows
);
...
...
source/dnode/vnode/src/tsdb/tsdbOptions.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
tsdbDef
.h"
#include "
vnodeInt
.h"
const
STsdbCfg
defautlTsdbOptions
=
{.
precision
=
0
,
.
lruCacheSize
=
0
,
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
4a03b48f
...
...
@@ -13,25 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
tsdbDef
.h"
#include "
vnodeInt
.h"
#include "tdatablock.h"
#include "os.h"
#include "talgo.h"
#include "tcompare.h"
#include "tdataformat.h"
#include "texception.h"
#include "tsdb.h"
#include "tsdbFS.h"
#include "tsdbLog.h"
#include "tsdbReadImpl.h"
#include "tskiplist.h"
#include "ttime.h"
#include "taosdef.h"
#include "tlosertree.h"
#include "
tsdbDef
.h"
#include "
vnodeInt
.h"
#include "tmsg.h"
#include "tsdbCommit.h"
#define EXTRA_BYTES 2
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
...
...
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
tsdbDef
.h"
#include "
vnodeInt
.h"
#define TSDB_KEY_COL_OFFSET 0
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
tsdbDef
.h"
#include "
vnodeInt
.h"
static
const
char
*
TSDB_SMA_DNAME
[]
=
{
""
,
// TSDB_SMA_TYPE_BLOCK
...
...
source/dnode/vnode/src/tsdb/tsdbWrite.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
tsdbDef
.h"
#include "
vnodeInt
.h"
/**
* @brief insert TS data
...
...
source/dnode/vnode/src/vnd/vnodeArenaMAImpl.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vn
d
.h"
#include "vn
odeInt
.h"
static
SVArenaNode
*
vArenaNodeNew
(
uint64_t
capacity
);
static
void
vArenaNodeFree
(
SVArenaNode
*
pNode
);
...
...
source/dnode/vnode/src/vnd/vnodeBufferPool.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vn
d
.h"
#include "vn
odeInt
.h"
/* ------------------------ STRUCTURES ------------------------ */
#define VNODE_BUF_POOL_SHARDS 3
...
...
source/dnode/vnode/src/vnd/vnodeCfg.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vn
d
.h"
#include "vn
odeInt
.h"
const
SVnodeCfg
defaultVnodeOptions
=
{
.
wsize
=
96
*
1024
*
1024
,
.
ssize
=
1
*
1024
*
1024
,
.
lsize
=
1024
,
.
walCfg
=
{.
level
=
TAOS_WAL_WRITE
}};
/* TODO */
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vn
d
.h"
#include "vn
odeInt
.h"
static
int
vnodeStartCommit
(
SVnode
*
pVnode
);
static
int
vnodeEndCommit
(
SVnode
*
pVnode
);
...
...
source/dnode/vnode/src/vnd/vnodeInt.c
浏览文件 @
4a03b48f
...
...
@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE
#include "sync.h"
#include "vn
d
.h"
#include "vn
odeInt
.h"
// #include "vnodeInt.h"
int32_t
vnodeAlter
(
SVnode
*
pVnode
,
const
SVnodeCfg
*
pCfg
)
{
return
0
;
}
...
...
source/dnode/vnode/src/vnd/vnodeMain.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vn
d
.h"
#include "vn
odeInt
.h"
static
SVnode
*
vnodeNew
(
const
char
*
path
,
const
SVnodeCfg
*
pVnodeCfg
);
static
void
vnodeFree
(
SVnode
*
pVnode
);
...
...
source/dnode/vnode/src/vnd/vnodeMgr.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vn
d
.h"
#include "vn
odeInt
.h"
#include "tglobal.h"
SVnodeMgr
vnodeMgr
=
{.
vnodeInitFlag
=
TD_MOD_UNINITIALIZED
};
...
...
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
4a03b48f
...
...
@@ -13,9 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeQuery.h"
#include "executor.h"
#include "vn
d
.h"
#include "vn
odeInt
.h"
static
int32_t
vnodeGetTableList
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
static
int
vnodeGetTableMeta
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
...
...
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
4a03b48f
...
...
@@ -13,13 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vn
d
.h"
#include "vn
odeInt
.h"
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
)
{
// TODO
blockDebugShowData
(
data
);
tsdbInsertTSmaData
(((
SVnode
*
)
pVnode
)
->
pTsdb
,
smaId
,
(
const
char
*
)
data
);
//
blockDebugShowData(data);
//
tsdbInsertTSmaData(((SVnode *)pVnode)->pTsdb, smaId, (const char *)data);
}
void
vnodeProcessWMsgs
(
SVnode
*
pVnode
,
SArray
*
pMsgs
)
{
...
...
@@ -232,16 +232,16 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
tdDestroyTSma(&vCreateSmaReq.tSma);
// TODO: return directly or go on follow steps?
#endif
if
(
tsdbCreateTSma
(
pVnode
->
pTsdb
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
// TODO
}
}
break
;
case
TDMT_VND_CANCEL_SMA
:
{
// timeRangeSMA
}
break
;
case
TDMT_VND_DROP_SMA
:
{
// timeRangeSMA
if
(
tsdbDropTSma
(
pVnode
->
pTsdb
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
// TODO
}
//
if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
//
// TODO
//
}
//
} break;
//
case TDMT_VND_CANCEL_SMA: { // timeRangeSMA
//
} break;
//
case TDMT_VND_DROP_SMA: { // timeRangeSMA
//
if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
//
// TODO
//
}
#if 0
tsdbTSmaSub(pVnode->pTsdb, 1);
SVDropTSmaReq vDropSmaReq = {0};
...
...
source/dnode/vnode/test/CMakeLists.txt
浏览文件 @
4a03b48f
...
...
@@ -25,15 +25,15 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
# COMMAND tqTest
# )
ADD_EXECUTABLE
(
tsdbSmaTest tsdbSmaTest.cpp
)
TARGET_LINK_LIBRARIES
(
tsdbSmaTest
PUBLIC os util common vnode gtest_main
)
#
ADD_EXECUTABLE(tsdbSmaTest tsdbSmaTest.cpp)
#
TARGET_LINK_LIBRARIES(
#
tsdbSmaTest
#
PUBLIC os util common vnode gtest_main
#
)
TARGET_INCLUDE_DIRECTORIES
(
tsdbSmaTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/common"
PUBLIC
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../src/inc"
PUBLIC
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
\ No newline at end of file
# TARGET_INCLUDE_DIRECTORIES(
# tsdbSmaTest
# PUBLIC "${CMAKE_SOURCE_DIR}/include/common"
# PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../src/inc"
# PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
# )
\ No newline at end of file
source/dnode/vnode/test/tsdbSmaTest.cpp
浏览文件 @
4a03b48f
...
...
@@ -14,13 +14,13 @@
*/
#include <gtest/gtest.h>
#include <
tsdbDef
.h>
#include <
vnodeInt
.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#include <
metaDef
.h>
#include <
vnodeInt
.h>
#include <tmsg.h>
#pragma GCC diagnostic push
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
4a03b48f
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <
tsdb
.h>
#include <
vnode
.h>
#include "dataSinkMgt.h"
#include "texception.h"
#include "os.h"
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
4a03b48f
...
...
@@ -34,7 +34,7 @@
#include "tcompare.h"
#include "tcompression.h"
#include "thash.h"
#include "
tsdb
.h"
#include "
vnode
.h"
#include "ttypes.h"
#include "vnode.h"
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
4a03b48f
...
...
@@ -28,7 +28,7 @@
#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "
tsdb
.h"
#include "
vnode
.h"
#include "ttypes.h"
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录