未验证 提交 1601a61b 编写于 作者: Z zhenshan.cao 提交者: GitHub

Move logic of checking available port to service (#15222)

Signed-off-by: Nzhenshan.cao <zhenshan.cao@zilliz.com>
上级 8873be6a
......@@ -217,7 +217,10 @@ func (s *Server) Stop() error {
func (s *Server) init() error {
ctx := context.Background()
Params.InitOnce(typeutil.DataNodeRole)
if !funcutil.CheckPortAvailable(Params.Port) {
Params.Port = funcutil.GetAvailablePort()
log.Warn("DataNode get available port when init", zap.Int("Port", Params.Port))
}
dn.Params.InitOnce()
dn.Params.DataNodeCfg.Port = Params.Port
dn.Params.DataNodeCfg.IP = Params.IP
......
......@@ -119,7 +119,10 @@ func (s *Server) startGrpcLoop(grpcPort int) {
func (s *Server) init() error {
var err error
Params.InitOnce(typeutil.IndexNodeRole)
if !funcutil.CheckPortAvailable(Params.Port) {
Params.Port = funcutil.GetAvailablePort()
log.Warn("IndexNode get available port when init", zap.Int("Port", Params.Port))
}
indexnode.Params.InitOnce()
indexnode.Params.IndexNodeCfg.Port = Params.Port
indexnode.Params.IndexNodeCfg.IP = Params.IP
......
......@@ -158,12 +158,11 @@ func (s *Server) Run() error {
func (s *Server) init() error {
Params.InitOnce(typeutil.ProxyRole)
log.Debug("init Proxy service's parameter table done")
log.Debug("Proxy init service's parameter table done")
proxy.Params.ProxyCfg.NetworkAddress = Params.GetAddress()
if !paramtable.CheckPortAvailable(Params.Port) {
// as the entry of Milvus, we'd better not to use another port
return fmt.Errorf("port %d already in use", Params.Port)
if !funcutil.CheckPortAvailable(Params.Port) {
Params.Port = funcutil.GetAvailablePort()
log.Warn("Proxy get available port when init", zap.Int("Port", Params.Port))
}
proxy.Params.InitOnce()
......
......@@ -87,6 +87,11 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
func (s *Server) init() error {
Params.InitOnce(typeutil.QueryNodeRole)
if !funcutil.CheckPortAvailable(Params.Port) {
Params.Port = funcutil.GetAvailablePort()
log.Warn("QueryNode get available port when init", zap.Int("Port", Params.Port))
}
qn.Params.InitOnce()
qn.Params.QueryNodeCfg.QueryNodeIP = Params.IP
qn.Params.QueryNodeCfg.QueryNodePort = int64(Params.Port)
......
......@@ -22,7 +22,9 @@ import (
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"
"time"
"github.com/go-basic/ipv4"
......@@ -172,3 +174,24 @@ func GetAttrByKeyFromRepeatedKV(key string, kvs []*commonpb.KeyValuePair) (strin
func CheckCtxValid(ctx context.Context) bool {
return ctx.Err() != context.DeadlineExceeded && ctx.Err() != context.Canceled
}
// CheckPortAvailable check if a port is available to be listened on
func CheckPortAvailable(port int) bool {
addr := ":" + strconv.Itoa(port)
listener, err := net.Listen("tcp", addr)
if listener != nil {
listener.Close()
}
return err == nil
}
// GetAvailablePort return an available port that can be listened on
func GetAvailablePort() int {
listener, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
defer listener.Close()
return listener.Addr().(*net.TCPAddr).Port
}
......@@ -282,3 +282,11 @@ func TestCheckCtxValid(t *testing.T) {
time.Sleep(timeout + deltaTime)
assert.False(t, CheckCtxValid(ctx3))
}
func TestCheckPortAvailable(t *testing.T) {
num := 10
for i := 0; i < num; i++ {
port := GetAvailablePort()
assert.Equal(t, CheckPortAvailable(port), true)
}
}
......@@ -13,7 +13,6 @@ package paramtable
import (
"math"
"net"
"os"
"path"
"strconv"
......@@ -23,7 +22,6 @@ import (
"github.com/go-basic/ipv4"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)
......@@ -1472,12 +1470,6 @@ func (p *grpcConfig) LoadFromArgs() {
func (p *grpcConfig) initPort() {
p.Port = p.ParseInt(p.Domain + ".port")
if p.Domain == typeutil.ProxyRole || p.Domain == typeutil.DataNodeRole || p.Domain == typeutil.IndexNodeRole || p.Domain == typeutil.QueryNodeRole {
if !CheckPortAvailable(p.Port) {
p.Port = GetAvailablePort()
log.Warn("get available port when init", zap.String("Domain", p.Domain), zap.Int("Port", p.Port))
}
}
}
// GetAddress return grpc address
......@@ -1620,24 +1612,3 @@ func (p *GrpcClientConfig) initClientMaxRecvSize() {
log.Debug("initClientMaxRecvSize",
zap.String("role", p.Domain), zap.Int("grpc.clientMaxRecvSize", p.ClientMaxRecvSize))
}
// CheckPortAvailable check if a port is available to be listened on
func CheckPortAvailable(port int) bool {
addr := ":" + strconv.Itoa(port)
listener, err := net.Listen("tcp", addr)
if listener != nil {
listener.Close()
}
return err == nil
}
// GetAvailablePort return an available port that can be listened on
func GetAvailablePort() int {
listener, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
defer listener.Close()
return listener.Addr().(*net.TCPAddr).Port
}
......@@ -402,11 +402,3 @@ func TestGrpcClientParams(t *testing.T) {
Params.initClientMaxSendSize()
assert.Equal(t, Params.ClientMaxSendSize, DefaultClientMaxSendSize)
}
func TestCheckPortAvailable(t *testing.T) {
num := 10
for i := 0; i < num; i++ {
port := GetAvailablePort()
assert.Equal(t, CheckPortAvailable(port), true)
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册