未验证 提交 ccc0130b 编写于 作者: G godchen 提交者: GitHub

Add exclusive parameter (#5278)

Add exclusive parameter.
Issue #5174 
Signed-off-by: Ngodchen <qingxiang.chen@zilliz.com>
上级 d68ee3fb
...@@ -60,27 +60,32 @@ The ID is stored in a key-value pair on etcd. The key is metaRootPath + "/servic ...@@ -60,27 +60,32 @@ The ID is stored in a key-value pair on etcd. The key is metaRootPath + "/servic
###### Registeration ###### Registeration
- Registration is achieved through etcd's lease mechanism. * Registration is achieved through etcd's lease mechanism.
- The service creates a lease with etcd and stores a key-value pair in etcd. If the lease expires or the service goes offline, etcd will delete the key-value pair. You can judge whether this service is avaliable through the key. * The service creates a lease with etcd and stores a key-value pair in etcd. If the lease expires or the service goes offline, etcd will delete the key-value pair. You can judge whether this service is avaliable through the key.
- key: metaRootPath + "/services" + "/ServerName-ServerID" * key: metaRootPath + "/services" + "/ServerName(-ServerID)(optional)"
- value: json format * value: json format
```json
{ {
"ServerID":ServerID //ServerID "ServerID": "ServerID",
"ServerName": ServerName // ServerName "ServerName": "ServerName",
"Address": ip:port // Address of service, including ip and port "Address": "ip:port",
"LeaseID": LeaseID // The ID of etcd lease "LeaseID": "LeaseID",
} }
```
* By obtaining the address, you can establish a connection with other services
- By obtaining the address, you can establish a connection with other services * If a service is exclusive, the key will not have **ServerID**. But **ServerID** still will be stored in value.
###### Discovery ###### Discovery
- All currently available services can be obtained by obtaining all the key-value pairs deposited during registration. If you want to get all the available nodes for a certain type of service, you can pass in the prefix of the corresponding key * All currently available services can be obtained by obtaining all the key-value pairs deposited during registration. If you want to get all the available nodes for a certain type of service, you can pass in the prefix of the corresponding key
- Registeration time can be compared with ServerID for ServerID will increase according to time.
* Registeration time can be compared with ServerID for ServerID will increase according to time.
###### Interface ###### Interface
...@@ -123,15 +128,18 @@ func GetServerID(etcd *etcdkv.EtcdKV) (int64, error) {} ...@@ -123,15 +128,18 @@ func GetServerID(etcd *etcdkv.EtcdKV) (int64, error) {}
// RegisterService registers the service to etcd so that other services // RegisterService registers the service to etcd so that other services
// can find that the service is online and issue subsequent operations // can find that the service is online and issue subsequent operations
// RegisterService will save a key-value in etcd // RegisterService will save a key-value in etcd
// key: metaRootPath + "/services" + "/ServerName-ServerID" // key: metaRootPath + "/services/" + "ServerName(-ServerID)(optional)"
// value: json format // value: json format
// { // {
// "ServerID": ServerID // "ServerID": "ServerID",
// "ServerName": ServerName // ServerName // "ServerName": "ServerName",
// "Address": ip:port // Address of service, including ip and port // "Address": "ip:port",
// "LeaseID": LeaseID // The ID of etcd lease // "LeaseID": "LeaseID",
// } // }
// MetaRootPath is configurable in the config file. // MetaRootPath is configurable in the config file.
// Exclusive means whether this service can exist two at the same time, if so,
// it is false. Otherwise, set it to true and the key will not have ServerID.
// But ServerID still will be stored in value.
func RegisterService(etcdKV *etcdkv.EtcdKV, session *Session, ttl int64) (<-chan *clientv3.LeaseKeepAliveResponse, error) {} func RegisterService(etcdKV *etcdkv.EtcdKV, session *Session, ttl int64) (<-chan *clientv3.LeaseKeepAliveResponse, error) {}
// ProcessKeepAliveResponse processes the response of etcd keepAlive interface // ProcessKeepAliveResponse processes the response of etcd keepAlive interface
......
...@@ -259,7 +259,7 @@ func (kv *EtcdKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliv ...@@ -259,7 +259,7 @@ func (kv *EtcdKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliv
// CompareValueAndSwap compares the existing value with compare, and if they are // CompareValueAndSwap compares the existing value with compare, and if they are
// equal, the target is stored in etcd. // equal, the target is stored in etcd.
func (kv *EtcdKV) CompareValueAndSwap(key, value, target string) error { func (kv *EtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) error {
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel() defer cancel()
resp, err := kv.client.Txn(ctx).If( resp, err := kv.client.Txn(ctx).If(
...@@ -267,7 +267,7 @@ func (kv *EtcdKV) CompareValueAndSwap(key, value, target string) error { ...@@ -267,7 +267,7 @@ func (kv *EtcdKV) CompareValueAndSwap(key, value, target string) error {
clientv3.Value(path.Join(kv.rootPath, key)), clientv3.Value(path.Join(kv.rootPath, key)),
"=", "=",
value)). value)).
Then(clientv3.OpPut(path.Join(kv.rootPath, key), target)).Commit() Then(clientv3.OpPut(path.Join(kv.rootPath, key), target, opts...)).Commit()
if err != nil { if err != nil {
return err return err
} }
...@@ -280,7 +280,7 @@ func (kv *EtcdKV) CompareValueAndSwap(key, value, target string) error { ...@@ -280,7 +280,7 @@ func (kv *EtcdKV) CompareValueAndSwap(key, value, target string) error {
// CompareVersionAndSwap compares the existing key-value's version with version, and if // CompareVersionAndSwap compares the existing key-value's version with version, and if
// they are equal, the target is stored in etcd. // they are equal, the target is stored in etcd.
func (kv *EtcdKV) CompareVersionAndSwap(key string, version int64, target string) error { func (kv *EtcdKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) error {
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel() defer cancel()
resp, err := kv.client.Txn(ctx).If( resp, err := kv.client.Txn(ctx).If(
...@@ -288,7 +288,7 @@ func (kv *EtcdKV) CompareVersionAndSwap(key string, version int64, target string ...@@ -288,7 +288,7 @@ func (kv *EtcdKV) CompareVersionAndSwap(key string, version int64, target string
clientv3.Version(path.Join(kv.rootPath, key)), clientv3.Version(path.Join(kv.rootPath, key)),
"=", "=",
version)). version)).
Then(clientv3.OpPut(path.Join(kv.rootPath, key), target)).Commit() Then(clientv3.OpPut(path.Join(kv.rootPath, key), target, opts...)).Commit()
if err != nil { if err != nil {
return err return err
} }
......
package session package sessionutil
import ( import (
"context" "context"
...@@ -16,7 +16,8 @@ import ( ...@@ -16,7 +16,8 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
const defaultIDKey = "services/id" const defaultServiceRoot = "/services/"
const defaultIDKey = "id"
const defaultRetryTimes = 30 const defaultRetryTimes = 30
// Session is a struct to store service's session, including ServerID, ServerName, // Session is a struct to store service's session, including ServerID, ServerName,
...@@ -65,10 +66,10 @@ func GetServerID(etcd *etcdkv.EtcdKV) (int64, error) { ...@@ -65,10 +66,10 @@ func GetServerID(etcd *etcdkv.EtcdKV) (int64, error) {
func getServerIDWithKey(etcd *etcdkv.EtcdKV, key string, retryTimes int) (int64, error) { func getServerIDWithKey(etcd *etcdkv.EtcdKV, key string, retryTimes int) (int64, error) {
res := int64(-1) res := int64(-1)
getServerIDWithKeyFn := func() error { getServerIDWithKeyFn := func() error {
value, err := etcd.Load(key) value, err := etcd.Load(defaultServiceRoot + key)
log.Debug("session", zap.String("get serverid", value)) log.Debug("session", zap.String("get serverid", value))
if err != nil { if err != nil {
err = etcd.CompareVersionAndSwap(key, 0, "1") err = etcd.CompareVersionAndSwap(defaultServiceRoot+key, 0, "1")
if err != nil { if err != nil {
log.Debug("session", zap.Error(err)) log.Debug("session", zap.Error(err))
return err return err
...@@ -81,7 +82,7 @@ func getServerIDWithKey(etcd *etcdkv.EtcdKV, key string, retryTimes int) (int64, ...@@ -81,7 +82,7 @@ func getServerIDWithKey(etcd *etcdkv.EtcdKV, key string, retryTimes int) (int64,
log.Debug("session", zap.Error(err)) log.Debug("session", zap.Error(err))
return err return err
} }
err = etcd.CompareValueAndSwap(key, value, err = etcd.CompareValueAndSwap(defaultServiceRoot+key, value,
strconv.FormatInt(valueInt+1, 10)) strconv.FormatInt(valueInt+1, 10))
if err != nil { if err != nil {
log.Debug("session", zap.Error(err)) log.Debug("session", zap.Error(err))
...@@ -101,13 +102,15 @@ func getServerIDWithKey(etcd *etcdkv.EtcdKV, key string, retryTimes int) (int64, ...@@ -101,13 +102,15 @@ func getServerIDWithKey(etcd *etcdkv.EtcdKV, key string, retryTimes int) (int64,
// key: metaRootPath + "/services" + "/ServerName-ServerID" // key: metaRootPath + "/services" + "/ServerName-ServerID"
// value: json format // value: json format
// { // {
// "ServerID": ServerID // "ServerID": "ServerID",
// "ServerName": ServerName // ServerName // "ServerName": "ServerName",
// "Address": ip:port // Address of service, including ip and port // "Address": "ip:port",
// "LeaseID": LeaseID // The ID of etcd lease // "LeaseID": "LeaseID",
// } // }
// MetaRootPath is configurable in the config file. // MetaRootPath is configurable in the config file.
func RegisterService(etcdKV *etcdkv.EtcdKV, session *Session, ttl int64) (<-chan *clientv3.LeaseKeepAliveResponse, error) { // Exclusive means whether this service can exist two at the same time, if so,
// it is false. Otherwise, set it to true.
func RegisterService(etcdKV *etcdkv.EtcdKV, exclusive bool, session *Session, ttl int64) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
respID, err := etcdKV.Grant(ttl) respID, err := etcdKV.Grant(ttl)
if err != nil { if err != nil {
log.Error("register service", zap.Error(err)) log.Error("register service", zap.Error(err))
...@@ -120,10 +123,13 @@ func RegisterService(etcdKV *etcdkv.EtcdKV, session *Session, ttl int64) (<-chan ...@@ -120,10 +123,13 @@ func RegisterService(etcdKV *etcdkv.EtcdKV, session *Session, ttl int64) (<-chan
return nil, err return nil, err
} }
err = etcdKV.SaveWithLease(fmt.Sprintf("/services/%s-%d", session.ServerName, session.ServerID), key := defaultServiceRoot + session.ServerName
string(sessionJSON), respID) if !exclusive {
key = key + "-" + strconv.FormatInt(session.ServerID, 10)
}
err = etcdKV.CompareVersionAndSwap(key, 0, string(sessionJSON), clientv3.WithLease(respID))
if err != nil { if err != nil {
fmt.Printf("put lease error %s\n", err) fmt.Printf("compare and swap error %s\n. maybe the key has registered", err)
return nil, err return nil, err
} }
...@@ -165,7 +171,7 @@ func ProcessKeepAliveResponse(ctx context.Context, ch <-chan *clientv3.LeaseKeep ...@@ -165,7 +171,7 @@ func ProcessKeepAliveResponse(ctx context.Context, ch <-chan *clientv3.LeaseKeep
// For general, "datanode" to get all datanodes // For general, "datanode" to get all datanodes
func GetSessions(etcdKV *etcdkv.EtcdKV, prefix string) ([]*Session, error) { func GetSessions(etcdKV *etcdkv.EtcdKV, prefix string) ([]*Session, error) {
sessions := make([]*Session, 0) sessions := make([]*Session, 0)
_, resValue, err := etcdKV.LoadWithPrefix("/services/" + prefix) _, resValue, err := etcdKV.LoadWithPrefix(defaultServiceRoot + prefix)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -186,7 +192,7 @@ func GetSessions(etcdKV *etcdkv.EtcdKV, prefix string) ([]*Session, error) { ...@@ -186,7 +192,7 @@ func GetSessions(etcdKV *etcdkv.EtcdKV, prefix string) ([]*Session, error) {
func WatchServices(ctx context.Context, etcdKV *etcdkv.EtcdKV, prefix string) (addChannel <-chan *Session, deleteChannel <-chan *Session) { func WatchServices(ctx context.Context, etcdKV *etcdkv.EtcdKV, prefix string) (addChannel <-chan *Session, deleteChannel <-chan *Session) {
addCh := make(chan *Session, 10) addCh := make(chan *Session, 10)
deleteCh := make(chan *Session, 10) deleteCh := make(chan *Session, 10)
rch := etcdKV.WatchWithPrefix("/services/" + prefix) rch := etcdKV.WatchWithPrefix(defaultServiceRoot + prefix)
go func() { go func() {
for { for {
select { select {
......
package session package sessionutil
import ( import (
"fmt" "fmt"
...@@ -78,7 +78,7 @@ func TestRegister(t *testing.T) { ...@@ -78,7 +78,7 @@ func TestRegister(t *testing.T) {
id, err := GetServerID(etcdKV) id, err := GetServerID(etcdKV)
assert.Nil(t, err) assert.Nil(t, err)
session := NewSession(id, "test", "localhost") session := NewSession(id, "test", "localhost")
_, err = RegisterService(etcdKV, session, 10) _, err = RegisterService(etcdKV, false, session, 10)
assert.Nil(t, err) assert.Nil(t, err)
sessionReturn := <-addChannel sessionReturn := <-addChannel
assert.Equal(t, sessionReturn, session) assert.Equal(t, sessionReturn, session)
...@@ -96,6 +96,35 @@ func TestRegister(t *testing.T) { ...@@ -96,6 +96,35 @@ func TestRegister(t *testing.T) {
} }
} }
func TestRegisterExclusive(t *testing.T) {
Params.Init()
etcdAddr, err := Params.Load("_EtcdAddress")
if err != nil {
panic(err)
}
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
assert.Nil(t, err)
rootPath := "/etcd/test/root"
etcdKV := etcdkv.NewEtcdKV(cli, rootPath)
defer etcdKV.Close()
defer etcdKV.RemoveWithPrefix("")
id, err := GetServerID(etcdKV)
assert.Nil(t, err)
session := NewSession(id, "test", "localhost")
_, err = RegisterService(etcdKV, true, session, 10)
assert.Nil(t, err)
id, err = GetServerID(etcdKV)
assert.Nil(t, err)
session = NewSession(id, "test", "helloworld")
_, err = RegisterService(etcdKV, true, session, 10)
assert.NotNil(t, err)
}
func TestKeepAlive(t *testing.T) { func TestKeepAlive(t *testing.T) {
Params.Init() Params.Init()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
...@@ -117,7 +146,7 @@ func TestKeepAlive(t *testing.T) { ...@@ -117,7 +146,7 @@ func TestKeepAlive(t *testing.T) {
id, err := GetServerID(etcdKV) id, err := GetServerID(etcdKV)
assert.Nil(t, err) assert.Nil(t, err)
session := NewSession(id, "test", "localhost") session := NewSession(id, "test", "localhost")
ch, err := RegisterService(etcdKV, session, 10) ch, err := RegisterService(etcdKV, false, session, 10)
assert.Nil(t, err) assert.Nil(t, err)
aliveCh := ProcessKeepAliveResponse(ctx, ch) aliveCh := ProcessKeepAliveResponse(ctx, ch)
......
...@@ -14,7 +14,7 @@ ROOT_DIR="$( cd -P "$( dirname "$SOURCE" )/.." && pwd )" ...@@ -14,7 +14,7 @@ ROOT_DIR="$( cd -P "$( dirname "$SOURCE" )/.." && pwd )"
MILVUS_DIR="${ROOT_DIR}/internal/" MILVUS_DIR="${ROOT_DIR}/internal/"
echo $MILVUS_DIR echo $MILVUS_DIR
go test -race -cover "${MILVUS_DIR}/util/session/..." -failfast go test -race -cover "${MILVUS_DIR}/util/sessionutil/..." -failfast
go test -race -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/tso/..." "${MILVUS_DIR}/allocator/..." -failfast go test -race -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/tso/..." "${MILVUS_DIR}/allocator/..." -failfast
# TODO: remove to distributed # TODO: remove to distributed
#go test -race -cover "${MILVUS_DIR}/proxynode/..." -failfast #go test -race -cover "${MILVUS_DIR}/proxynode/..." -failfast
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册