Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
88d755be
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
88d755be
编写于
8月 04, 2023
作者:
M
Minglei Jin
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat(tsdb/cos): s3 migration
上级
6e80f55e
变更
12
展开全部
隐藏空白更改
内联
并排
Showing
12 changed file
with
3530 addition
and
26 deletion
+3530
-26
cmake/cmake.options
cmake/cmake.options
+6
-0
cmake/cos_CMakeLists.txt.in
cmake/cos_CMakeLists.txt.in
+12
-0
contrib/CMakeLists.txt
contrib/CMakeLists.txt
+23
-0
contrib/test/CMakeLists.txt
contrib/test/CMakeLists.txt
+5
-0
contrib/test/cos/CMakeLists.txt
contrib/test/cos/CMakeLists.txt
+49
-0
contrib/test/cos/main.c
contrib/test/cos/main.c
+3090
-0
source/common/src/tglobal.c
source/common/src/tglobal.c
+48
-20
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+27
-1
source/dnode/vnode/src/inc/vndCos.h
source/dnode/vnode/src/inc/vndCos.h
+36
-0
source/dnode/vnode/src/tsdb/tsdbRetention.c
source/dnode/vnode/src/tsdb/tsdbRetention.c
+115
-5
source/dnode/vnode/src/vnd/vnodeCos.c
source/dnode/vnode/src/vnd/vnodeCos.c
+114
-0
source/dnode/vnode/src/vnd/vnodeModule.c
source/dnode/vnode/src/vnd/vnodeModule.c
+5
-0
未找到文件。
cmake/cmake.options
浏览文件 @
88d755be
...
...
@@ -125,6 +125,12 @@ option(
ON
)
option(
BUILD_WITH_COS
"If build with cos"
ON
)
option(
BUILD_WITH_SQLITE
"If build with sqlite"
...
...
cmake/cos_CMakeLists.txt.in
0 → 100644
浏览文件 @
88d755be
# cos
ExternalProject_Add(cos
GIT_REPOSITORY https://github.com/tencentyun/cos-c-sdk-v5.git
GIT_TAG v5.0.16
SOURCE_DIR "${TD_CONTRIB_DIR}/cos-c-sdk-v5"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)
contrib/CMakeLists.txt
浏览文件 @
88d755be
...
...
@@ -122,6 +122,12 @@ if(${BUILD_WITH_SQLITE})
cat
(
"
${
TD_SUPPORT_DIR
}
/sqlite_CMakeLists.txt.in"
${
CONTRIB_TMP_FILE
}
)
endif
(
${
BUILD_WITH_SQLITE
}
)
# cos
if
(
${
BUILD_WITH_COS
}
)
cat
(
"
${
TD_SUPPORT_DIR
}
/cos_CMakeLists.txt.in"
${
CONTRIB_TMP_FILE
}
)
add_definitions
(
-DUSE_COS
)
endif
(
${
BUILD_WITH_COS
}
)
# lucene
if
(
${
BUILD_WITH_LUCENE
}
)
cat
(
"
${
TD_SUPPORT_DIR
}
/lucene_CMakeLists.txt.in"
${
CONTRIB_TMP_FILE
}
)
...
...
@@ -347,6 +353,23 @@ if (${BUILD_WITH_ROCKSDB})
endif
()
endif
()
# cos
if
(
${
BUILD_WITH_COS
}
)
option
(
ENABLE_TEST
"Enable the tests"
OFF
)
set
(
CMAKE_BUILD_TYPE debug
)
set
(
ORIG_CMAKE_PROJECT_NAME
${
CMAKE_PROJECT_NAME
}
)
set
(
CMAKE_PROJECT_NAME cos_c_sdk
)
add_subdirectory
(
cos-c-sdk-v5 EXCLUDE_FROM_ALL
)
target_include_directories
(
cos_c_sdk
PUBLIC $<BUILD_INTERFACE:
${
CMAKE_CURRENT_SOURCE_DIR
}
/cos-c-sdk-v5/cos_c_sdk>
)
set
(
CMAKE_PROJECT_NAME
${
ORIG_CMAKE_PROJECT_NAME
}
)
endif
(
${
BUILD_WITH_COS
}
)
# lucene
# To support build on ubuntu: sudo apt-get install libboost-all-dev
if
(
${
BUILD_WITH_LUCENE
}
)
...
...
contrib/test/CMakeLists.txt
浏览文件 @
88d755be
...
...
@@ -3,6 +3,11 @@ if(${BUILD_WITH_ROCKSDB})
add_subdirectory
(
rocksdb
)
endif
(
${
BUILD_WITH_ROCKSDB
}
)
# cos
if
(
${
BUILD_WITH_COS
}
)
add_subdirectory
(
cos
)
endif
(
${
BUILD_WITH_COS
}
)
if
(
${
BUILD_WITH_LUCENE
}
)
add_subdirectory
(
lucene
)
endif
(
${
BUILD_WITH_LUCENE
}
)
...
...
contrib/test/cos/CMakeLists.txt
0 → 100644
浏览文件 @
88d755be
add_executable
(
cosTest
""
)
target_sources
(
cosTest
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/main.c"
)
#find_path(APR_INCLUDE_DIR apr-1/apr_time.h)
#find_path(APR_UTIL_INCLUDE_DIR apr/include/apr-1/apr_md5.h)
#find_path(MINIXML_INCLUDE_DIR mxml.h)
#find_path(CURL_INCLUDE_DIR curl/curl.h)
#include_directories (${MINIXML_INCLUDE_DIR})
#include_directories (${CURL_INCLUDE_DIR})
FIND_PROGRAM
(
APR_CONFIG_BIN NAMES apr-config apr-1-config PATHS /usr/bin /usr/local/bin /usr/local/apr/bin/
)
#FIND_PROGRAM(APU_CONFIG_BIN NAMES apu-config apu-1-config PATHS /usr/bin /usr/local/bin /usr/local/apr/bin/)
IF
(
APR_CONFIG_BIN
)
EXECUTE_PROCESS
(
COMMAND
${
APR_CONFIG_BIN
}
--includedir
OUTPUT_VARIABLE APR_INCLUDE_DIR
OUTPUT_STRIP_TRAILING_WHITESPACE
)
ENDIF
()
#IF (APU_CONFIG_BIN)
# EXECUTE_PROCESS(
# COMMAND ${APU_CONFIG_BIN} --includedir
# OUTPUT_VARIABLE APR_UTIL_INCLUDE_DIR
# OUTPUT_STRIP_TRAILING_WHITESPACE
# )
#ENDIF()
include_directories
(
${
APR_INCLUDE_DIR
}
)
#include_directories (${APR_UTIL_INCLUDE_DIR})
target_include_directories
(
cosTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/contrib/cos-c-sdk-v5/cos_c_sdk"
)
find_library
(
APR_LIBRARY apr-1 PATHS /usr/local/apr/lib/
)
find_library
(
APR_UTIL_LIBRARY aprutil-1 PATHS /usr/local/apr/lib/
)
find_library
(
MINIXML_LIBRARY mxml
)
find_library
(
CURL_LIBRARY curl
)
target_link_libraries
(
cosTest cos_c_sdk
)
target_link_libraries
(
cosTest
${
APR_UTIL_LIBRARY
}
)
target_link_libraries
(
cosTest
${
APR_LIBRARY
}
)
target_link_libraries
(
cosTest
${
MINIXML_LIBRARY
}
)
target_link_libraries
(
cosTest
${
CURL_LIBRARY
}
)
contrib/test/cos/main.c
0 → 100644
浏览文件 @
88d755be
此差异已折叠。
点击以展开。
source/common/src/tglobal.c
浏览文件 @
88d755be
...
...
@@ -14,8 +14,8 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tglobal.h"
#include "os.h"
#include "tconfig.h"
#include "tgrant.h"
#include "tlog.h"
...
...
@@ -63,7 +63,7 @@ int32_t tsNumOfQnodeFetchThreads = 1;
int32_t
tsNumOfSnodeStreamThreads
=
4
;
int32_t
tsNumOfSnodeWriteThreads
=
1
;
int32_t
tsMaxStreamBackendCache
=
128
;
// M
int32_t
tsPQSortMemThreshold
=
16
;
// M
int32_t
tsPQSortMemThreshold
=
16
;
// M
// sync raft
int32_t
tsElectInterval
=
25
*
1000
;
...
...
@@ -121,8 +121,8 @@ int32_t tsQueryPolicy = 1;
int32_t
tsQueryRspPolicy
=
0
;
int64_t
tsQueryMaxConcurrentTables
=
200
;
// unit is TSDB_TABLE_NUM_UNIT
bool
tsEnableQueryHb
=
true
;
bool
tsEnableScience
=
false
;
// on taos-cli show float and doulbe with scientific notation if true
bool
tsTtlChangeOnWrite
=
false
;
// ttl delete time changes on last write if true
bool
tsEnableScience
=
false
;
// on taos-cli show float and doulbe with scientific notation if true
bool
tsTtlChangeOnWrite
=
false
;
// ttl delete time changes on last write if true
int32_t
tsQuerySmaOptimize
=
0
;
int32_t
tsQueryRsmaTolerance
=
1000
;
// the tolerance time (ms) to judge from which level to query rsma data.
bool
tsQueryPlannerTrace
=
false
;
...
...
@@ -235,6 +235,13 @@ int64_t tsCheckpointInterval = 3 * 60 * 60 * 1000;
bool
tsFilterScalarMode
=
false
;
int32_t
tsKeepTimeOffset
=
0
;
// latency of data migration
char
tsS3Endpoint
[
TSDB_FQDN_LEN
]
=
"<endpoint>"
;
char
tsS3AcessKeyId
[
TSDB_FQDN_LEN
]
=
"<accesskeyid>"
;
char
tsS3AcessKeySecret
[
TSDB_FQDN_LEN
]
=
"<accesskeysecrect>"
;
char
tsS3BucketName
[
TSDB_FQDN_LEN
]
=
"<bucketname>"
;
char
tsS3AppId
[
TSDB_FQDN_LEN
]
=
"<appid>"
;
int8_t
tsS3Enabled
=
false
;
#ifndef _STORAGE
int32_t
taosSetTfsCfg
(
SConfig
*
pCfg
)
{
SConfigItem
*
pItem
=
cfgGetItem
(
pCfg
,
"dataDir"
);
...
...
@@ -256,7 +263,9 @@ int32_t taosSetTfsCfg(SConfig *pCfg) {
int32_t
taosSetTfsCfg
(
SConfig
*
pCfg
);
#endif
struct
SConfig
*
taosGetCfg
()
{
return
tsCfg
;
}
struct
SConfig
*
taosGetCfg
()
{
return
tsCfg
;
}
static
int32_t
taosLoadCfg
(
SConfig
*
pCfg
,
const
char
**
envCmd
,
const
char
*
inputCfgDir
,
const
char
*
envFile
,
char
*
apolloUrl
)
{
...
...
@@ -376,7 +385,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if
(
cfgAddInt32
(
pCfg
,
"maxRetryWaitTime"
,
tsMaxRetryWaitTime
,
0
,
86400000
,
CFG_SCOPE_BOTH
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"useAdapter"
,
tsUseAdapter
,
CFG_SCOPE_CLIENT
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"crashReporting"
,
tsEnableCrashReport
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt64
(
pCfg
,
"queryMaxConcurrentTables"
,
tsQueryMaxConcurrentTables
,
INT64_MIN
,
INT64_MAX
,
CFG_SCOPE_CLIENT
)
!=
0
)
return
-
1
;
if
(
cfgAddInt64
(
pCfg
,
"queryMaxConcurrentTables"
,
tsQueryMaxConcurrentTables
,
INT64_MIN
,
INT64_MAX
,
CFG_SCOPE_CLIENT
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"metaCacheMaxSize"
,
tsMetaCacheMaxSize
,
-
1
,
INT32_MAX
,
CFG_SCOPE_CLIENT
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"slowLogThreshold"
,
tsSlowLogThreshold
,
0
,
INT32_MAX
,
CFG_SCOPE_CLIENT
)
!=
0
)
return
-
1
;
if
(
cfgAddString
(
pCfg
,
"slowLogScope"
,
""
,
CFG_SCOPE_CLIENT
)
!=
0
)
return
-
1
;
...
...
@@ -389,7 +400,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if
(
cfgAddInt32
(
pCfg
,
"numOfRpcSessions"
,
tsNumOfRpcSessions
,
1
,
100000
,
CFG_SCOPE_BOTH
)
!=
0
)
return
-
1
;
tsTimeToGetAvailableConn
=
TRANGE
(
tsTimeToGetAvailableConn
,
20
,
10000000
);
if
(
cfgAddInt32
(
pCfg
,
"timeToGetAvailableConn"
,
tsTimeToGetAvailableConn
,
20
,
1000000
,
CFG_SCOPE_BOTH
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"timeToGetAvailableConn"
,
tsTimeToGetAvailableConn
,
20
,
1000000
,
CFG_SCOPE_BOTH
)
!=
0
)
return
-
1
;
tsNumOfTaskQueueThreads
=
tsNumOfCores
/
2
;
tsNumOfTaskQueueThreads
=
TMAX
(
tsNumOfTaskQueueThreads
,
4
);
...
...
@@ -449,7 +461,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if
(
cfgAddInt32
(
pCfg
,
"statusInterval"
,
tsStatusInterval
,
1
,
30
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"minSlidingTime"
,
tsMinSlidingTime
,
1
,
1000000
,
CFG_SCOPE_CLIENT
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"minIntervalTime"
,
tsMinIntervalTime
,
1
,
1000000
,
CFG_SCOPE_CLIENT
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"maxNumOfDistinctRes"
,
tsMaxNumOfDistinctResults
,
10
*
10000
,
10000
*
10000
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"maxNumOfDistinctRes"
,
tsMaxNumOfDistinctResults
,
10
*
10000
,
10000
*
10000
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"countAlwaysReturnValue"
,
tsCountAlwaysReturnValue
,
0
,
1
,
CFG_SCOPE_BOTH
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"queryBufferSize"
,
tsQueryBufferSize
,
-
1
,
500000000000
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"printAuth"
,
tsPrintAuth
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
...
...
@@ -477,7 +491,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeQueryThreads
=
TMAX
(
tsNumOfVnodeQueryThreads
,
4
);
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeQueryThreads"
,
tsNumOfVnodeQueryThreads
,
4
,
1024
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddFloat
(
pCfg
,
"ratioOfVnodeStreamThreads"
,
tsRatioOfVnodeStreamThreads
,
0
.
01
,
100
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddFloat
(
pCfg
,
"ratioOfVnodeStreamThreads"
,
tsRatioOfVnodeStreamThreads
,
0
.
01
,
100
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
tsNumOfVnodeFetchThreads
=
tsNumOfCores
/
4
;
tsNumOfVnodeFetchThreads
=
TMAX
(
tsNumOfVnodeFetchThreads
,
4
);
...
...
@@ -497,7 +512,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfSnodeStreamThreads
=
tsNumOfCores
/
4
;
tsNumOfSnodeStreamThreads
=
TRANGE
(
tsNumOfSnodeStreamThreads
,
2
,
4
);
if
(
cfgAddInt32
(
pCfg
,
"numOfSnodeSharedThreads"
,
tsNumOfSnodeStreamThreads
,
2
,
1024
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"numOfSnodeSharedThreads"
,
tsNumOfSnodeStreamThreads
,
2
,
1024
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
tsNumOfSnodeWriteThreads
=
tsNumOfCores
/
4
;
tsNumOfSnodeWriteThreads
=
TRANGE
(
tsNumOfSnodeWriteThreads
,
2
,
4
);
...
...
@@ -505,14 +521,18 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsRpcQueueMemoryAllowed
=
tsTotalMemoryKB
*
1024
*
0
.
1
;
tsRpcQueueMemoryAllowed
=
TRANGE
(
tsRpcQueueMemoryAllowed
,
TSDB_MAX_MSG_SIZE
*
10LL
,
TSDB_MAX_MSG_SIZE
*
10000LL
);
if
(
cfgAddInt64
(
pCfg
,
"rpcQueueMemoryAllowed"
,
tsRpcQueueMemoryAllowed
,
TSDB_MAX_MSG_SIZE
*
10L
,
INT64_MAX
,
CFG_SCOPE_BOTH
)
!=
0
)
if
(
cfgAddInt64
(
pCfg
,
"rpcQueueMemoryAllowed"
,
tsRpcQueueMemoryAllowed
,
TSDB_MAX_MSG_SIZE
*
10L
,
INT64_MAX
,
CFG_SCOPE_BOTH
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"syncElectInterval"
,
tsElectInterval
,
10
,
1000
*
60
*
24
*
2
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"syncHeartbeatInterval"
,
tsHeartbeatInterval
,
10
,
1000
*
60
*
24
*
2
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"syncHeartbeatTimeout"
,
tsHeartbeatTimeout
,
10
,
1000
*
60
*
24
*
2
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"syncHeartbeatInterval"
,
tsHeartbeatInterval
,
10
,
1000
*
60
*
24
*
2
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"syncHeartbeatTimeout"
,
tsHeartbeatTimeout
,
10
,
1000
*
60
*
24
*
2
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt64
(
pCfg
,
"vndCommitMaxInterval"
,
tsVndCommitMaxIntervalMs
,
1000
,
1000
*
60
*
60
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt64
(
pCfg
,
"vndCommitMaxInterval"
,
tsVndCommitMaxIntervalMs
,
1000
,
1000
*
60
*
60
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt64
(
pCfg
,
"mndSdbWriteDelta"
,
tsMndSdbWriteDelta
,
20
,
10000
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt64
(
pCfg
,
"mndLogRetention"
,
tsMndLogRetention
,
500
,
10000
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
...
...
@@ -542,7 +562,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if
(
cfgAddInt32
(
pCfg
,
"uptimeInterval"
,
tsUptimeInterval
,
1
,
100000
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"queryRsmaTolerance"
,
tsQueryRsmaTolerance
,
0
,
900000
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt64
(
pCfg
,
"walFsyncDataSizeLimit"
,
tsWalFsyncDataSizeLimit
,
100
*
1024
*
1024
,
INT64_MAX
,
CFG_SCOPE_SERVER
)
!=
0
)
if
(
cfgAddInt64
(
pCfg
,
"walFsyncDataSizeLimit"
,
tsWalFsyncDataSizeLimit
,
100
*
1024
*
1024
,
INT64_MAX
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"udf"
,
tsStartUdfd
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
...
...
@@ -553,13 +574,16 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if
(
cfgAddInt64
(
pCfg
,
"streamBufferSize"
,
tsStreamBufferSize
,
0
,
INT64_MAX
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt64
(
pCfg
,
"checkpointInterval"
,
tsCheckpointInterval
,
0
,
INT64_MAX
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"cacheLazyLoadThreshold"
,
tsCacheLazyLoadThreshold
,
0
,
100000
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"cacheLazyLoadThreshold"
,
tsCacheLazyLoadThreshold
,
0
,
100000
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"filterScalarMode"
,
tsFilterScalarMode
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"keepTimeOffset"
,
tsKeepTimeOffset
,
0
,
23
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"maxStreamBackendCache"
,
tsMaxStreamBackendCache
,
16
,
1024
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"pqSortMemThreshold"
,
tsPQSortMemThreshold
,
1
,
10240
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
if
(
cfgAddString
(
pCfg
,
"s3BucketName"
,
tsS3BucketName
,
CFG_SCOPE_SERVER
)
!=
0
)
return
-
1
;
GRANT_CFG_ADD
;
return
0
;
}
...
...
@@ -908,7 +932,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tstrncpy
(
tsTelemServer
,
cfgGetItem
(
pCfg
,
"telemetryServer"
)
->
str
,
TSDB_FQDN_LEN
);
tsTelemPort
=
(
uint16_t
)
cfgGetItem
(
pCfg
,
"telemetryPort"
)
->
i32
;
tmqMaxTopicNum
=
cfgGetItem
(
pCfg
,
"tmqMaxTopicNum"
)
->
i32
;
tmqMaxTopicNum
=
cfgGetItem
(
pCfg
,
"tmqMaxTopicNum"
)
->
i32
;
tsTransPullupInterval
=
cfgGetItem
(
pCfg
,
"transPullupInterval"
)
->
i32
;
tsMqRebalanceInterval
=
cfgGetItem
(
pCfg
,
"mqRebalanceInterval"
)
->
i32
;
...
...
@@ -948,6 +972,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsMaxStreamBackendCache
=
cfgGetItem
(
pCfg
,
"maxStreamBackendCache"
)
->
i32
;
tsPQSortMemThreshold
=
cfgGetItem
(
pCfg
,
"pqSortMemThreshold"
)
->
i32
;
tstrncpy
(
tsS3BucketName
,
cfgGetItem
(
pCfg
,
"s3BucketName"
)
->
str
,
TSDB_FQDN_LEN
);
GRANT_CFG_GET
;
return
0
;
}
...
...
@@ -1020,7 +1046,7 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
taosSetCoreDump
(
enableCore
);
}
else
if
(
strcasecmp
(
"enableQueryHb"
,
name
)
==
0
)
{
tsEnableQueryHb
=
cfgGetItem
(
pCfg
,
"enableQueryHb"
)
->
bval
;
}
else
if
(
strcasecmp
(
"ttlChangeOnWrite"
,
name
)
==
0
)
{
}
else
if
(
strcasecmp
(
"ttlChangeOnWrite"
,
name
)
==
0
)
{
tsTtlChangeOnWrite
=
cfgGetItem
(
pCfg
,
"ttlChangeOnWrite"
)
->
bval
;
}
break
;
...
...
@@ -1249,9 +1275,9 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
// tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
// } else if (strcasecmp("smlBatchSize", name) == 0) {
// tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32;
}
else
if
(
strcasecmp
(
"smlTsDefaultName"
,
name
)
==
0
)
{
}
else
if
(
strcasecmp
(
"smlTsDefaultName"
,
name
)
==
0
)
{
tstrncpy
(
tsSmlTsDefaultName
,
cfgGetItem
(
pCfg
,
"smlTsDefaultName"
)
->
str
,
TSDB_COL_NAME_LEN
);
}
else
if
(
strcasecmp
(
"smlDot2Underline"
,
name
)
==
0
)
{
}
else
if
(
strcasecmp
(
"smlDot2Underline"
,
name
)
==
0
)
{
tsSmlDot2Underline
=
cfgGetItem
(
pCfg
,
"smlDot2Underline"
)
->
bval
;
}
else
if
(
strcasecmp
(
"shellActivityTimer"
,
name
)
==
0
)
{
tsShellActivityTimer
=
cfgGetItem
(
pCfg
,
"shellActivityTimer"
)
->
i32
;
...
...
@@ -1272,6 +1298,8 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
taosGetFqdnPortFromEp
(
strlen
(
pFirstEpItem
->
str
)
==
0
?
defaultFirstEp
:
pFirstEpItem
->
str
,
&
firstEp
);
snprintf
(
tsFirst
,
sizeof
(
tsFirst
),
"%s:%u"
,
firstEp
.
fqdn
,
firstEp
.
port
);
cfgSetItem
(
pCfg
,
"firstEp"
,
tsFirst
,
pFirstEpItem
->
stype
);
}
else
if
(
strcasecmp
(
"s3BucketName"
,
name
)
==
0
)
{
tstrncpy
(
tsS3BucketName
,
cfgGetItem
(
pCfg
,
"s3BucketName"
)
->
str
,
TSDB_FQDN_LEN
);
}
else
if
(
strcasecmp
(
"sDebugFlag"
,
name
)
==
0
)
{
sDebugFlag
=
cfgGetItem
(
pCfg
,
"sDebugFlag"
)
->
i32
;
}
else
if
(
strcasecmp
(
"smaDebugFlag"
,
name
)
==
0
)
{
...
...
source/dnode/vnode/CMakeLists.txt
浏览文件 @
88d755be
...
...
@@ -8,6 +8,7 @@ set(
"src/vnd/vnodeCommit.c"
"src/vnd/vnodeQuery.c"
"src/vnd/vnodeModule.c"
"src/vnd/vnodeCos.c"
"src/vnd/vnodeSvr.c"
"src/vnd/vnodeSync.c"
"src/vnd/vnodeSnapshot.c"
...
...
@@ -134,6 +135,11 @@ else()
endif
()
endif
()
find_library
(
APR_LIBRARY apr-1 PATHS /usr/local/apr/lib/
)
find_library
(
APR_UTIL_LIBRARY aprutil-1 PATHS /usr/local/apr/lib/
)
find_library
(
MINIXML_LIBRARY mxml
)
find_library
(
CURL_LIBRARY curl
)
target_link_libraries
(
vnode
PUBLIC os
...
...
@@ -153,6 +159,13 @@ target_link_libraries(
PUBLIC transport
PUBLIC stream
PUBLIC index
# s3
cos_c_sdk
${
APR_UTIL_LIBRARY
}
${
APR_LIBRARY
}
${
MINIXML_LIBRARY
}
${
CURL_LIBRARY
}
)
IF
(
TD_GRANT
)
...
...
@@ -169,7 +182,20 @@ if(${BUILD_WITH_ROCKSDB})
add_definitions
(
-DUSE_ROCKSDB
)
endif
(
${
BUILD_WITH_ROCKSDB
}
)
# s3
FIND_PROGRAM
(
APR_CONFIG_BIN NAMES apr-config apr-1-config PATHS /usr/bin /usr/local/bin /usr/local/apr/bin/
)
IF
(
APR_CONFIG_BIN
)
EXECUTE_PROCESS
(
COMMAND
${
APR_CONFIG_BIN
}
--includedir
OUTPUT_VARIABLE APR_INCLUDE_DIR
OUTPUT_STRIP_TRAILING_WHITESPACE
)
ENDIF
()
include_directories
(
${
APR_INCLUDE_DIR
}
)
target_include_directories
(
vnode
PUBLIC
"
${
TD_SOURCE_DIR
}
/contrib/cos-c-sdk-v5/cos_c_sdk"
)
if
(
${
BUILD_TEST
}
)
add_subdirectory
(
test
)
...
...
source/dnode/vnode/src/inc/vndCos.h
0 → 100644
浏览文件 @
88d755be
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VND_COS_H_
#define _TD_VND_COS_H_
#include "vnd.h"
#ifdef __cplusplus
extern
"C"
{
#endif
extern
int8_t
tsS3Enabled
;
int32_t
s3Init
();
void
s3CleanUp
();
void
s3PutObjectFromFile
(
const
char
*
file
,
const
char
*
object
);
void
s3DeleteObjects
(
const
char
*
object_name
[],
int
nobject
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_VND_COS_H_*/
source/dnode/vnode/src/tsdb/tsdbRetention.c
浏览文件 @
88d755be
...
...
@@ -15,6 +15,7 @@
#include "tsdb.h"
#include "tsdbFS2.h"
#include "vndCos.h"
typedef
struct
{
STsdb
*
tsdb
;
...
...
@@ -41,6 +42,28 @@ static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
return
TARRAY2_APPEND
(
rtner
->
fopArr
,
op
);
}
static
int32_t
tsdbRemoveFileObjectS3
(
SRTNer
*
rtner
,
const
STFileObj
*
fobj
)
{
int32_t
code
=
0
,
lino
=
0
;
STFileOp
op
=
{
.
optype
=
TSDB_FOP_REMOVE
,
.
fid
=
fobj
->
f
->
fid
,
.
of
=
fobj
->
f
[
0
],
};
code
=
TARRAY2_APPEND
(
rtner
->
fopArr
,
op
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
const
char
*
object_name
=
taosDirEntryBaseName
((
char
*
)
fobj
->
fname
);
s3DeleteObjects
(
&
object_name
,
1
);
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
rtner
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
static
int32_t
tsdbDoCopyFile
(
SRTNer
*
rtner
,
const
STFileObj
*
from
,
const
STFile
*
to
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
...
...
@@ -76,6 +99,33 @@ _exit:
return
code
;
}
static
int32_t
tsdbCopyFileS3
(
SRTNer
*
rtner
,
const
STFileObj
*
from
,
const
STFile
*
to
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
char
fname
[
TSDB_FILENAME_LEN
];
TdFilePtr
fdFrom
=
NULL
;
TdFilePtr
fdTo
=
NULL
;
tsdbTFileName
(
rtner
->
tsdb
,
to
,
fname
);
fdFrom
=
taosOpenFile
(
from
->
fname
,
TD_FILE_READ
);
if
(
fdFrom
==
NULL
)
code
=
terrno
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
char
*
object_name
=
taosDirEntryBaseName
(
fname
);
s3PutObjectFromFile
(
from
->
fname
,
object_name
);
taosCloseFile
(
&
fdFrom
);
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
rtner
->
tsdb
->
pVnode
),
lino
,
code
);
taosCloseFile
(
&
fdFrom
);
}
return
code
;
}
static
int32_t
tsdbDoMigrateFileObj
(
SRTNer
*
rtner
,
const
STFileObj
*
fobj
,
const
SDiskID
*
did
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
...
...
@@ -123,6 +173,53 @@ _exit:
return
code
;
}
static
int32_t
tsdbMigrateDataFileS3
(
SRTNer
*
rtner
,
const
STFileObj
*
fobj
,
const
SDiskID
*
did
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
STFileOp
op
=
{
0
};
// remove old
op
=
(
STFileOp
){
.
optype
=
TSDB_FOP_REMOVE
,
.
fid
=
fobj
->
f
->
fid
,
.
of
=
fobj
->
f
[
0
],
};
code
=
TARRAY2_APPEND
(
rtner
->
fopArr
,
op
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// create new
op
=
(
STFileOp
){
.
optype
=
TSDB_FOP_CREATE
,
.
fid
=
fobj
->
f
->
fid
,
.
nf
=
{
.
type
=
fobj
->
f
->
type
,
.
did
=
did
[
0
],
.
fid
=
fobj
->
f
->
fid
,
.
cid
=
fobj
->
f
->
cid
,
.
size
=
fobj
->
f
->
size
,
.
stt
[
0
]
=
{
.
level
=
fobj
->
f
->
stt
[
0
].
level
,
},
},
};
code
=
TARRAY2_APPEND
(
rtner
->
fopArr
,
op
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
// do copy the file
code
=
tsdbCopyFileS3
(
rtner
,
fobj
,
&
op
.
nf
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
rtner
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
typedef
struct
{
STsdb
*
tsdb
;
int32_t
sync
;
...
...
@@ -201,8 +298,14 @@ static int32_t tsdbDoRetention2(void *arg) {
for
(
int32_t
ftype
=
0
;
(
ftype
<
TSDB_FTYPE_MAX
)
&&
(
fobj
=
rtner
->
ctx
->
fset
->
farr
[
ftype
],
1
);
++
ftype
)
{
if
(
fobj
==
NULL
)
continue
;
code
=
tsdbDoRemoveFileObject
(
rtner
,
fobj
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
int32_t
nlevel
=
tfsGetLevel
(
rtner
->
tsdb
->
pVnode
->
pTfs
);
if
(
tsS3Enabled
&&
nlevel
>
1
&&
TSDB_FTYPE_DATA
==
ftype
&&
fobj
->
f
->
did
.
level
==
nlevel
-
1
)
{
code
=
tsdbRemoveFileObjectS3
(
rtner
,
fobj
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
code
=
tsdbDoRemoveFileObject
(
rtner
,
fobj
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
SSttLvl
*
lvl
;
...
...
@@ -228,8 +331,15 @@ static int32_t tsdbDoRetention2(void *arg) {
if
(
fobj
==
NULL
)
continue
;
if
(
fobj
->
f
->
did
.
level
==
did
.
level
)
continue
;
code
=
tsdbDoMigrateFileObj
(
rtner
,
fobj
,
&
did
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
int32_t
nlevel
=
tfsGetLevel
(
rtner
->
tsdb
->
pVnode
->
pTfs
);
if
(
tsS3Enabled
&&
nlevel
>
1
&&
TSDB_FTYPE_DATA
==
ftype
&&
did
.
level
==
nlevel
-
1
)
{
code
=
tsdbMigrateDataFileS3
(
rtner
,
fobj
,
&
did
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
else
{
code
=
tsdbDoMigrateFileObj
(
rtner
,
fobj
,
&
did
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
// stt
...
...
@@ -281,4 +391,4 @@ int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) {
tsdbFreeRtnArg
(
arg
);
}
return
code
;
}
\ No newline at end of file
}
source/dnode/vnode/src/vnd/vnodeCos.c
0 → 100644
浏览文件 @
88d755be
#define ALLOW_FORBID_FUNC
#include "vndCos.h"
#include "cos_api.h"
#include "cos_http_io.h"
#include "cos_log.h"
extern
char
tsS3Endpoint
[];
extern
char
tsS3AcessKeyId
[];
extern
char
tsS3AcessKeySecret
[];
extern
char
tsS3BucketName
[];
extern
char
tsS3AppId
[];
int32_t
s3Init
()
{
if
(
cos_http_io_initialize
(
NULL
,
0
)
!=
COSE_OK
)
{
return
-
1
;
}
// set log level, default COS_LOG_WARN
cos_log_set_level
(
COS_LOG_WARN
);
// set log output, default stderr
cos_log_set_output
(
NULL
);
return
0
;
}
void
s3CleanUp
()
{
cos_http_io_deinitialize
();
}
static
void
log_status
(
cos_status_t
*
s
)
{
cos_warn_log
(
"status->code: %d"
,
s
->
code
);
if
(
s
->
error_code
)
cos_warn_log
(
"status->error_code: %s"
,
s
->
error_code
);
if
(
s
->
error_msg
)
cos_warn_log
(
"status->error_msg: %s"
,
s
->
error_msg
);
if
(
s
->
req_id
)
cos_warn_log
(
"status->req_id: %s"
,
s
->
req_id
);
}
static
void
s3InitRequestOptions
(
cos_request_options_t
*
options
,
int
is_cname
)
{
options
->
config
=
cos_config_create
(
options
->
pool
);
cos_config_t
*
config
=
options
->
config
;
cos_str_set
(
&
config
->
endpoint
,
tsS3Endpoint
);
cos_str_set
(
&
config
->
access_key_id
,
tsS3AcessKeyId
);
cos_str_set
(
&
config
->
access_key_secret
,
tsS3AcessKeySecret
);
cos_str_set
(
&
config
->
appid
,
tsS3AppId
);
config
->
is_cname
=
is_cname
;
options
->
ctl
=
cos_http_controller_create
(
options
->
pool
,
0
);
}
void
s3PutObjectFromFile
(
const
char
*
file_str
,
const
char
*
object_str
)
{
cos_pool_t
*
p
=
NULL
;
int
is_cname
=
0
;
cos_status_t
*
s
=
NULL
;
cos_request_options_t
*
options
=
NULL
;
cos_string_t
bucket
,
object
,
file
;
cos_table_t
*
resp_headers
;
int
traffic_limit
=
0
;
cos_pool_create
(
&
p
,
NULL
);
options
=
cos_request_options_create
(
p
);
s3InitRequestOptions
(
options
,
is_cname
);
cos_table_t
*
headers
=
NULL
;
if
(
traffic_limit
)
{
// 限速值设置范围为819200 - 838860800,即100KB/s - 100MB/s,如果超出该范围将返回400错误
headers
=
cos_table_make
(
p
,
1
);
cos_table_add_int
(
headers
,
"x-cos-traffic-limit"
,
819200
);
}
cos_str_set
(
&
bucket
,
tsS3BucketName
);
cos_str_set
(
&
file
,
file_str
);
cos_str_set
(
&
object
,
object_str
);
s
=
cos_put_object_from_file
(
options
,
&
bucket
,
&
object
,
&
file
,
headers
,
&
resp_headers
);
log_status
(
s
);
cos_pool_destroy
(
p
);
}
void
s3DeleteObjects
(
const
char
*
object_name
[],
int
nobject
)
{
cos_pool_t
*
p
=
NULL
;
int
is_cname
=
0
;
cos_string_t
bucket
;
cos_table_t
*
resp_headers
=
NULL
;
cos_request_options_t
*
options
=
NULL
;
cos_list_t
object_list
;
cos_list_t
deleted_object_list
;
int
is_quiet
=
COS_TRUE
;
cos_pool_create
(
&
p
,
NULL
);
options
=
cos_request_options_create
(
p
);
s3InitRequestOptions
(
options
,
is_cname
);
cos_str_set
(
&
bucket
,
tsS3BucketName
);
cos_list_init
(
&
object_list
);
cos_list_init
(
&
deleted_object_list
);
for
(
int
i
=
0
;
i
<
nobject
;
++
i
)
{
cos_object_key_t
*
content
=
cos_create_cos_object_key
(
p
);
cos_str_set
(
&
content
->
key
,
object_name
[
i
]);
cos_list_add_tail
(
&
content
->
node
,
&
object_list
);
}
cos_status_t
*
s
=
cos_delete_objects
(
options
,
&
bucket
,
&
object_list
,
is_quiet
,
&
resp_headers
,
&
deleted_object_list
);
log_status
(
s
);
cos_pool_destroy
(
p
);
if
(
cos_status_is_ok
(
s
))
{
cos_warn_log
(
"delete objects succeeded
\n
"
);
}
else
{
cos_warn_log
(
"delete objects failed
\n
"
);
}
}
source/dnode/vnode/src/vnd/vnodeModule.c
浏览文件 @
88d755be
...
...
@@ -14,6 +14,7 @@
*/
#include "vnd.h"
#include "vndCos.h"
typedef
struct
SVnodeTask
SVnodeTask
;
struct
SVnodeTask
{
...
...
@@ -81,6 +82,9 @@ int vnodeInit(int nthreads) {
if
(
tqInit
()
<
0
)
{
return
-
1
;
}
if
(
s3Init
()
<
0
)
{
return
-
1
;
}
return
0
;
}
...
...
@@ -112,6 +116,7 @@ void vnodeCleanup() {
walCleanUp
();
tqCleanUp
();
smaCleanUp
();
s3CleanUp
();
}
int
vnodeScheduleTaskEx
(
int
tpid
,
int
(
*
execute
)(
void
*
),
void
*
arg
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录