Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
Incubator Pegasus
提交
46486fa8
Incubator Pegasus
项目概览
apache
/
Incubator Pegasus
通知
9
Star
5
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
Incubator Pegasus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
46486fa8
编写于
4月 08, 2020
作者:
小地鼠家的小松鼠
提交者:
neverchanje
4月 10, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat: limit long time rocksdb iteration operation (#500)
上级
f9d3456d
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
324 addition
and
22 deletion
+324
-22
rdsn
rdsn
+1
-1
src/base/pegasus_const.cpp
src/base/pegasus_const.cpp
+4
-0
src/base/pegasus_const.h
src/base/pegasus_const.h
+2
-0
src/server/config.ini
src/server/config.ini
+6
-0
src/server/pegasus_server_impl.cpp
src/server/pegasus_server_impl.cpp
+160
-20
src/server/pegasus_server_impl.h
src/server/pegasus_server_impl.h
+5
-0
src/server/range_read_limiter.h
src/server/range_read_limiter.h
+91
-0
src/shell/commands/data_operations.cpp
src/shell/commands/data_operations.cpp
+2
-0
src/test/function_test/run.sh
src/test/function_test/run.sh
+1
-1
src/test/function_test/test_basic.cpp
src/test/function_test/test_basic.cpp
+10
-0
src/test/function_test/test_scan.cpp
src/test/function_test/test_scan.cpp
+42
-0
未找到文件。
rdsn
@
de9bb3b8
比较
d1cb30c6
...
de9bb3b8
Subproject commit d
1cb30c6b0a3b9f08cf2501e8f193c7be4e87965
Subproject commit d
e9bb3b8607d2c3f5b542e6d1f7d0579c95edb79
src/base/pegasus_const.cpp
浏览文件 @
46486fa8
...
...
@@ -69,4 +69,8 @@ const std::string PEGASUS_CLUSTER_SECTION_NAME("pegasus.clusters");
/// table level slow query
const
std
::
string
ROCKSDB_ENV_SLOW_QUERY_THRESHOLD
(
"replica.slow_query_threshold"
);
/// time threshold of each rocksdb iteration
const
std
::
string
ROCKSDB_ITERATION_THRESHOLD_TIME_MS
(
"replica.rocksdb_iteration_threshold_time_ms"
);
}
// namespace pegasus
src/base/pegasus_const.h
浏览文件 @
46486fa8
...
...
@@ -45,4 +45,6 @@ extern const std::string ROCKDB_CHECKPOINT_RESERVE_TIME_SECONDS;
extern
const
std
::
string
PEGASUS_CLUSTER_SECTION_NAME
;
extern
const
std
::
string
ROCKSDB_ENV_SLOW_QUERY_THRESHOLD
;
extern
const
std
::
string
ROCKSDB_ITERATION_THRESHOLD_TIME_MS
;
}
// namespace pegasus
src/server/config.ini
浏览文件 @
46486fa8
...
...
@@ -280,6 +280,12 @@
# Bloom filter type, should be either 'common' or 'prefix'
rocksdb_filter_type
=
prefix
# 3000, 30MB, 1000, 30s
rocksdb_multi_get_max_iteration_count
=
3000
rocksdb_multi_get_max_iteration_size
=
31457280
rocksdb_max_iteration_count
=
1000
rocksdb_iteration_threshold_time_ms
=
30000
checkpoint_reserve_min_count
=
2
checkpoint_reserve_time_seconds
=
1800
...
...
src/server/pegasus_server_impl.cpp
浏览文件 @
46486fa8
...
...
@@ -90,6 +90,38 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
1000
,
"multi-get operation iterate count exceed this threshold will be logged, 0 means no check"
);
_rng_rd_opts
.
multi_get_max_iteration_count
=
(
uint32_t
)
dsn_config_get_value_uint64
(
"pegasus.server"
,
"rocksdb_multi_get_max_iteration_count"
,
3000
,
"max iteration count for each range read for multi-get operation, if "
"exceed this threshold,"
"iterator will be stopped"
);
_rng_rd_opts
.
multi_get_max_iteration_size
=
dsn_config_get_value_uint64
(
"pegasus.server"
,
"rocksdb_multi_get_max_iteration_size"
,
30
<<
20
,
"multi-get operation total key-value size exceed "
"this threshold will stop iterating rocksdb, 0 means no check"
);
_rng_rd_opts
.
rocksdb_max_iteration_count
=
(
uint32_t
)
dsn_config_get_value_uint64
(
"pegasus.server"
,
"rocksdb_max_iteration_count"
,
1000
,
"max iteration count for each range "
"read, if exceed this threshold, "
"iterator will be stopped"
);
_rng_rd_opts
.
rocksdb_iteration_threshold_time_ms_in_config
=
dsn_config_get_value_uint64
(
"pegasus.server"
,
"rocksdb_iteration_threshold_time_ms"
,
30000
,
"max duration for handling one pegasus scan request(sortkey_count/multiget/scan) if exceed "
"this threshold, iterator will be stopped, 0 means no check"
);
_rng_rd_opts
.
rocksdb_iteration_threshold_time_ms
=
_rng_rd_opts
.
rocksdb_iteration_threshold_time_ms_in_config
;
// init rocksdb::DBOptions
_db_opts
.
pegasus_data
=
true
;
_db_opts
.
create_if_missing
=
true
;
...
...
@@ -674,12 +706,20 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
return
;
}
int32_t
max_kv_count
=
request
.
max_kv_count
>
0
?
request
.
max_kv_count
:
INT_MAX
;
uint32_t
max_kv_count
=
request
.
max_kv_count
>
0
?
request
.
max_kv_count
:
INT_MAX
;
uint32_t
max_iteration_count
=
std
::
min
(
max_kv_count
,
_rng_rd_opts
.
multi_get_max_iteration_count
);
int32_t
max_kv_size
=
request
.
max_kv_size
>
0
?
request
.
max_kv_size
:
INT_MAX
;
int32_t
max_iteration_size_config
=
_rng_rd_opts
.
multi_get_max_iteration_size
>
0
?
_rng_rd_opts
.
multi_get_max_iteration_size
:
INT_MAX
;
int32_t
max_iteration_size
=
std
::
min
(
max_kv_size
,
max_iteration_size_config
);
uint32_t
epoch_now
=
::
pegasus
::
utils
::
epoch_now
();
int32_t
count
=
0
;
int64_t
size
=
0
;
int32_t
iterat
e
_count
=
0
;
int32_t
iterat
ion
_count
=
0
;
int32_t
expire_count
=
0
;
int32_t
filter_count
=
0
;
...
...
@@ -754,11 +794,17 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
std
::
unique_ptr
<
rocksdb
::
Iterator
>
it
;
bool
complete
=
false
;
std
::
unique_ptr
<
range_read_limiter
>
limiter
=
dsn
::
make_unique
<
range_read_limiter
>
(
max_iteration_count
,
max_iteration_size
,
_rng_rd_opts
.
rocksdb_iteration_threshold_time_ms
);
if
(
!
request
.
reverse
)
{
it
.
reset
(
_db
->
NewIterator
(
_data_cf_rd_opts
));
it
->
Seek
(
start
);
bool
first_exclusive
=
!
start_inclusive
;
while
(
count
<
max_kv_count
&&
size
<
max_kv_size
&&
it
->
Valid
())
{
while
(
limiter
->
valid
()
&&
it
->
Valid
())
{
// check stop sort key
int
c
=
it
->
key
().
compare
(
stop
);
if
(
c
>
0
||
(
c
==
0
&&
!
stop_inclusive
))
{
...
...
@@ -777,7 +823,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
}
}
iterate_count
++
;
limiter
->
add_count
()
;
// extract value
int
r
=
append_key_value_for_multi_get
(
resp
.
kvs
,
...
...
@@ -790,7 +836,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
if
(
r
==
1
)
{
count
++
;
auto
&
kv
=
resp
.
kvs
.
back
();
size
+=
kv
.
key
.
length
()
+
kv
.
value
.
length
(
);
limiter
->
add_size
(
kv
.
key
.
length
()
+
kv
.
value
.
length
()
);
}
else
if
(
r
==
2
)
{
expire_count
++
;
}
else
{
// r == 3
...
...
@@ -821,7 +867,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
it
->
SeekForPrev
(
stop
);
bool
first_exclusive
=
!
stop_inclusive
;
std
::
vector
<::
dsn
::
apps
::
key_value
>
reverse_kvs
;
while
(
count
<
max_kv_count
&&
size
<
max_kv_size
&&
it
->
Valid
())
{
while
(
limiter
->
valid
()
&&
it
->
Valid
())
{
// check start sort key
int
c
=
it
->
key
().
compare
(
start
);
if
(
c
<
0
||
(
c
==
0
&&
!
start_inclusive
))
{
...
...
@@ -840,7 +886,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
}
}
iterate_count
++
;
limiter
->
add_count
()
;
// extract value
int
r
=
append_key_value_for_multi_get
(
reverse_kvs
,
...
...
@@ -853,7 +899,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
if
(
r
==
1
)
{
count
++
;
auto
&
kv
=
reverse_kvs
.
back
();
size
+=
kv
.
key
.
length
()
+
kv
.
value
.
length
(
);
limiter
->
add_size
(
kv
.
key
.
length
()
+
kv
.
value
.
length
()
);
}
else
if
(
r
==
2
)
{
expire_count
++
;
}
else
{
// r == 3
...
...
@@ -878,6 +924,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
}
}
iteration_count
=
limiter
->
get_iteration_count
();
resp
.
error
=
it
->
status
().
code
();
if
(
!
it
->
status
().
ok
())
{
// error occur
...
...
@@ -901,8 +948,15 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
}
else
if
(
it
->
Valid
()
&&
!
complete
)
{
// scan not completed
resp
.
error
=
rocksdb
::
Status
::
kIncomplete
;
if
(
limiter
->
exceed_limit
())
{
dwarn_replica
(
"rocksdb abnormal scan from {}: time_used({}ns) VS time_threshold({}ns)"
,
reply
.
to_address
().
to_string
(),
limiter
->
duration_time
(),
limiter
->
max_duration_time
());
}
}
}
else
{
}
else
{
// condition: !request.sort_keys.empty()
bool
error_occurred
=
false
;
rocksdb
::
Status
final_status
;
bool
exceed_limit
=
false
;
...
...
@@ -992,13 +1046,13 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
#endif
uint64_t
time_used
=
dsn_now_ns
()
-
start_time
;
if
(
is_multi_get_abnormal
(
time_used
,
size
,
iterat
e
_count
))
{
if
(
is_multi_get_abnormal
(
time_used
,
size
,
iterat
ion
_count
))
{
dwarn_replica
(
"rocksdb abnormal multi_get from {}: hash_key = {}, "
"start_sort_key = {} ({}), stop_sort_key = {} ({}), "
"sort_key_filter_type = {}, sort_key_filter_pattern = {}, "
"max_kv_count = {}, max_kv_size = {}, reverse = {}, "
"result_count = {}, result_size = {}, iterat
e
_count = {}, "
"result_count = {}, result_size = {}, iterat
ion
_count = {}, "
"expire_count = {}, filter_count = {}, time_used = {} ns"
,
reply
.
to_address
().
to_string
(),
::
pegasus
::
utils
::
c_escape_string
(
request
.
hash_key
),
...
...
@@ -1013,7 +1067,7 @@ void pegasus_server_impl::on_multi_get(const ::dsn::apps::multi_get_request &req
request
.
reverse
?
"true"
:
"false"
,
count
,
size
,
iterat
e
_count
,
iterat
ion
_count
,
expire_count
,
filter_count
,
time_used
);
...
...
@@ -1038,6 +1092,9 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key,
{
dassert
(
_is_open
,
""
);
_pfc_scan_qps
->
increment
();
uint64_t
start_time
=
dsn_now_ns
();
::
dsn
::
apps
::
count_response
resp
;
resp
.
app_id
=
_gpid
.
get_app_id
();
resp
.
partition_index
=
_gpid
.
get_partition_index
();
...
...
@@ -1056,7 +1113,14 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key,
resp
.
count
=
0
;
uint32_t
epoch_now
=
::
pegasus
::
utils
::
epoch_now
();
uint64_t
expire_count
=
0
;
while
(
it
->
Valid
())
{
std
::
unique_ptr
<
range_read_limiter
>
limiter
=
dsn
::
make_unique
<
range_read_limiter
>
(
_rng_rd_opts
.
rocksdb_max_iteration_count
,
0
,
_rng_rd_opts
.
rocksdb_iteration_threshold_time_ms
);
while
(
limiter
->
time_check
()
&&
it
->
Valid
())
{
limiter
->
add_count
();
if
(
check_if_record_expired
(
epoch_now
,
it
->
value
()))
{
expire_count
++
;
if
(
_verbose_log
)
{
...
...
@@ -1090,9 +1154,16 @@ void pegasus_server_impl::on_sortkey_count(const ::dsn::blob &hash_key,
it
->
status
().
ToString
().
c_str
());
}
resp
.
count
=
0
;
}
else
if
(
limiter
->
exceed_limit
())
{
dwarn_replica
(
"rocksdb abnormal scan from {}: time_used({}ns) VS time_threshold({}ns)"
,
reply
.
to_address
().
to_string
(),
limiter
->
duration_time
(),
limiter
->
max_duration_time
());
resp
.
count
=
-
1
;
}
_cu_calculator
->
add_sortkey_count_cu
(
resp
.
error
);
_pfc_scan_latency
->
set
(
dsn_now_ns
()
-
start_time
);
reply
(
resp
);
}
...
...
@@ -1263,8 +1334,15 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request
uint64_t
expire_count
=
0
;
uint64_t
filter_count
=
0
;
int32_t
count
=
0
;
resp
.
kvs
.
reserve
(
request
.
batch_size
);
while
(
count
<
request
.
batch_size
&&
it
->
Valid
())
{
uint32_t
request_batch_size
=
request
.
batch_size
>
0
?
request
.
batch_size
:
INT_MAX
;
uint32_t
batch_count
=
std
::
min
(
request_batch_size
,
_rng_rd_opts
.
rocksdb_max_iteration_count
);
resp
.
kvs
.
reserve
(
batch_count
);
std
::
unique_ptr
<
range_read_limiter
>
limiter
=
dsn
::
make_unique
<
range_read_limiter
>
(
batch_count
,
0
,
_rng_rd_opts
.
rocksdb_iteration_threshold_time_ms
);
while
(
limiter
->
valid
()
&&
it
->
Valid
())
{
int
c
=
it
->
key
().
compare
(
stop
);
if
(
c
>
0
||
(
c
==
0
&&
!
stop_inclusive
))
{
// out of range
...
...
@@ -1281,6 +1359,8 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request
}
}
limiter
->
add_count
();
int
r
=
append_key_value_for_scan
(
resp
.
kvs
,
it
->
key
(),
it
->
value
(),
...
...
@@ -1307,6 +1387,11 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request
it
->
Next
();
}
// check iteration time whether exceed limit
if
(
!
complete
)
{
limiter
->
time_check_after_incomplete_scan
();
}
resp
.
error
=
it
->
status
().
code
();
if
(
!
it
->
status
().
ok
())
{
// error occur
...
...
@@ -1320,7 +1405,7 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request
request
.
start_inclusive
?
"inclusive"
:
"exclusive"
,
::
pegasus
::
utils
::
c_escape_string
(
stop
).
c_str
(),
request
.
stop_inclusive
?
"inclusive"
:
"exclusive"
,
request
.
batch_size
,
batch_count
,
count
,
it
->
status
().
ToString
().
c_str
());
}
else
{
...
...
@@ -1330,6 +1415,15 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request
it
->
status
().
ToString
().
c_str
());
}
resp
.
kvs
.
clear
();
}
else
if
(
limiter
->
exceed_limit
())
{
// scan exceed limit time
resp
.
error
=
rocksdb
::
Status
::
kIncomplete
;
dwarn_replica
(
"rocksdb abnormal scan from {}: batch_count={}, time_used_ns({}) VS "
"time_threshold_ns({})"
,
reply
.
to_address
().
to_string
(),
batch_count
,
limiter
->
duration_time
(),
limiter
->
max_duration_time
());
}
else
if
(
it
->
Valid
()
&&
!
complete
)
{
// scan not completed
std
::
unique_ptr
<
pegasus_scan_context
>
context
(
...
...
@@ -1342,7 +1436,7 @@ void pegasus_server_impl::on_get_scanner(const ::dsn::apps::get_scanner_request
request
.
sort_key_filter_type
,
std
::
string
(
request
.
sort_key_filter_pattern
.
data
(),
request
.
sort_key_filter_pattern
.
length
()),
request
.
batch_size
,
batch_count
,
request
.
no_value
));
int64_t
handle
=
_context_cache
.
put
(
std
::
move
(
context
));
resp
.
context_id
=
handle
;
...
...
@@ -1387,7 +1481,6 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request,
std
::
unique_ptr
<
pegasus_scan_context
>
context
=
_context_cache
.
fetch
(
request
.
context_id
);
if
(
context
)
{
rocksdb
::
Iterator
*
it
=
context
->
iterator
.
get
();
int32_t
batch_size
=
context
->
batch_size
;
const
rocksdb
::
Slice
&
stop
=
context
->
stop
;
bool
stop_inclusive
=
context
->
stop_inclusive
;
::
dsn
::
apps
::
filter_type
::
type
hash_key_filter_type
=
context
->
hash_key_filter_type
;
...
...
@@ -1401,7 +1494,14 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request,
uint64_t
filter_count
=
0
;
int32_t
count
=
0
;
while
(
count
<
batch_size
&&
it
->
Valid
())
{
uint32_t
context_batch_size
=
context
->
batch_size
>
0
?
context
->
batch_size
:
INT_MAX
;
uint32_t
batch_count
=
std
::
min
(
context_batch_size
,
_rng_rd_opts
.
rocksdb_max_iteration_count
);
std
::
unique_ptr
<
range_read_limiter
>
limiter
=
dsn
::
make_unique
<
range_read_limiter
>
(
batch_count
,
0
,
_rng_rd_opts
.
rocksdb_iteration_threshold_time_ms
);
while
(
limiter
->
valid
()
&&
it
->
Valid
())
{
int
c
=
it
->
key
().
compare
(
stop
);
if
(
c
>
0
||
(
c
==
0
&&
!
stop_inclusive
))
{
// out of range
...
...
@@ -1409,6 +1509,8 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request,
break
;
}
limiter
->
add_count
();
int
r
=
append_key_value_for_scan
(
resp
.
kvs
,
it
->
key
(),
it
->
value
(),
...
...
@@ -1435,6 +1537,11 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request,
it
->
Next
();
}
// check iteration time whether exceed limit
if
(
!
complete
)
{
limiter
->
time_check_after_incomplete_scan
();
}
resp
.
error
=
it
->
status
().
code
();
if
(
!
it
->
status
().
ok
())
{
// error occur
...
...
@@ -1447,7 +1554,7 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request,
request
.
context_id
,
::
pegasus
::
utils
::
c_escape_string
(
stop
).
c_str
(),
stop_inclusive
?
"inclusive"
:
"exclusive"
,
batch_
size
,
batch_
count
,
count
,
it
->
status
().
ToString
().
c_str
());
}
else
{
...
...
@@ -1457,6 +1564,15 @@ void pegasus_server_impl::on_scan(const ::dsn::apps::scan_request &request,
it
->
status
().
ToString
().
c_str
());
}
resp
.
kvs
.
clear
();
}
else
if
(
limiter
->
exceed_limit
())
{
// scan exceed limit time
resp
.
error
=
rocksdb
::
Status
::
kIncomplete
;
dwarn_replica
(
"rocksdb abnormal scan from {}: batch_count={}, time_used({}ns) VS "
"time_threshold({}ns)"
,
reply
.
to_address
().
to_string
(),
batch_count
,
limiter
->
duration_time
(),
limiter
->
max_duration_time
());
}
else
if
(
it
->
Valid
()
&&
!
complete
)
{
// scan not completed
int64_t
handle
=
_context_cache
.
put
(
std
::
move
(
context
));
...
...
@@ -2360,6 +2476,7 @@ void pegasus_server_impl::update_app_envs(const std::map<std::string, std::strin
update_default_ttl
(
envs
);
update_checkpoint_reserve
(
envs
);
update_slow_query_threshold
(
envs
);
update_rocksdb_iteration_threshold
(
envs
);
_manual_compact_svc
.
start_manual_compact_if_needed
(
envs
);
}
...
...
@@ -2370,6 +2487,7 @@ void pegasus_server_impl::update_app_envs_before_open_db(
update_default_ttl
(
envs
);
update_checkpoint_reserve
(
envs
);
update_slow_query_threshold
(
envs
);
update_rocksdb_iteration_threshold
(
envs
);
_manual_compact_svc
.
start_manual_compact_if_needed
(
envs
);
}
...
...
@@ -2476,6 +2594,28 @@ void pegasus_server_impl::update_slow_query_threshold(
}
}
void
pegasus_server_impl
::
update_rocksdb_iteration_threshold
(
const
std
::
map
<
std
::
string
,
std
::
string
>
&
envs
)
{
uint64_t
threshold_ms
=
_rng_rd_opts
.
rocksdb_iteration_threshold_time_ms_in_config
;
auto
find
=
envs
.
find
(
ROCKSDB_ITERATION_THRESHOLD_TIME_MS
);
if
(
find
!=
envs
.
end
())
{
// the unit of iteration threshold from env is ms
if
(
!
dsn
::
buf2uint64
(
find
->
second
,
threshold_ms
)
||
threshold_ms
<
0
)
{
derror_replica
(
"{}={} is invalid."
,
find
->
first
,
find
->
second
);
return
;
}
}
if
(
_rng_rd_opts
.
rocksdb_iteration_threshold_time_ms
!=
threshold_ms
)
{
ddebug_replica
(
"update app env[{}] from
\"
{}
\"
to
\"
{}
\"
succeed"
,
ROCKSDB_ITERATION_THRESHOLD_TIME_MS
,
_rng_rd_opts
.
rocksdb_iteration_threshold_time_ms
,
threshold_ms
);
_rng_rd_opts
.
rocksdb_iteration_threshold_time_ms
=
threshold_ms
;
}
}
bool
pegasus_server_impl
::
parse_compression_types
(
const
std
::
string
&
config
,
std
::
vector
<
rocksdb
::
CompressionType
>
&
compression_per_level
)
{
...
...
src/server/pegasus_server_impl.h
浏览文件 @
46486fa8
...
...
@@ -18,6 +18,7 @@
#include "pegasus_scan_context.h"
#include "pegasus_manual_compact_service.h"
#include "pegasus_write_service.h"
#include "range_read_limiter.h"
namespace
pegasus
{
namespace
server
{
...
...
@@ -229,6 +230,8 @@ private:
void
update_slow_query_threshold
(
const
std
::
map
<
std
::
string
,
std
::
string
>
&
envs
);
void
update_rocksdb_iteration_threshold
(
const
std
::
map
<
std
::
string
,
std
::
string
>
&
envs
);
// return true if parse compression types 'config' success, otherwise return false.
// 'compression_per_level' will not be changed if parse failed.
bool
parse_compression_types
(
const
std
::
string
&
config
,
...
...
@@ -308,6 +311,8 @@ private:
uint64_t
_slow_query_threshold_ns
;
uint64_t
_slow_query_threshold_ns_in_config
;
range_read_limiter_options
_rng_rd_opts
;
std
::
shared_ptr
<
KeyWithTTLCompactionFilterFactory
>
_key_ttl_compaction_filter_factory
;
std
::
shared_ptr
<
rocksdb
::
Statistics
>
_statistics
;
rocksdb
::
DBOptions
_db_opts
;
...
...
src/server/range_read_limiter.h
0 → 100644
浏览文件 @
46486fa8
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#pragma once
#include <dsn/dist/replication/replication.codes.h>
namespace
pegasus
{
namespace
server
{
class
pegasus_server_impl
;
struct
range_read_limiter_options
{
uint32_t
multi_get_max_iteration_count
;
uint64_t
multi_get_max_iteration_size
;
uint32_t
rocksdb_max_iteration_count
;
uint64_t
rocksdb_iteration_threshold_time_ms_in_config
;
uint64_t
rocksdb_iteration_threshold_time_ms
;
};
class
range_read_limiter
{
public:
range_read_limiter
(
uint32_t
max_iteration_count
,
uint64_t
max_iteration_size
,
uint64_t
threshold_time_ms
)
:
_max_count
(
max_iteration_count
),
_max_size
(
max_iteration_size
)
{
_module_num
=
_max_count
<=
10
?
1
:
_max_count
/
10
;
_max_duration_time
=
threshold_time_ms
>
0
?
threshold_time_ms
*
1e6
:
0
;
_iteration_start_time_ns
=
dsn_now_ns
();
}
bool
valid
()
{
if
(
_iteration_count
>=
_max_count
)
{
return
false
;
}
if
(
_max_size
>
0
&&
_iteration_size
>=
_max_size
)
{
return
false
;
}
return
time_check
();
}
// during rocksdb iteration, if iteration_count % module_num == 0, we will check if iteration
// exceed time threshold, which means we at most check ten times during iteration
bool
time_check
()
{
if
(
_max_duration_time
>
0
&&
(
_iteration_count
+
1
)
%
_module_num
==
0
&&
dsn_now_ns
()
-
_iteration_start_time_ns
>
_max_duration_time
)
{
_exceed_limit
=
true
;
_iteration_duration_time_ns
=
dsn_now_ns
()
-
_iteration_start_time_ns
;
return
false
;
}
return
true
;
}
void
time_check_after_incomplete_scan
()
{
if
(
_max_duration_time
>
0
&&
dsn_now_ns
()
-
_iteration_start_time_ns
>
_max_duration_time
)
{
_exceed_limit
=
true
;
_iteration_duration_time_ns
=
dsn_now_ns
()
-
_iteration_start_time_ns
;
}
}
void
add_count
()
{
++
_iteration_count
;
}
void
add_size
(
uint64_t
size
)
{
_iteration_size
+=
size
;
}
bool
exceed_limit
()
{
return
_exceed_limit
;
}
uint32_t
get_iteration_count
()
{
return
_iteration_count
;
}
uint64_t
duration_time
()
{
return
_iteration_duration_time_ns
;
}
uint64_t
max_duration_time
()
{
return
_max_duration_time
;
}
private:
bool
_exceed_limit
{
false
};
uint32_t
_iteration_count
{
0
};
uint64_t
_iteration_size
{
0
};
uint64_t
_iteration_start_time_ns
{
0
};
uint64_t
_iteration_duration_time_ns
{
0
};
uint32_t
_max_count
{
0
};
uint64_t
_max_size
{
0
};
uint64_t
_max_duration_time
{
0
};
int32_t
_module_num
{
1
};
};
}
// namespace server
}
// namespace pegasus
src/shell/commands/data_operations.cpp
浏览文件 @
46486fa8
...
...
@@ -1013,6 +1013,8 @@ bool sortkey_count(command_executor *e, shell_context *sc, arguments args)
int
ret
=
sc
->
pg_client
->
sortkey_count
(
hash_key
,
count
,
sc
->
timeout_ms
,
&
info
);
if
(
ret
!=
pegasus
::
PERR_OK
)
{
fprintf
(
stderr
,
"ERROR: %s
\n
"
,
sc
->
pg_client
->
get_error_string
(
ret
));
}
else
if
(
count
==
-
1
)
{
fprintf
(
stderr
,
"ERROR: it takes too long to count sortkey
\n
"
);
}
else
{
fprintf
(
stderr
,
"%"
PRId64
"
\n
"
,
count
);
}
...
...
src/test/function_test/run.sh
浏览文件 @
46486fa8
...
...
@@ -41,9 +41,9 @@ exit_if_fail $? "run test check_and_set failed: $test_case $config_file $table_n
GTEST_OUTPUT
=
"xml:
$REPORT_DIR
/check_and_mutate.xml"
GTEST_FILTER
=
"check_and_mutate.*"
./
$test_case
$config_file
$table_name
exit_if_fail
$?
"run test check_and_mutate failed:
$test_case
$config_file
$table_name
"
GTEST_OUTPUT
=
"xml:
$REPORT_DIR
/scan.xml"
GTEST_FILTER
=
"scan.*"
./
$test_case
$config_file
$table_name
exit_if_fail
$?
"run test scan failed:
$test_case
$config_file
$table_name
"
GTEST_OUTPUT
=
"xml:
$REPORT_DIR
/ttl.xml"
GTEST_FILTER
=
"ttl.*"
./
$test_case
$config_file
$table_name
exit_if_fail
$?
"run test ttl failed:
$test_case
$config_file
$table_name
"
exit_if_fail
$?
"run test scan failed:
$test_case
$config_file
$table_name
"
GTEST_OUTPUT
=
"xml:
$REPORT_DIR
/slog_log.xml"
GTEST_FILTER
=
"lost_log.*"
./
$test_case
$config_file
$table_name
exit_if_fail
$?
"run test slog_lost failed:
$test_case
$config_file
$table_name
"
GTEST_OUTPUT
=
"xml:
$REPORT_DIR
/recall.xml"
GTEST_FILTER
=
"drop_and_recall.*"
./
$test_case
$config_file
$table_name
...
...
src/test/function_test/test_basic.cpp
浏览文件 @
46486fa8
...
...
@@ -546,6 +546,16 @@ TEST(basic, multi_get)
ASSERT_EQ
(
1
,
(
int
)
new_values
.
size
());
ASSERT_EQ
(
"5"
,
new_values
[
"5"
]);
// set a expired value
ret
=
client
->
set
(
"basic_test_multi_get"
,
""
,
"expire_value"
,
5000
,
1
);
ASSERT_EQ
(
PERR_OK
,
ret
);
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
seconds
(
1
));
new_values
.
clear
();
ret
=
client
->
multi_get
(
"basic_test_multi_get"
,
""
,
""
,
options
,
new_values
,
2
);
ASSERT_EQ
(
PERR_INCOMPLETE
,
ret
);
ASSERT_EQ
(
1
,
(
int
)
new_values
.
size
());
ASSERT_EQ
(
"1"
,
new_values
[
"1"
]);
// multi_del
std
::
set
<
std
::
string
>
sortkeys
;
sortkeys
.
insert
(
""
);
...
...
src/test/function_test/test_scan.cpp
浏览文件 @
46486fa8
...
...
@@ -7,14 +7,17 @@
#include <vector>
#include <map>
#include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/service_api_c.h>
#include <unistd.h>
#include <pegasus/client.h>
#include <gtest/gtest.h>
#include "base/pegasus_const.h"
using
namespace
::
pegasus
;
extern
pegasus_client
*
client
;
extern
std
::
shared_ptr
<
dsn
::
replication
::
replication_ddl_client
>
ddl_client
;
static
const
char
CCH
[]
=
"_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
;
static
char
buffer
[
256
];
static
std
::
map
<
std
::
string
,
std
::
map
<
std
::
string
,
std
::
string
>>
base
;
...
...
@@ -397,3 +400,42 @@ TEST_F(scan, OVERALL)
}
compare
(
data
,
base
);
}
TEST_F
(
scan
,
ITERATION_TIME_LIMIT
)
{
// update iteration threshold to 1ms
auto
response
=
ddl_client
->
set_app_envs
(
client
->
get_app_name
(),
{
ROCKSDB_ITERATION_THRESHOLD_TIME_MS
},
{
std
::
to_string
(
1
)});
ASSERT_EQ
(
true
,
response
.
is_ok
());
ASSERT_EQ
(
dsn
::
ERR_OK
,
response
.
get_value
().
err
);
// wait envs to be synced.
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
seconds
(
30
));
// write data into table
int32_t
i
=
0
;
std
::
string
sort_key
;
std
::
string
value
;
while
(
i
<
9000
)
{
sort_key
=
random_string
();
value
=
random_string
();
int
ret
=
client
->
set
(
expected_hash_key
,
sort_key
,
value
);
ASSERT_EQ
(
PERR_OK
,
ret
)
<<
"Error occurred when set, hash_key="
<<
expected_hash_key
<<
", sort_key="
<<
sort_key
<<
", error="
<<
client
->
get_error_string
(
ret
);
i
++
;
}
// get sortkey count timeout
int64_t
count
=
0
;
int
ret
=
client
->
sortkey_count
(
expected_hash_key
,
count
);
ASSERT_EQ
(
0
,
ret
);
ASSERT_EQ
(
count
,
-
1
);
// set iteration threshold to 100ms
response
=
ddl_client
->
set_app_envs
(
client
->
get_app_name
(),
{
ROCKSDB_ITERATION_THRESHOLD_TIME_MS
},
{
std
::
to_string
(
100
)});
ASSERT_EQ
(
true
,
response
.
is_ok
());
ASSERT_EQ
(
dsn
::
ERR_OK
,
response
.
get_value
().
err
);
// wait envs to be synced.
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
seconds
(
30
));
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录