From ccc0130bb447b62f9afdf9cb8bfba86ffd96db5c Mon Sep 17 00:00:00 2001 From: godchen Date: Tue, 18 May 2021 19:07:27 +0800 Subject: [PATCH] Add exclusive parameter (#5278) Add exclusive parameter. Issue #5174 Signed-off-by: godchen --- .../appendix_a_basic_components.md | 40 +++++++++++-------- internal/kv/etcd/etcd_kv.go | 8 ++-- .../{session => sessionutil}/session_util.go | 36 ++++++++++------- .../session_util_test.go | 35 ++++++++++++++-- scripts/run_go_unittest.sh | 2 +- 5 files changed, 82 insertions(+), 39 deletions(-) rename internal/util/{session => sessionutil}/session_util.go (82%) rename internal/util/{session => sessionutil}/session_util_test.go (77%) diff --git a/docs/developer_guides/appendix_a_basic_components.md b/docs/developer_guides/appendix_a_basic_components.md index ee172a701..87278ce21 100644 --- a/docs/developer_guides/appendix_a_basic_components.md +++ b/docs/developer_guides/appendix_a_basic_components.md @@ -60,27 +60,32 @@ The ID is stored in a key-value pair on etcd. The key is metaRootPath + "/servic ###### Registeration -- Registration is achieved through etcd's lease mechanism. +* Registration is achieved through etcd's lease mechanism. -- The service creates a lease with etcd and stores a key-value pair in etcd. If the lease expires or the service goes offline, etcd will delete the key-value pair. You can judge whether this service is avaliable through the key. +* The service creates a lease with etcd and stores a key-value pair in etcd. If the lease expires or the service goes offline, etcd will delete the key-value pair. You can judge whether this service is avaliable through the key. -- key: metaRootPath + "/services" + "/ServerName-ServerID" +* key: metaRootPath + "/services" + "/ServerName(-ServerID)(optional)" -- value: json format +* value: json format + + ```json { - "ServerID":ServerID //ServerID - "ServerName": ServerName // ServerName - "Address": ip:port // Address of service, including ip and port - "LeaseID": LeaseID // The ID of etcd lease + "ServerID": "ServerID", + "ServerName": "ServerName", + "Address": "ip:port", + "LeaseID": "LeaseID", } + ``` + +* By obtaining the address, you can establish a connection with other services -- By obtaining the address, you can establish a connection with other services +* If a service is exclusive, the key will not have **ServerID**. But **ServerID** still will be stored in value. ###### Discovery -- All currently available services can be obtained by obtaining all the key-value pairs deposited during registration. If you want to get all the available nodes for a certain type of service, you can pass in the prefix of the corresponding key -- Registeration time can be compared with ServerID for ServerID will increase according to time. +* All currently available services can be obtained by obtaining all the key-value pairs deposited during registration. If you want to get all the available nodes for a certain type of service, you can pass in the prefix of the corresponding key +* Registeration time can be compared with ServerID for ServerID will increase according to time. ###### Interface @@ -123,15 +128,18 @@ func GetServerID(etcd *etcdkv.EtcdKV) (int64, error) {} // RegisterService registers the service to etcd so that other services // can find that the service is online and issue subsequent operations // RegisterService will save a key-value in etcd -// key: metaRootPath + "/services" + "/ServerName-ServerID" +// key: metaRootPath + "/services/" + "ServerName(-ServerID)(optional)" // value: json format // { -// "ServerID": ServerID -// "ServerName": ServerName // ServerName -// "Address": ip:port // Address of service, including ip and port -// "LeaseID": LeaseID // The ID of etcd lease +// "ServerID": "ServerID", +// "ServerName": "ServerName", +// "Address": "ip:port", +// "LeaseID": "LeaseID", // } // MetaRootPath is configurable in the config file. +// Exclusive means whether this service can exist two at the same time, if so, +// it is false. Otherwise, set it to true and the key will not have ServerID. +// But ServerID still will be stored in value. func RegisterService(etcdKV *etcdkv.EtcdKV, session *Session, ttl int64) (<-chan *clientv3.LeaseKeepAliveResponse, error) {} // ProcessKeepAliveResponse processes the response of etcd keepAlive interface diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index 5193c848b..af3e6fdf0 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -259,7 +259,7 @@ func (kv *EtcdKV) KeepAlive(id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliv // CompareValueAndSwap compares the existing value with compare, and if they are // equal, the target is stored in etcd. -func (kv *EtcdKV) CompareValueAndSwap(key, value, target string) error { +func (kv *EtcdKV) CompareValueAndSwap(key, value, target string, opts ...clientv3.OpOption) error { ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() resp, err := kv.client.Txn(ctx).If( @@ -267,7 +267,7 @@ func (kv *EtcdKV) CompareValueAndSwap(key, value, target string) error { clientv3.Value(path.Join(kv.rootPath, key)), "=", value)). - Then(clientv3.OpPut(path.Join(kv.rootPath, key), target)).Commit() + Then(clientv3.OpPut(path.Join(kv.rootPath, key), target, opts...)).Commit() if err != nil { return err } @@ -280,7 +280,7 @@ func (kv *EtcdKV) CompareValueAndSwap(key, value, target string) error { // CompareVersionAndSwap compares the existing key-value's version with version, and if // they are equal, the target is stored in etcd. -func (kv *EtcdKV) CompareVersionAndSwap(key string, version int64, target string) error { +func (kv *EtcdKV) CompareVersionAndSwap(key string, version int64, target string, opts ...clientv3.OpOption) error { ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() resp, err := kv.client.Txn(ctx).If( @@ -288,7 +288,7 @@ func (kv *EtcdKV) CompareVersionAndSwap(key string, version int64, target string clientv3.Version(path.Join(kv.rootPath, key)), "=", version)). - Then(clientv3.OpPut(path.Join(kv.rootPath, key), target)).Commit() + Then(clientv3.OpPut(path.Join(kv.rootPath, key), target, opts...)).Commit() if err != nil { return err } diff --git a/internal/util/session/session_util.go b/internal/util/sessionutil/session_util.go similarity index 82% rename from internal/util/session/session_util.go rename to internal/util/sessionutil/session_util.go index 6c771a3b5..3db96cb0d 100644 --- a/internal/util/session/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -1,4 +1,4 @@ -package session +package sessionutil import ( "context" @@ -16,7 +16,8 @@ import ( "go.uber.org/zap" ) -const defaultIDKey = "services/id" +const defaultServiceRoot = "/services/" +const defaultIDKey = "id" const defaultRetryTimes = 30 // Session is a struct to store service's session, including ServerID, ServerName, @@ -65,10 +66,10 @@ func GetServerID(etcd *etcdkv.EtcdKV) (int64, error) { func getServerIDWithKey(etcd *etcdkv.EtcdKV, key string, retryTimes int) (int64, error) { res := int64(-1) getServerIDWithKeyFn := func() error { - value, err := etcd.Load(key) + value, err := etcd.Load(defaultServiceRoot + key) log.Debug("session", zap.String("get serverid", value)) if err != nil { - err = etcd.CompareVersionAndSwap(key, 0, "1") + err = etcd.CompareVersionAndSwap(defaultServiceRoot+key, 0, "1") if err != nil { log.Debug("session", zap.Error(err)) return err @@ -81,7 +82,7 @@ func getServerIDWithKey(etcd *etcdkv.EtcdKV, key string, retryTimes int) (int64, log.Debug("session", zap.Error(err)) return err } - err = etcd.CompareValueAndSwap(key, value, + err = etcd.CompareValueAndSwap(defaultServiceRoot+key, value, strconv.FormatInt(valueInt+1, 10)) if err != nil { log.Debug("session", zap.Error(err)) @@ -101,13 +102,15 @@ func getServerIDWithKey(etcd *etcdkv.EtcdKV, key string, retryTimes int) (int64, // key: metaRootPath + "/services" + "/ServerName-ServerID" // value: json format // { -// "ServerID": ServerID -// "ServerName": ServerName // ServerName -// "Address": ip:port // Address of service, including ip and port -// "LeaseID": LeaseID // The ID of etcd lease +// "ServerID": "ServerID", +// "ServerName": "ServerName", +// "Address": "ip:port", +// "LeaseID": "LeaseID", // } // MetaRootPath is configurable in the config file. -func RegisterService(etcdKV *etcdkv.EtcdKV, session *Session, ttl int64) (<-chan *clientv3.LeaseKeepAliveResponse, error) { +// Exclusive means whether this service can exist two at the same time, if so, +// it is false. Otherwise, set it to true. +func RegisterService(etcdKV *etcdkv.EtcdKV, exclusive bool, session *Session, ttl int64) (<-chan *clientv3.LeaseKeepAliveResponse, error) { respID, err := etcdKV.Grant(ttl) if err != nil { log.Error("register service", zap.Error(err)) @@ -120,10 +123,13 @@ func RegisterService(etcdKV *etcdkv.EtcdKV, session *Session, ttl int64) (<-chan return nil, err } - err = etcdKV.SaveWithLease(fmt.Sprintf("/services/%s-%d", session.ServerName, session.ServerID), - string(sessionJSON), respID) + key := defaultServiceRoot + session.ServerName + if !exclusive { + key = key + "-" + strconv.FormatInt(session.ServerID, 10) + } + err = etcdKV.CompareVersionAndSwap(key, 0, string(sessionJSON), clientv3.WithLease(respID)) if err != nil { - fmt.Printf("put lease error %s\n", err) + fmt.Printf("compare and swap error %s\n. maybe the key has registered", err) return nil, err } @@ -165,7 +171,7 @@ func ProcessKeepAliveResponse(ctx context.Context, ch <-chan *clientv3.LeaseKeep // For general, "datanode" to get all datanodes func GetSessions(etcdKV *etcdkv.EtcdKV, prefix string) ([]*Session, error) { sessions := make([]*Session, 0) - _, resValue, err := etcdKV.LoadWithPrefix("/services/" + prefix) + _, resValue, err := etcdKV.LoadWithPrefix(defaultServiceRoot + prefix) if err != nil { return nil, err } @@ -186,7 +192,7 @@ func GetSessions(etcdKV *etcdkv.EtcdKV, prefix string) ([]*Session, error) { func WatchServices(ctx context.Context, etcdKV *etcdkv.EtcdKV, prefix string) (addChannel <-chan *Session, deleteChannel <-chan *Session) { addCh := make(chan *Session, 10) deleteCh := make(chan *Session, 10) - rch := etcdKV.WatchWithPrefix("/services/" + prefix) + rch := etcdKV.WatchWithPrefix(defaultServiceRoot + prefix) go func() { for { select { diff --git a/internal/util/session/session_util_test.go b/internal/util/sessionutil/session_util_test.go similarity index 77% rename from internal/util/session/session_util_test.go rename to internal/util/sessionutil/session_util_test.go index bbb1759bc..efd56eb07 100644 --- a/internal/util/session/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -1,4 +1,4 @@ -package session +package sessionutil import ( "fmt" @@ -78,7 +78,7 @@ func TestRegister(t *testing.T) { id, err := GetServerID(etcdKV) assert.Nil(t, err) session := NewSession(id, "test", "localhost") - _, err = RegisterService(etcdKV, session, 10) + _, err = RegisterService(etcdKV, false, session, 10) assert.Nil(t, err) sessionReturn := <-addChannel assert.Equal(t, sessionReturn, session) @@ -96,6 +96,35 @@ func TestRegister(t *testing.T) { } } +func TestRegisterExclusive(t *testing.T) { + Params.Init() + + etcdAddr, err := Params.Load("_EtcdAddress") + if err != nil { + panic(err) + } + + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) + assert.Nil(t, err) + rootPath := "/etcd/test/root" + etcdKV := etcdkv.NewEtcdKV(cli, rootPath) + + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") + + id, err := GetServerID(etcdKV) + assert.Nil(t, err) + session := NewSession(id, "test", "localhost") + _, err = RegisterService(etcdKV, true, session, 10) + assert.Nil(t, err) + + id, err = GetServerID(etcdKV) + assert.Nil(t, err) + session = NewSession(id, "test", "helloworld") + _, err = RegisterService(etcdKV, true, session, 10) + assert.NotNil(t, err) +} + func TestKeepAlive(t *testing.T) { Params.Init() ctx, cancel := context.WithCancel(context.Background()) @@ -117,7 +146,7 @@ func TestKeepAlive(t *testing.T) { id, err := GetServerID(etcdKV) assert.Nil(t, err) session := NewSession(id, "test", "localhost") - ch, err := RegisterService(etcdKV, session, 10) + ch, err := RegisterService(etcdKV, false, session, 10) assert.Nil(t, err) aliveCh := ProcessKeepAliveResponse(ctx, ch) diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh index 8b9a83e2b..4b3790318 100755 --- a/scripts/run_go_unittest.sh +++ b/scripts/run_go_unittest.sh @@ -14,7 +14,7 @@ ROOT_DIR="$( cd -P "$( dirname "$SOURCE" )/.." && pwd )" MILVUS_DIR="${ROOT_DIR}/internal/" echo $MILVUS_DIR -go test -race -cover "${MILVUS_DIR}/util/session/..." -failfast +go test -race -cover "${MILVUS_DIR}/util/sessionutil/..." -failfast go test -race -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/tso/..." "${MILVUS_DIR}/allocator/..." -failfast # TODO: remove to distributed #go test -race -cover "${MILVUS_DIR}/proxynode/..." -failfast -- GitLab