diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 8f1b49858a216f657ff4312562facb284be09737..2578554fba4134a36fa8d84f6bbe70415c03c43d 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -217,7 +217,10 @@ func (s *Server) Stop() error { func (s *Server) init() error { ctx := context.Background() Params.InitOnce(typeutil.DataNodeRole) - + if !funcutil.CheckPortAvailable(Params.Port) { + Params.Port = funcutil.GetAvailablePort() + log.Warn("DataNode get available port when init", zap.Int("Port", Params.Port)) + } dn.Params.InitOnce() dn.Params.DataNodeCfg.Port = Params.Port dn.Params.DataNodeCfg.IP = Params.IP diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 98d9ecee45270739bb3d6726b16eaaddcf92c3a7..c07c63df823ec318c9eca16cd81937c33d6aafeb 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -119,7 +119,10 @@ func (s *Server) startGrpcLoop(grpcPort int) { func (s *Server) init() error { var err error Params.InitOnce(typeutil.IndexNodeRole) - + if !funcutil.CheckPortAvailable(Params.Port) { + Params.Port = funcutil.GetAvailablePort() + log.Warn("IndexNode get available port when init", zap.Int("Port", Params.Port)) + } indexnode.Params.InitOnce() indexnode.Params.IndexNodeCfg.Port = Params.Port indexnode.Params.IndexNodeCfg.IP = Params.IP diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index a80bfe243361c09c189338b5f799780f29d4744a..dff149b92838031849387b34a1c0cb1da0860626 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -158,12 +158,11 @@ func (s *Server) Run() error { func (s *Server) init() error { Params.InitOnce(typeutil.ProxyRole) - log.Debug("init Proxy service's parameter table done") + log.Debug("Proxy init service's parameter table done") - proxy.Params.ProxyCfg.NetworkAddress = Params.GetAddress() - if !paramtable.CheckPortAvailable(Params.Port) { - // as the entry of Milvus, we'd better not to use another port - return fmt.Errorf("port %d already in use", Params.Port) + if !funcutil.CheckPortAvailable(Params.Port) { + Params.Port = funcutil.GetAvailablePort() + log.Warn("Proxy get available port when init", zap.Int("Port", Params.Port)) } proxy.Params.InitOnce() diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index be397c33ba1872cf564068101ca949f79c78ce4d..507c950f1faac7e83d4690606f561f6f94cfe4f2 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -87,6 +87,11 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) func (s *Server) init() error { Params.InitOnce(typeutil.QueryNodeRole) + if !funcutil.CheckPortAvailable(Params.Port) { + Params.Port = funcutil.GetAvailablePort() + log.Warn("QueryNode get available port when init", zap.Int("Port", Params.Port)) + } + qn.Params.InitOnce() qn.Params.QueryNodeCfg.QueryNodeIP = Params.IP qn.Params.QueryNodeCfg.QueryNodePort = int64(Params.Port) diff --git a/internal/util/funcutil/func.go b/internal/util/funcutil/func.go index db2a09d7fc7e631a7d11881a74e6a6744fadfea6..afc6431af08d0e7184e50b22102fae7dce91baea 100644 --- a/internal/util/funcutil/func.go +++ b/internal/util/funcutil/func.go @@ -22,7 +22,9 @@ import ( "errors" "fmt" "io/ioutil" + "net" "net/http" + "strconv" "time" "github.com/go-basic/ipv4" @@ -172,3 +174,24 @@ func GetAttrByKeyFromRepeatedKV(key string, kvs []*commonpb.KeyValuePair) (strin func CheckCtxValid(ctx context.Context) bool { return ctx.Err() != context.DeadlineExceeded && ctx.Err() != context.Canceled } + +// CheckPortAvailable check if a port is available to be listened on +func CheckPortAvailable(port int) bool { + addr := ":" + strconv.Itoa(port) + listener, err := net.Listen("tcp", addr) + if listener != nil { + listener.Close() + } + return err == nil +} + +// GetAvailablePort return an available port that can be listened on +func GetAvailablePort() int { + listener, err := net.Listen("tcp", ":0") + if err != nil { + panic(err) + } + defer listener.Close() + + return listener.Addr().(*net.TCPAddr).Port +} diff --git a/internal/util/funcutil/func_test.go b/internal/util/funcutil/func_test.go index fd3b54c3736996f1cea8e8488202496b64f368c8..9f1e5ce5d7d3c5a2cadcfce5f01aad470cf37455 100644 --- a/internal/util/funcutil/func_test.go +++ b/internal/util/funcutil/func_test.go @@ -282,3 +282,11 @@ func TestCheckCtxValid(t *testing.T) { time.Sleep(timeout + deltaTime) assert.False(t, CheckCtxValid(ctx3)) } + +func TestCheckPortAvailable(t *testing.T) { + num := 10 + for i := 0; i < num; i++ { + port := GetAvailablePort() + assert.Equal(t, CheckPortAvailable(port), true) + } +} diff --git a/internal/util/paramtable/global_param.go b/internal/util/paramtable/global_param.go index c52966abdf83a1b26ade4a653ec0ef9d667cea3a..8d900a7c9f0f3e61fc79861f074b02c8d39ae3f4 100644 --- a/internal/util/paramtable/global_param.go +++ b/internal/util/paramtable/global_param.go @@ -13,7 +13,6 @@ package paramtable import ( "math" - "net" "os" "path" "strconv" @@ -23,7 +22,6 @@ import ( "github.com/go-basic/ipv4" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" ) @@ -1472,12 +1470,6 @@ func (p *grpcConfig) LoadFromArgs() { func (p *grpcConfig) initPort() { p.Port = p.ParseInt(p.Domain + ".port") - if p.Domain == typeutil.ProxyRole || p.Domain == typeutil.DataNodeRole || p.Domain == typeutil.IndexNodeRole || p.Domain == typeutil.QueryNodeRole { - if !CheckPortAvailable(p.Port) { - p.Port = GetAvailablePort() - log.Warn("get available port when init", zap.String("Domain", p.Domain), zap.Int("Port", p.Port)) - } - } } // GetAddress return grpc address @@ -1620,24 +1612,3 @@ func (p *GrpcClientConfig) initClientMaxRecvSize() { log.Debug("initClientMaxRecvSize", zap.String("role", p.Domain), zap.Int("grpc.clientMaxRecvSize", p.ClientMaxRecvSize)) } - -// CheckPortAvailable check if a port is available to be listened on -func CheckPortAvailable(port int) bool { - addr := ":" + strconv.Itoa(port) - listener, err := net.Listen("tcp", addr) - if listener != nil { - listener.Close() - } - return err == nil -} - -// GetAvailablePort return an available port that can be listened on -func GetAvailablePort() int { - listener, err := net.Listen("tcp", ":0") - if err != nil { - panic(err) - } - defer listener.Close() - - return listener.Addr().(*net.TCPAddr).Port -} diff --git a/internal/util/paramtable/global_param_test.go b/internal/util/paramtable/global_param_test.go index 649000859bafaf83a790de4ec02717c42fd5c499..4c6d79712570029916ba71e5d7fe5e5c56618277 100644 --- a/internal/util/paramtable/global_param_test.go +++ b/internal/util/paramtable/global_param_test.go @@ -402,11 +402,3 @@ func TestGrpcClientParams(t *testing.T) { Params.initClientMaxSendSize() assert.Equal(t, Params.ClientMaxSendSize, DefaultClientMaxSendSize) } - -func TestCheckPortAvailable(t *testing.T) { - num := 10 - for i := 0; i < num; i++ { - port := GetAvailablePort() - assert.Equal(t, CheckPortAvailable(port), true) - } -}