Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
milvus
milvus
提交
2f92f78e
M
milvus
项目概览
milvus
/
milvus
大约 1 年 前同步成功
通知
261
Star
22476
Fork
2472
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
2f92f78e
编写于
9月 22, 2021
作者:
X
Xiaofan
提交者:
GitHub
9月 22, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Remove meaningless Etcd kv logs (#8019)
Signed-off-by:
N
xiaofan-luan
<
xiaofan.luan@zilliz.com
>
上级
9e17fdb5
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
64 addition
and
13 deletion
+64
-13
internal/kv/etcd/etcd_kv.go
internal/kv/etcd/etcd_kv.go
+53
-13
internal/kv/etcd/etcd_kv_test.go
internal/kv/etcd/etcd_kv_test.go
+11
-0
未找到文件。
internal/kv/etcd/etcd_kv.go
浏览文件 @
2f92f78e
...
...
@@ -27,6 +27,7 @@ const (
RequestTimeout
=
10
*
time
.
Second
)
// EtcdKV implments TxnKv interface, it support to process multiple kvs in a transactions
type
EtcdKV
struct
{
client
*
clientv3
.
Client
rootPath
string
...
...
@@ -59,8 +60,8 @@ func (kv *EtcdKV) GetPath(key string) string {
}
func
(
kv
*
EtcdKV
)
LoadWithPrefix
(
key
string
)
([]
string
,
[]
string
,
error
)
{
start
:=
time
.
Now
()
key
=
path
.
Join
(
kv
.
rootPath
,
key
)
log
.
Debug
(
"LoadWithPrefix "
,
zap
.
String
(
"prefix"
,
key
))
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
TODO
(),
RequestTimeout
)
defer
cancel
()
resp
,
err
:=
kv
.
client
.
Get
(
ctx
,
key
,
clientv3
.
WithPrefix
(),
...
...
@@ -74,12 +75,13 @@ func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
keys
=
append
(
keys
,
string
(
kv
.
Key
))
values
=
append
(
values
,
string
(
kv
.
Value
))
}
CheckElapseAndWarn
(
start
,
"Slow etcd operation load with prefix"
)
return
keys
,
values
,
nil
}
func
(
kv
*
EtcdKV
)
LoadWithPrefix2
(
key
string
)
([]
string
,
[]
string
,
[]
int64
,
error
)
{
start
:=
time
.
Now
()
key
=
path
.
Join
(
kv
.
rootPath
,
key
)
log
.
Debug
(
"LoadWithPrefix "
,
zap
.
String
(
"prefix"
,
key
))
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
TODO
(),
RequestTimeout
)
defer
cancel
()
resp
,
err
:=
kv
.
client
.
Get
(
ctx
,
key
,
clientv3
.
WithPrefix
(),
...
...
@@ -95,10 +97,12 @@ func (kv *EtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64, erro
values
=
append
(
values
,
string
(
kv
.
Value
))
versions
=
append
(
versions
,
kv
.
Version
)
}
CheckElapseAndWarn
(
start
,
"Slow etcd operation load with prefix2"
)
return
keys
,
values
,
versions
,
nil
}
func
(
kv
*
EtcdKV
)
Load
(
key
string
)
(
string
,
error
)
{
start
:=
time
.
Now
()
key
=
path
.
Join
(
kv
.
rootPath
,
key
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
TODO
(),
RequestTimeout
)
defer
cancel
()
...
...
@@ -109,11 +113,12 @@ func (kv *EtcdKV) Load(key string) (string, error) {
if
resp
.
Count
<=
0
{
return
""
,
fmt
.
Errorf
(
"there is no value on key = %s"
,
key
)
}
CheckElapseAndWarn
(
start
,
"Slow etcd operation load"
)
return
string
(
resp
.
Kvs
[
0
]
.
Value
),
nil
}
func
(
kv
*
EtcdKV
)
MultiLoad
(
keys
[]
string
)
([]
string
,
error
)
{
start
:=
time
.
Now
()
ops
:=
make
([]
clientv3
.
Op
,
0
,
len
(
keys
))
for
_
,
keyLoad
:=
range
keys
{
ops
=
append
(
ops
,
clientv3
.
OpGet
(
path
.
Join
(
kv
.
rootPath
,
keyLoad
)))
...
...
@@ -134,23 +139,21 @@ func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error) {
result
=
append
(
result
,
""
)
}
for
_
,
ev
:=
range
rp
.
GetResponseRange
()
.
Kvs
{
log
.
Debug
(
"MultiLoad"
,
zap
.
ByteString
(
"key"
,
ev
.
Key
),
zap
.
ByteString
(
"value"
,
ev
.
Value
))
result
=
append
(
result
,
string
(
ev
.
Value
))
}
}
if
len
(
invalid
)
!=
0
{
log
.
Debug
(
"MultiLoad: there are invalid keys"
,
zap
.
Strings
(
"keys"
,
invalid
))
log
.
Warn
(
"MultiLoad: there are invalid keys"
,
zap
.
Strings
(
"keys"
,
invalid
))
err
=
fmt
.
Errorf
(
"there are invalid keys: %s"
,
invalid
)
return
result
,
err
}
CheckElapseAndWarn
(
start
,
"Slow etcd operation multi load"
)
return
result
,
nil
}
func
(
kv
*
EtcdKV
)
LoadWithRevision
(
key
string
)
([]
string
,
[]
string
,
int64
,
error
)
{
start
:=
time
.
Now
()
key
=
path
.
Join
(
kv
.
rootPath
,
key
)
log
.
Debug
(
"LoadWithPrefix "
,
zap
.
String
(
"prefix"
,
key
))
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
TODO
(),
RequestTimeout
)
defer
cancel
()
resp
,
err
:=
kv
.
client
.
Get
(
ctx
,
key
,
clientv3
.
WithPrefix
(),
...
...
@@ -164,27 +167,33 @@ func (kv *EtcdKV) LoadWithRevision(key string) ([]string, []string, int64, error
keys
=
append
(
keys
,
string
(
kv
.
Key
))
values
=
append
(
values
,
string
(
kv
.
Value
))
}
CheckElapseAndWarn
(
start
,
"Slow etcd operation load with revision"
)
return
keys
,
values
,
resp
.
Header
.
Revision
,
nil
}
func
(
kv
*
EtcdKV
)
Save
(
key
,
value
string
)
error
{
start
:=
time
.
Now
()
key
=
path
.
Join
(
kv
.
rootPath
,
key
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
TODO
(),
RequestTimeout
)
defer
cancel
()
_
,
err
:=
kv
.
client
.
Put
(
ctx
,
key
,
value
)
CheckElapseAndWarn
(
start
,
"Slow etcd operation save"
)
return
err
}
// SaveWithLease is a function to put value in etcd with etcd lease options.
func
(
kv
*
EtcdKV
)
SaveWithLease
(
key
,
value
string
,
id
clientv3
.
LeaseID
)
error
{
start
:=
time
.
Now
()
key
=
path
.
Join
(
kv
.
rootPath
,
key
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
TODO
(),
RequestTimeout
)
defer
cancel
()
_
,
err
:=
kv
.
client
.
Put
(
ctx
,
key
,
value
,
clientv3
.
WithLease
(
id
))
CheckElapseAndWarn
(
start
,
"Slow etcd operation save with lease"
)
return
err
}
func
(
kv
*
EtcdKV
)
MultiSave
(
kvs
map
[
string
]
string
)
error
{
start
:=
time
.
Now
()
ops
:=
make
([]
clientv3
.
Op
,
0
,
len
(
kvs
))
for
key
,
value
:=
range
kvs
{
ops
=
append
(
ops
,
clientv3
.
OpPut
(
path
.
Join
(
kv
.
rootPath
,
key
),
value
))
...
...
@@ -194,28 +203,34 @@ func (kv *EtcdKV) MultiSave(kvs map[string]string) error {
defer
cancel
()
_
,
err
:=
kv
.
client
.
Txn
(
ctx
)
.
If
()
.
Then
(
ops
...
)
.
Commit
()
CheckElapseAndWarn
(
start
,
"Slow etcd operation multi save"
)
return
err
}
func
(
kv
*
EtcdKV
)
RemoveWithPrefix
(
prefix
string
)
error
{
start
:=
time
.
Now
()
key
:=
path
.
Join
(
kv
.
rootPath
,
prefix
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
TODO
(),
RequestTimeout
)
defer
cancel
()
_
,
err
:=
kv
.
client
.
Delete
(
ctx
,
key
,
clientv3
.
WithPrefix
())
CheckElapseAndWarn
(
start
,
"Slow etcd operation remove with prefix"
)
return
err
}
func
(
kv
*
EtcdKV
)
Remove
(
key
string
)
error
{
start
:=
time
.
Now
()
key
=
path
.
Join
(
kv
.
rootPath
,
key
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
TODO
(),
RequestTimeout
)
defer
cancel
()
_
,
err
:=
kv
.
client
.
Delete
(
ctx
,
key
)
CheckElapseAndWarn
(
start
,
"Slow etcd operation remove"
)
return
err
}
func
(
kv
*
EtcdKV
)
MultiRemove
(
keys
[]
string
)
error
{
start
:=
time
.
Now
()
ops
:=
make
([]
clientv3
.
Op
,
0
,
len
(
keys
))
for
_
,
key
:=
range
keys
{
ops
=
append
(
ops
,
clientv3
.
OpDelete
(
path
.
Join
(
kv
.
rootPath
,
key
)))
...
...
@@ -225,10 +240,12 @@ func (kv *EtcdKV) MultiRemove(keys []string) error {
defer
cancel
()
_
,
err
:=
kv
.
client
.
Txn
(
ctx
)
.
If
()
.
Then
(
ops
...
)
.
Commit
()
CheckElapseAndWarn
(
start
,
"Slow etcd operation multi remove"
)
return
err
}
func
(
kv
*
EtcdKV
)
MultiSaveAndRemove
(
saves
map
[
string
]
string
,
removals
[]
string
)
error
{
start
:=
time
.
Now
()
ops
:=
make
([]
clientv3
.
Op
,
0
,
len
(
saves
)
+
len
(
removals
))
for
key
,
value
:=
range
saves
{
ops
=
append
(
ops
,
clientv3
.
OpPut
(
path
.
Join
(
kv
.
rootPath
,
key
),
value
))
...
...
@@ -238,47 +255,55 @@ func (kv *EtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string)
ops
=
append
(
ops
,
clientv3
.
OpDelete
(
path
.
Join
(
kv
.
rootPath
,
keyDelete
)))
}
log
.
Debug
(
"MultiSaveAndRemove"
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
TODO
(),
RequestTimeout
)
defer
cancel
()
_
,
err
:=
kv
.
client
.
Txn
(
ctx
)
.
If
()
.
Then
(
ops
...
)
.
Commit
()
CheckElapseAndWarn
(
start
,
"Slow etcd operation multi save and remove"
)
return
err
}
func
(
kv
*
EtcdKV
)
Watch
(
key
string
)
clientv3
.
WatchChan
{
start
:=
time
.
Now
()
key
=
path
.
Join
(
kv
.
rootPath
,
key
)
rch
:=
kv
.
client
.
Watch
(
context
.
Background
(),
key
,
clientv3
.
WithCreatedNotify
())
CheckElapseAndWarn
(
start
,
"Slow etcd operation watch"
)
return
rch
}
func
(
kv
*
EtcdKV
)
WatchWithPrefix
(
key
string
)
clientv3
.
WatchChan
{
start
:=
time
.
Now
()
key
=
path
.
Join
(
kv
.
rootPath
,
key
)
rch
:=
kv
.
client
.
Watch
(
context
.
Background
(),
key
,
clientv3
.
WithPrefix
(),
clientv3
.
WithCreatedNotify
())
CheckElapseAndWarn
(
start
,
"Slow etcd operation watch with prefix"
)
return
rch
}
func
(
kv
*
EtcdKV
)
WatchWithRevision
(
key
string
,
revision
int64
)
clientv3
.
WatchChan
{
start
:=
time
.
Now
()
key
=
path
.
Join
(
kv
.
rootPath
,
key
)
rch
:=
kv
.
client
.
Watch
(
context
.
Background
(),
key
,
clientv3
.
WithPrefix
(),
clientv3
.
WithPrevKV
(),
clientv3
.
WithRev
(
revision
))
CheckElapseAndWarn
(
start
,
"Slow etcd operation watch with revision"
)
return
rch
}
func
(
kv
*
EtcdKV
)
MultiRemoveWithPrefix
(
keys
[]
string
)
error
{
start
:=
time
.
Now
()
ops
:=
make
([]
clientv3
.
Op
,
0
,
len
(
keys
))
for
_
,
k
:=
range
keys
{
op
:=
clientv3
.
OpDelete
(
path
.
Join
(
kv
.
rootPath
,
k
),
clientv3
.
WithPrefix
())
ops
=
append
(
ops
,
op
)
}
log
.
Debug
(
"MultiRemoveWithPrefix"
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
TODO
(),
RequestTimeout
)
defer
cancel
()
_
,
err
:=
kv
.
client
.
Txn
(
ctx
)
.
If
()
.
Then
(
ops
...
)
.
Commit
()
CheckElapseAndWarn
(
start
,
"Slow etcd operation multi remove with prefix"
)
return
err
}
func
(
kv
*
EtcdKV
)
MultiSaveAndRemoveWithPrefix
(
saves
map
[
string
]
string
,
removals
[]
string
)
error
{
start
:=
time
.
Now
()
ops
:=
make
([]
clientv3
.
Op
,
0
,
len
(
saves
))
for
key
,
value
:=
range
saves
{
ops
=
append
(
ops
,
clientv3
.
OpPut
(
path
.
Join
(
kv
.
rootPath
,
key
),
value
))
...
...
@@ -288,33 +313,38 @@ func (kv *EtcdKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removals
ops
=
append
(
ops
,
clientv3
.
OpDelete
(
path
.
Join
(
kv
.
rootPath
,
keyDelete
),
clientv3
.
WithPrefix
()))
}
log
.
Debug
(
"MultiSaveAndRemove"
)
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
TODO
(),
RequestTimeout
)
defer
cancel
()
_
,
err
:=
kv
.
client
.
Txn
(
ctx
)
.
If
()
.
Then
(
ops
...
)
.
Commit
()
CheckElapseAndWarn
(
start
,
"Slow etcd operation multi save and move with prefix"
)
return
err
}
// Grant creates a new lease implemented in etcd grant interface.
func
(
kv
*
EtcdKV
)
Grant
(
ttl
int64
)
(
id
clientv3
.
LeaseID
,
err
error
)
{
start
:=
time
.
Now
()
resp
,
err
:=
kv
.
client
.
Grant
(
context
.
Background
(),
ttl
)
CheckElapseAndWarn
(
start
,
"Slow etcd operation grant"
)
return
resp
.
ID
,
err
}
// KeepAlive keeps the lease alive forever with leaseID.
// Implemented in etcd interface.
func
(
kv
*
EtcdKV
)
KeepAlive
(
id
clientv3
.
LeaseID
)
(
<-
chan
*
clientv3
.
LeaseKeepAliveResponse
,
error
)
{
start
:=
time
.
Now
()
ch
,
err
:=
kv
.
client
.
KeepAlive
(
context
.
Background
(),
id
)
if
err
!=
nil
{
return
nil
,
err
}
CheckElapseAndWarn
(
start
,
"Slow etcd operation keepAlive"
)
return
ch
,
nil
}
// CompareValueAndSwap compares the existing value with compare, and if they are
// equal, the target is stored in etcd.
func
(
kv
*
EtcdKV
)
CompareValueAndSwap
(
key
,
value
,
target
string
,
opts
...
clientv3
.
OpOption
)
error
{
start
:=
time
.
Now
()
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
TODO
(),
RequestTimeout
)
defer
cancel
()
resp
,
err
:=
kv
.
client
.
Txn
(
ctx
)
.
If
(
...
...
@@ -329,13 +359,14 @@ func (kv *EtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv
if
!
resp
.
Succeeded
{
return
fmt
.
Errorf
(
"function CompareAndSwap error for compare is false for key: %s"
,
key
)
}
CheckElapseAndWarn
(
start
,
"Slow etcd operation compare value and swap"
)
return
nil
}
// CompareVersionAndSwap compares the existing key-value's version with version, and if
// they are equal, the target is stored in etcd.
func
(
kv
*
EtcdKV
)
CompareVersionAndSwap
(
key
string
,
version
int64
,
target
string
,
opts
...
clientv3
.
OpOption
)
error
{
start
:=
time
.
Now
()
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
TODO
(),
RequestTimeout
)
defer
cancel
()
resp
,
err
:=
kv
.
client
.
Txn
(
ctx
)
.
If
(
...
...
@@ -350,6 +381,15 @@ func (kv *EtcdKV) CompareVersionAndSwap(key string, version int64, target string
if
!
resp
.
Succeeded
{
return
fmt
.
Errorf
(
"function CompareAndSwap error for compare is false for key: %s"
,
key
)
}
CheckElapseAndWarn
(
start
,
"Slow etcd operation compare version and swap"
)
return
nil
}
func
CheckElapseAndWarn
(
start
time
.
Time
,
message
string
)
bool
{
elapsed
:=
time
.
Since
(
start
)
if
elapsed
.
Milliseconds
()
>
2000
{
log
.
Warn
(
message
,
zap
.
String
(
"time spent"
,
elapsed
.
String
()))
return
true
}
return
false
}
internal/kv/etcd/etcd_kv_test.go
浏览文件 @
2f92f78e
...
...
@@ -15,6 +15,7 @@ import (
"os"
"strings"
"testing"
"time"
etcdkv
"github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/paramtable"
...
...
@@ -488,3 +489,13 @@ func TestEtcdKV_Load(te *testing.T) {
})
}
func
TestElapse
(
t
*
testing
.
T
)
{
start
:=
time
.
Now
()
isElapse
:=
etcdkv
.
CheckElapseAndWarn
(
start
,
"err message"
)
assert
.
Equal
(
t
,
isElapse
,
false
)
time
.
Sleep
(
time
.
Duration
(
3
)
*
time
.
Second
)
isElapse
=
etcdkv
.
CheckElapseAndWarn
(
start
,
"err message"
)
assert
.
Equal
(
t
,
isElapse
,
true
)
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录