Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
Incubator Pegasus
提交
7dae1c57
Incubator Pegasus
项目概览
apache
/
Incubator Pegasus
通知
9
Star
5
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
Incubator Pegasus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
7dae1c57
编写于
11月 01, 2018
作者:
L
Lai Yingchun
提交者:
QinZuoyan
11月 01, 2018
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
server: support default TTL for a table (#202)
上级
541a5d0d
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
356 addition
and
56 deletion
+356
-56
src/base/pegasus_const.cpp
src/base/pegasus_const.cpp
+5
-0
src/base/pegasus_const.h
src/base/pegasus_const.h
+1
-0
src/server/config-server.ini
src/server/config-server.ini
+1
-1
src/server/key_ttl_compaction_filter.h
src/server/key_ttl_compaction_filter.h
+59
-19
src/server/pegasus_server_impl.cpp
src/server/pegasus_server_impl.cpp
+20
-3
src/server/pegasus_server_impl.h
src/server/pegasus_server_impl.h
+3
-1
src/server/pegasus_server_write.cpp
src/server/pegasus_server_write.cpp
+5
-0
src/server/pegasus_server_write.h
src/server/pegasus_server_write.h
+2
-0
src/server/pegasus_write_service.cpp
src/server/pegasus_write_service.cpp
+5
-0
src/server/pegasus_write_service.h
src/server/pegasus_write_service.h
+2
-0
src/server/pegasus_write_service_impl.h
src/server/pegasus_write_service_impl.h
+21
-1
src/test/function_test/main.cpp
src/test/function_test/main.cpp
+7
-0
src/test/function_test/run.sh
src/test/function_test/run.sh
+2
-0
src/test/function_test/test_basic.cpp
src/test/function_test/test_basic.cpp
+0
-31
src/test/function_test/test_ttl.cpp
src/test/function_test/test_ttl.cpp
+223
-0
未找到文件。
src/base/pegasus_const.cpp
浏览文件 @
7dae1c57
...
...
@@ -53,4 +53,9 @@ const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY("bottommost_lev
const
std
::
string
MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE
(
"force"
);
const
std
::
string
MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP
(
"skip"
);
/// default ttl for items in a table. If ttl is not set for
/// * a new writen item, 'default_ttl' will be applied on this item.
/// * an exist item, 'default_ttl' will be applied on this item when it was compacted.
/// <= 0 means no effect
const
std
::
string
TABLE_LEVEL_DEFAULT_TTL
(
"default_ttl"
);
}
// namespace pegasus
src/base/pegasus_const.h
浏览文件 @
7dae1c57
...
...
@@ -36,4 +36,5 @@ extern const std::string MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_KEY;
extern
const
std
::
string
MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_FORCE
;
extern
const
std
::
string
MANUAL_COMPACT_BOTTOMMOST_LEVEL_COMPACTION_SKIP
;
extern
const
std
::
string
TABLE_LEVEL_DEFAULT_TTL
;
}
// namespace pegasus
src/server/config-server.ini
浏览文件 @
7dae1c57
...
...
@@ -256,7 +256,7 @@ max_concurrent_uploading_file_count = 5
rocksdb_verbose_log
=
false
rocksdb_write_buffer_size
=
10485760
updating_rocksdb_sstsize_interval_seconds
=
30
manual_compact_min_interval_seconds
=
3
60
0
manual_compact_min_interval_seconds
=
30
perf_counter_cluster_name
=
onebox
perf_counter_update_interval_seconds
=
10
...
...
src/server/key_ttl_compaction_filter.h
浏览文件 @
7dae1c57
...
...
@@ -18,36 +18,76 @@ namespace server {
class
KeyWithTTLCompactionFilter
:
public
rocksdb
::
CompactionFilter
{
public:
KeyWithTTLCompactionFilter
()
:
_value_schema_version
(
0
),
_enabled
(
false
)
{}
virtual
bool
Filter
(
int
/*level*/
,
const
rocksdb
::
Slice
&
key
,
const
rocksdb
::
Slice
&
existing_value
,
std
::
string
*
new_value
,
bool
*
value_changed
)
const
override
KeyWithTTLCompactionFilter
(
uint32_t
value_schema_version
,
uint32_t
default_ttl
,
bool
enabled
)
:
_value_schema_version
(
value_schema_version
),
_default_ttl
(
default_ttl
),
_enabled
(
enabled
)
{
if
(
!
_enabled
.
load
(
std
::
memory_order_acquire
))
}
bool
Filter
(
int
/*level*/
,
const
rocksdb
::
Slice
&
key
,
const
rocksdb
::
Slice
&
existing_value
,
std
::
string
*
new_value
,
bool
*
value_changed
)
const
override
{
if
(
!
_enabled
)
{
return
false
;
}
uint32_t
expire_ts
=
pegasus_extract_expire_ts
(
_value_schema_version
,
utils
::
to_string_view
(
existing_value
));
if
(
_default_ttl
!=
0
&&
expire_ts
==
0
)
{
// should update ttl
dsn
::
blob
user_data
;
pegasus_extract_user_data
(
_value_schema_version
,
std
::
string
(
existing_value
.
data
(),
existing_value
.
length
()),
user_data
);
rocksdb
::
SliceParts
sparts
=
_gen
.
generate_value
(
_value_schema_version
,
dsn
::
string_view
(
user_data
),
utils
::
epoch_now
()
+
_default_ttl
);
for
(
int
i
=
0
;
i
<
sparts
.
num_parts
;
i
++
)
{
*
new_value
+=
sparts
.
parts
[
i
].
ToString
();
}
*
value_changed
=
true
;
return
false
;
return
check_if_record_expired
(
_value_schema_version
,
utils
::
epoch_now
(),
utils
::
to_string_view
(
existing_value
)
);
}
return
check_if_ts_expired
(
utils
::
epoch_now
(),
expire_ts
);
}
virtual
const
char
*
Name
()
const
override
{
return
"KeyWithTTLCompactionFilter"
;
}
void
SetValueSchemaVersion
(
uint32_t
version
)
{
_value_schema_version
=
version
;
}
void
EnableFilter
()
{
_enabled
.
store
(
true
,
std
::
memory_order_release
);
}
const
char
*
Name
()
const
override
{
return
"KeyWithTTLCompactionFilter"
;
}
private:
uint32_t
_value_schema_version
;
std
::
atomic_bool
_enabled
;
// only process filtering when _enabled == true
uint32_t
_default_ttl
;
bool
_enabled
;
// only process filtering when _enabled == true
mutable
pegasus_value_generator
_gen
;
};
class
KeyWithTTLCompactionFilterFactory
:
public
rocksdb
::
CompactionFilterFactory
{
public:
KeyWithTTLCompactionFilterFactory
()
{}
virtual
std
::
unique_ptr
<
rocksdb
::
CompactionFilter
>
KeyWithTTLCompactionFilterFactory
()
:
_value_schema_version
(
0
),
_default_ttl
(
0
),
_enabled
(
false
)
{
}
std
::
unique_ptr
<
rocksdb
::
CompactionFilter
>
CreateCompactionFilter
(
const
rocksdb
::
CompactionFilter
::
Context
&
/*context*/
)
override
{
return
std
::
unique_ptr
<
KeyWithTTLCompactionFilter
>
(
new
KeyWithTTLCompactionFilter
());
return
std
::
unique_ptr
<
KeyWithTTLCompactionFilter
>
(
new
KeyWithTTLCompactionFilter
(
_value_schema_version
.
load
(),
_default_ttl
.
load
(),
_enabled
.
load
()));
}
virtual
const
char
*
Name
()
const
override
{
return
"KeyWithTTLCompactionFilterFactory"
;
}
const
char
*
Name
()
const
override
{
return
"KeyWithTTLCompactionFilterFactory"
;
}
void
SetValueSchemaVersion
(
uint32_t
version
)
{
_value_schema_version
.
store
(
version
,
std
::
memory_order_release
);
}
void
EnableFilter
()
{
_enabled
.
store
(
true
,
std
::
memory_order_release
);
}
void
SetDefaultTTL
(
uint32_t
ttl
)
{
_default_ttl
.
store
(
ttl
,
std
::
memory_order_release
);
}
private:
std
::
atomic
<
uint32_t
>
_value_schema_version
;
std
::
atomic
<
uint32_t
>
_default_ttl
;
std
::
atomic_bool
_enabled
;
// only process filtering when _enabled == true
};
}
}
// namespace
}
// namespace server
}
// namespace pegasus
src/server/pegasus_server_impl.cpp
浏览文件 @
7dae1c57
...
...
@@ -12,6 +12,7 @@
#include <rocksdb/filter_policy.h>
#include <dsn/utility/utils.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/string_conv.h>
#include <dsn/dist/fmt_logging.h>
#include "base/pegasus_key_schema.h"
...
...
@@ -236,6 +237,8 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
}
_db_opts
.
table_factory
.
reset
(
NewBlockBasedTableFactory
(
tbl_opts
));
_key_ttl_compaction_filter_factory
=
std
::
make_shared
<
KeyWithTTLCompactionFilterFactory
>
();
_db_opts
.
compaction_filter_factory
=
_key_ttl_compaction_filter_factory
;
_db_opts
.
listeners
.
emplace_back
(
new
pegasus_event_listener
());
...
...
@@ -1375,7 +1378,6 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
rocksdb
::
Options
opts
=
_db_opts
;
opts
.
create_if_missing
=
true
;
opts
.
error_if_exists
=
false
;
opts
.
compaction_filter
=
&
_key_ttl_compaction_filter
;
opts
.
default_value_schema_version
=
PEGASUS_VALUE_SCHEMA_MAX_VERSION
;
// parse envs for parameters
...
...
@@ -1478,8 +1480,8 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
}
// only enable filter after correct value_schema_version set
_key_ttl_compaction_filter
.
SetValueSchemaVersion
(
_value_schema_version
);
_key_ttl_compaction_filter
.
EnableFilter
();
_key_ttl_compaction_filter
_factory
->
SetValueSchemaVersion
(
_value_schema_version
);
_key_ttl_compaction_filter
_factory
->
EnableFilter
();
// update LastManualCompactFinishTime
_manual_compact_svc
.
init_last_finish_time_ms
(
_db
->
GetLastManualCompactFinishTime
());
...
...
@@ -2252,6 +2254,7 @@ pegasus_server_impl::get_restore_dir_from_env(const std::map<std::string, std::s
void
pegasus_server_impl
::
update_app_envs
(
const
std
::
map
<
std
::
string
,
std
::
string
>
&
envs
)
{
update_usage_scenario
(
envs
);
update_default_ttl
(
envs
);
_manual_compact_svc
.
start_manual_compact_if_needed
(
envs
);
}
...
...
@@ -2285,6 +2288,20 @@ void pegasus_server_impl::update_usage_scenario(const std::map<std::string, std:
}
}
void
pegasus_server_impl
::
update_default_ttl
(
const
std
::
map
<
std
::
string
,
std
::
string
>
&
envs
)
{
auto
find
=
envs
.
find
(
TABLE_LEVEL_DEFAULT_TTL
);
if
(
find
!=
envs
.
end
())
{
int32_t
ttl
=
0
;
if
(
!
dsn
::
buf2int32
(
find
->
second
,
ttl
)
||
ttl
<
0
)
{
derror_replica
(
"{}={} is invalid."
,
find
->
first
,
find
->
second
);
return
;
}
_server_write
->
set_default_ttl
(
static_cast
<
uint32_t
>
(
ttl
));
_key_ttl_compaction_filter_factory
->
SetDefaultTTL
(
static_cast
<
uint32_t
>
(
ttl
));
}
}
bool
pegasus_server_impl
::
set_usage_scenario
(
const
std
::
string
&
usage_scenario
)
{
if
(
usage_scenario
==
_usage_scenario
)
...
...
src/server/pegasus_server_impl.h
浏览文件 @
7dae1c57
...
...
@@ -212,6 +212,8 @@ private:
void
update_usage_scenario
(
const
std
::
map
<
std
::
string
,
std
::
string
>
&
envs
);
void
update_default_ttl
(
const
std
::
map
<
std
::
string
,
std
::
string
>
&
envs
);
// return finish time recorded in rocksdb
uint64_t
do_manual_compact
(
const
rocksdb
::
CompactRangeOptions
&
options
);
...
...
@@ -247,7 +249,7 @@ private:
uint64_t
_abnormal_multi_get_size_threshold
;
uint64_t
_abnormal_multi_get_iterate_count_threshold
;
KeyWithTTLCompactionFilter
_key_ttl_compaction_filter
;
std
::
shared_ptr
<
KeyWithTTLCompactionFilterFactory
>
_key_ttl_compaction_filter_factory
;
rocksdb
::
Options
_db_opts
;
rocksdb
::
WriteOptions
_wt_opts
;
rocksdb
::
ReadOptions
_rd_opts
;
...
...
src/server/pegasus_server_write.cpp
浏览文件 @
7dae1c57
...
...
@@ -63,6 +63,11 @@ int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
return
on_batched_writes
(
requests
,
count
);
}
void
pegasus_server_write
::
set_default_ttl
(
uint32_t
ttl
)
{
_write_svc
->
set_default_ttl
(
ttl
);
}
int
pegasus_server_write
::
on_batched_writes
(
dsn
::
message_ex
**
requests
,
int
count
)
{
int
err
=
0
;
...
...
src/server/pegasus_server_write.h
浏览文件 @
7dae1c57
...
...
@@ -31,6 +31,8 @@ public:
int64_t
decree
,
uint64_t
timestamp
);
void
set_default_ttl
(
uint32_t
ttl
);
private:
/// Delay replying for the batched requests until all of them complete.
int
on_batched_writes
(
dsn
::
message_ex
**
requests
,
int
count
);
...
...
src/server/pegasus_write_service.cpp
浏览文件 @
7dae1c57
...
...
@@ -202,6 +202,11 @@ void pegasus_write_service::batch_abort(int64_t decree, int err)
clear_up_batch_states
();
}
void
pegasus_write_service
::
set_default_ttl
(
uint32_t
ttl
)
{
_impl
->
set_default_ttl
(
ttl
);
}
void
pegasus_write_service
::
clear_up_batch_states
()
{
uint64_t
latency
=
dsn_now_ns
()
-
_batch_start_time
;
...
...
src/server/pegasus_write_service.h
浏览文件 @
7dae1c57
...
...
@@ -84,6 +84,8 @@ public:
// Abort batch write.
void
batch_abort
(
int64_t
decree
,
int
err
);
void
set_default_ttl
(
uint32_t
ttl
);
private:
void
clear_up_batch_states
();
...
...
src/server/pegasus_write_service_impl.h
浏览文件 @
7dae1c57
...
...
@@ -31,6 +31,7 @@ public:
_db
(
server
->
_db
),
_wt_opts
(
server
->
_wt_opts
),
_rd_opts
(
server
->
_rd_opts
),
_default_ttl
(
0
),
_pfc_recent_expire_count
(
server
->
_pfc_recent_expire_count
)
{
}
...
...
@@ -485,6 +486,14 @@ public:
void
batch_abort
(
int64_t
decree
,
int
err
)
{
clear_up_batch_states
(
decree
,
err
);
}
void
set_default_ttl
(
uint32_t
ttl
)
{
if
(
_default_ttl
!=
ttl
)
{
_default_ttl
=
ttl
;
ddebug_replica
(
"update _default_ttl to {}."
,
ttl
);
}
}
private:
int
db_write_batch_put
(
int64_t
decree
,
dsn
::
string_view
raw_key
,
...
...
@@ -497,7 +506,7 @@ private:
rocksdb
::
Slice
skey
=
utils
::
to_rocksdb_slice
(
raw_key
);
rocksdb
::
SliceParts
skey_parts
(
&
skey
,
1
);
rocksdb
::
SliceParts
svalue
=
_value_generator
.
generate_value
(
_value_schema_version
,
value
,
expire_sec
);
_value_generator
.
generate_value
(
_value_schema_version
,
value
,
db_expire_ts
(
expire_sec
)
);
rocksdb
::
Status
s
=
_batch
.
Put
(
skey_parts
,
svalue
);
if
(
dsn_unlikely
(
!
s
.
ok
()))
{
::
dsn
::
blob
hash_key
,
sort_key
;
...
...
@@ -679,6 +688,16 @@ private:
return
false
;
}
uint32_t
db_expire_ts
(
uint32_t
expire_ts
)
{
// use '_default_ttl' when ttl is not set for this write operation.
if
(
_default_ttl
!=
0
&&
expire_ts
==
0
)
{
return
utils
::
epoch_now
()
+
_default_ttl
;
}
return
expire_ts
;
}
private:
friend
class
pegasus_write_service_test
;
friend
class
pegasus_server_write_test
;
...
...
@@ -690,6 +709,7 @@ private:
rocksdb
::
DB
*
_db
;
rocksdb
::
WriteOptions
&
_wt_opts
;
rocksdb
::
ReadOptions
&
_rd_opts
;
volatile
uint32_t
_default_ttl
;
::
dsn
::
perf_counter_wrapper
&
_pfc_recent_expire_count
;
pegasus_value_generator
_value_generator
;
...
...
src/test/function_test/main.cpp
浏览文件 @
7dae1c57
...
...
@@ -7,14 +7,18 @@
#include <vector>
#include <map>
#include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/service_api_c.h>
#include <unistd.h>
#include <pegasus/client.h>
#include <gtest/gtest.h>
using
namespace
::
dsn
;
using
namespace
::
replication
;
using
namespace
::
pegasus
;
pegasus_client
*
client
=
nullptr
;
std
::
shared_ptr
<
replication_ddl_client
>
ddl_client
;
GTEST_API_
int
main
(
int
argc
,
char
**
argv
)
{
...
...
@@ -31,6 +35,9 @@ GTEST_API_ int main(int argc, char **argv)
const
char
*
app_name
=
argv
[
2
];
client
=
pegasus_client_factory
::
get_client
(
"mycluster"
,
app_name
);
std
::
vector
<
rpc_address
>
meta_list
;
replica_helper
::
load_meta_servers
(
meta_list
,
"uri-resolver.dsn://mycluster"
,
"arguments"
);
ddl_client
=
std
::
make_shared
<
replication_ddl_client
>
(
meta_list
);
ddebug
(
"MainThread: app_name=%s"
,
app_name
);
int
gargc
=
argc
-
2
;
...
...
src/test/function_test/run.sh
浏览文件 @
7dae1c57
...
...
@@ -24,6 +24,8 @@ exit_if_fail $? "run test check_and_set failed: $test_case $config_file $table_n
GTEST_OUTPUT
=
"xml:
$REPORT_DIR
/check_and_mutate.xml"
GTEST_FILTER
=
"check_and_mutate.*"
./
$test_case
$config_file
$table_name
exit_if_fail
$?
"run test check_and_mutate failed:
$test_case
$config_file
$table_name
"
GTEST_OUTPUT
=
"xml:
$REPORT_DIR
/scan.xml"
GTEST_FILTER
=
"scan.*"
./
$test_case
$config_file
$table_name
GTEST_OUTPUT
=
"xml:
$REPORT_DIR
/ttl.xml"
GTEST_FILTER
=
"ttl.*"
./
$test_case
$config_file
$table_name
exit_if_fail
$?
"run test ttl failed:
$test_case
$config_file
$table_name
"
exit_if_fail
$?
"run test scan failed:
$test_case
$config_file
$table_name
"
GTEST_OUTPUT
=
"xml:
$REPORT_DIR
/slog_log.xml"
GTEST_FILTER
=
"lost_log.*"
./
$test_case
$config_file
$table_name
exit_if_fail
$?
"run test slog_lost failed:
$test_case
$config_file
$table_name
"
...
...
src/test/function_test/test_basic.cpp
浏览文件 @
7dae1c57
...
...
@@ -1366,37 +1366,6 @@ TEST(basic, set_get_del_async)
while
(
!
callbacked
.
load
(
std
::
memory_order_seq_cst
))
usleep
(
100
);
// set and ttl
{
dsn
::
utils
::
notify_event
evt
;
client
->
async_set
(
"basic_test_hash_key_1"
,
"basic_test_sort_key_1"
,
"basic_test_value_1"
,
[
&
](
int
err
,
internal_info
&&
info
)
{
ASSERT_EQ
(
PERR_OK
,
err
);
evt
.
notify
();
},
10
,
3
);
evt
.
wait
();
std
::
string
value
;
int
result
=
client
->
get
(
"basic_test_hash_key_1"
,
"basic_test_sort_key_1"
,
value
);
ASSERT_EQ
(
PERR_OK
,
result
);
ASSERT_EQ
(
"basic_test_value_1"
,
value
);
int
ttl_seconds
;
result
=
client
->
ttl
(
"basic_test_hash_key_1"
,
"basic_test_sort_key_1"
,
ttl_seconds
);
ASSERT_EQ
(
PERR_OK
,
result
);
ASSERT_TRUE
(
ttl_seconds
>
0
&&
ttl_seconds
<=
3
)
<<
"ttl is "
<<
ttl_seconds
;
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
seconds
(
4
));
result
=
client
->
ttl
(
"basic_test_hash_key_1"
,
"basic_test_sort_key_1"
,
ttl_seconds
);
ASSERT_EQ
(
PERR_NOT_FOUND
,
result
);
result
=
client
->
get
(
"basic_test_hash_key_1"
,
"basic_test_sort_key_1"
,
value
);
ASSERT_EQ
(
PERR_NOT_FOUND
,
result
);
}
// exist
ret
=
client
->
exist
(
"basic_test_hash_key_1"
,
"basic_test_sort_key_1"
);
ASSERT_EQ
(
PERR_NOT_FOUND
,
ret
);
...
...
src/test/function_test/test_ttl.cpp
0 → 100644
浏览文件 @
7dae1c57
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include <thread>
#include <dsn/dist/replication/replication_ddl_client.h>
#include <pegasus/client.h>
#include <gtest/gtest.h>
#include "base/pegasus_const.h"
using
namespace
::
dsn
;
using
namespace
::
pegasus
;
extern
pegasus_client
*
client
;
extern
std
::
shared_ptr
<
replication
::
replication_ddl_client
>
ddl_client
;
std
::
string
ttl_hash_key
=
"ttl_test_hash_key"
;
std
::
string
ttl_test_sort_key_0
=
"ttl_test_sort_key_0"
;
std
::
string
ttl_test_sort_key_1
=
"ttl_test_sort_key_1"
;
std
::
string
ttl_test_sort_key_2
=
"ttl_test_sort_key_2"
;
std
::
string
ttl_test_value_0
=
"ttl_test_value_0"
;
std
::
string
ttl_test_value_1
=
"ttl_test_value_1"
;
std
::
string
ttl_test_value_2
=
"ttl_test_value_2"
;
int
default_ttl
=
3600
;
int
specify_ttl
=
5
;
int
sleep_for_expiring
=
10
;
int
sleep_for_envs_effect
=
65
;
int
error_allow
=
2
;
int
timeout
=
5
;
void
set_default_ttl
(
int
ttl
)
{
std
::
map
<
std
::
string
,
std
::
string
>
envs
;
ddl_client
->
get_app_envs
(
client
->
get_app_name
(),
envs
);
std
::
string
env
=
envs
[
TABLE_LEVEL_DEFAULT_TTL
];
if
((
env
.
empty
()
&&
ttl
!=
0
)
||
env
!=
std
::
to_string
(
ttl
))
{
dsn
::
error_code
ec
=
ddl_client
->
set_app_envs
(
client
->
get_app_name
(),
{
TABLE_LEVEL_DEFAULT_TTL
},
{
std
::
to_string
(
ttl
)});
ASSERT_EQ
(
ERR_OK
,
ec
);
// wait envs to be synced.
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
seconds
(
sleep_for_envs_effect
));
}
}
TEST
(
ttl
,
set_without_default_ttl
)
{
// unset default_ttl
set_default_ttl
(
0
);
// set with ttl
int
ret
=
client
->
set
(
ttl_hash_key
,
ttl_test_sort_key_1
,
ttl_test_value_1
,
timeout
,
specify_ttl
);
ASSERT_EQ
(
PERR_OK
,
ret
);
std
::
string
value
;
ret
=
client
->
get
(
ttl_hash_key
,
ttl_test_sort_key_1
,
value
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ASSERT_EQ
(
ttl_test_value_1
,
value
);
int
ttl_seconds
;
ret
=
client
->
ttl
(
ttl_hash_key
,
ttl_test_sort_key_1
,
ttl_seconds
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ASSERT_TRUE
(
ttl_seconds
>
specify_ttl
-
error_allow
&&
ttl_seconds
<=
specify_ttl
)
<<
"ttl is "
<<
ttl_seconds
;
// set without ttl
ret
=
client
->
set
(
ttl_hash_key
,
ttl_test_sort_key_2
,
ttl_test_value_2
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ret
=
client
->
get
(
ttl_hash_key
,
ttl_test_sort_key_2
,
value
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ASSERT_EQ
(
ttl_test_value_2
,
value
);
ret
=
client
->
ttl
(
ttl_hash_key
,
ttl_test_sort_key_2
,
ttl_seconds
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ASSERT_EQ
(
ttl_seconds
,
-
1
)
<<
"ttl is "
<<
ttl_seconds
;
// sleep a while
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
seconds
(
sleep_for_expiring
));
// check expired one
ret
=
client
->
ttl
(
ttl_hash_key
,
ttl_test_sort_key_1
,
ttl_seconds
);
ASSERT_EQ
(
PERR_NOT_FOUND
,
ret
);
ret
=
client
->
get
(
ttl_hash_key
,
ttl_test_sort_key_1
,
value
);
ASSERT_EQ
(
PERR_NOT_FOUND
,
ret
);
// check exist one
ret
=
client
->
ttl
(
ttl_hash_key
,
ttl_test_sort_key_2
,
ttl_seconds
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ASSERT_EQ
(
ttl_seconds
,
-
1
)
<<
"ttl is "
<<
ttl_seconds
;
ret
=
client
->
get
(
ttl_hash_key
,
ttl_test_sort_key_2
,
value
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ASSERT_EQ
(
ttl_test_value_2
,
value
);
// trigger a manual compaction
dsn
::
error_code
ec
=
ddl_client
->
set_app_envs
(
client
->
get_app_name
(),
{
MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY
},
{
std
::
to_string
(
time
(
nullptr
))});
ASSERT_EQ
(
ERR_OK
,
ec
);
// wait envs to be synced, and manual lcompaction has been finished.
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
seconds
(
sleep_for_envs_effect
));
// check expired one
ret
=
client
->
ttl
(
ttl_hash_key
,
ttl_test_sort_key_1
,
ttl_seconds
);
ASSERT_EQ
(
PERR_NOT_FOUND
,
ret
);
ret
=
client
->
get
(
ttl_hash_key
,
ttl_test_sort_key_1
,
value
);
ASSERT_EQ
(
PERR_NOT_FOUND
,
ret
);
// check exist one
ret
=
client
->
ttl
(
ttl_hash_key
,
ttl_test_sort_key_2
,
ttl_seconds
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ASSERT_EQ
(
ttl_seconds
,
-
1
)
<<
"ttl is "
<<
ttl_seconds
;
ret
=
client
->
get
(
ttl_hash_key
,
ttl_test_sort_key_2
,
value
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ASSERT_EQ
(
ttl_test_value_2
,
value
);
}
TEST
(
ttl
,
set_with_default_ttl
)
{
// unset default_ttl
set_default_ttl
(
0
);
// set without ttl
int
ret
=
client
->
set
(
ttl_hash_key
,
ttl_test_sort_key_0
,
ttl_test_value_0
);
ASSERT_EQ
(
PERR_OK
,
ret
);
// set default_ttl
set_default_ttl
(
default_ttl
);
// set with ttl
ret
=
client
->
set
(
ttl_hash_key
,
ttl_test_sort_key_1
,
ttl_test_value_1
,
timeout
,
specify_ttl
);
ASSERT_EQ
(
PERR_OK
,
ret
);
std
::
string
value
;
ret
=
client
->
get
(
ttl_hash_key
,
ttl_test_sort_key_1
,
value
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ASSERT_EQ
(
ttl_test_value_1
,
value
);
int
ttl_seconds
;
ret
=
client
->
ttl
(
ttl_hash_key
,
ttl_test_sort_key_1
,
ttl_seconds
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ASSERT_TRUE
(
ttl_seconds
>
specify_ttl
-
error_allow
&&
ttl_seconds
<=
specify_ttl
)
<<
"ttl is "
<<
ttl_seconds
;
// set without ttl
ret
=
client
->
set
(
ttl_hash_key
,
ttl_test_sort_key_2
,
ttl_test_value_2
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ret
=
client
->
get
(
ttl_hash_key
,
ttl_test_sort_key_2
,
value
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ASSERT_EQ
(
ttl_test_value_2
,
value
);
ret
=
client
->
ttl
(
ttl_hash_key
,
ttl_test_sort_key_2
,
ttl_seconds
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ASSERT_TRUE
(
ttl_seconds
>
default_ttl
-
error_allow
&&
ttl_seconds
<=
default_ttl
)
<<
"ttl is "
<<
ttl_seconds
;
// sleep a while
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
seconds
(
sleep_for_expiring
));
// check forever one
ret
=
client
->
ttl
(
ttl_hash_key
,
ttl_test_sort_key_0
,
ttl_seconds
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ASSERT_EQ
(
ttl_seconds
,
-
1
)
<<
"ttl is "
<<
ttl_seconds
;
// check expired one
ret
=
client
->
ttl
(
ttl_hash_key
,
ttl_test_sort_key_1
,
ttl_seconds
);
ASSERT_EQ
(
PERR_NOT_FOUND
,
ret
);
ret
=
client
->
get
(
ttl_hash_key
,
ttl_test_sort_key_1
,
value
);
ASSERT_EQ
(
PERR_NOT_FOUND
,
ret
);
// check exist one
ret
=
client
->
ttl
(
ttl_hash_key
,
ttl_test_sort_key_2
,
ttl_seconds
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ASSERT_TRUE
(
ttl_seconds
>
default_ttl
-
sleep_for_expiring
-
error_allow
&&
ttl_seconds
<=
default_ttl
-
sleep_for_expiring
)
<<
"ttl is "
<<
ttl_seconds
;
ret
=
client
->
get
(
ttl_hash_key
,
ttl_test_sort_key_2
,
value
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ASSERT_EQ
(
ttl_test_value_2
,
value
);
// trigger a manual compaction
dsn
::
error_code
ec
=
ddl_client
->
set_app_envs
(
client
->
get_app_name
(),
{
MANUAL_COMPACT_ONCE_TRIGGER_TIME_KEY
},
{
std
::
to_string
(
time
(
nullptr
))});
ASSERT_EQ
(
ERR_OK
,
ec
);
// wait envs to be synced, and manual compaction has been finished.
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
seconds
(
sleep_for_envs_effect
));
// check forever one
ret
=
client
->
ttl
(
ttl_hash_key
,
ttl_test_sort_key_0
,
ttl_seconds
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ASSERT_TRUE
(
ttl_seconds
>
default_ttl
-
sleep_for_envs_effect
-
error_allow
&&
ttl_seconds
<=
default_ttl
-
error_allow
)
<<
"ttl is "
<<
ttl_seconds
;
// check expired one
ret
=
client
->
ttl
(
ttl_hash_key
,
ttl_test_sort_key_1
,
ttl_seconds
);
ASSERT_EQ
(
PERR_NOT_FOUND
,
ret
);
ret
=
client
->
get
(
ttl_hash_key
,
ttl_test_sort_key_1
,
value
);
ASSERT_EQ
(
PERR_NOT_FOUND
,
ret
);
// check exist one
ret
=
client
->
ttl
(
ttl_hash_key
,
ttl_test_sort_key_2
,
ttl_seconds
);
ASSERT_EQ
(
PERR_OK
,
ret
);
ASSERT_TRUE
(
ttl_seconds
>
default_ttl
-
sleep_for_expiring
-
sleep_for_envs_effect
-
error_allow
&&
ttl_seconds
<=
default_ttl
-
sleep_for_expiring
-
sleep_for_envs_effect
)
<<
"ttl is "
<<
ttl_seconds
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录