未验证 提交 cf50e9ac 编写于 作者: C Cai Yudong 提交者: GitHub

Use GrpcServerConfig and GrpcClientConfig for distributed components (#13469)

Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 4e678d89
......@@ -20,7 +20,6 @@ import (
"context"
"fmt"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
"google.golang.org/grpc"
......@@ -31,9 +30,13 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
var Params paramtable.GrpcClientConfig
// Client is the datacoord grpc client
type Client struct {
grpcClient grpcclient.GrpcClient
......@@ -48,7 +51,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*C
log.Debug("DataCoordClient NewClient failed", zap.Error(err))
return nil, err
}
Params.Init()
Params.InitOnce(typeutil.DataCoordRole)
client := &Client{
grpcClient: &grpcclient.ClientBase{
ClientMaxRecvSize: Params.ClientMaxRecvSize,
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcdatacoordclient
import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by
// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration.
type ParamTable struct {
paramtable.BaseTable
ClientMaxSendSize int
ClientMaxRecvSize int
}
// Params is a package scoped variable of type ParamTable.
var Params ParamTable
var once sync.Once
// Init is an override method of BaseTable's Init. It mainly calls the
// Init of BaseTable and do some other initialization.
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initClientMaxSendSize()
pt.initClientMaxRecvSize()
})
}
func (pt *ParamTable) initClientMaxSendSize() {
var err error
valueStr, err := pt.Load("dataCoord.grpc.clientMaxSendSize")
if err != nil { // not set
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse dataCoord.grpc.clientMaxSendSize, set to default",
zap.String("dataCoord.grpc.clientMaxSendSize", valueStr),
zap.Error(err))
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
} else {
pt.ClientMaxSendSize = value
}
log.Debug("initClientMaxSendSize",
zap.Int("dataCoord.grpc.clientMaxSendSize", pt.ClientMaxSendSize))
}
func (pt *ParamTable) initClientMaxRecvSize() {
var err error
valueStr, err := pt.Load("dataCoord.grpc.clientMaxRecvSize")
if err != nil { // not set
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse dataCoord.grpc.clientMaxRecvSize, set to default",
zap.String("dataCoord.grpc.clientMaxRecvSize", valueStr),
zap.Error(err))
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
} else {
pt.ClientMaxRecvSize = value
}
log.Debug("initClientMaxRecvSize",
zap.Int("dataCoord.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize))
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcdatacoordclient
import (
"testing"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize))
log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize))
err := Params.Remove("dataCoord.grpc.clientMaxSendSize")
assert.Nil(t, err)
Params.initClientMaxSendSize()
assert.Equal(t, Params.ClientMaxSendSize, grpcconfigs.DefaultClientMaxSendSize)
err = Params.Remove("dataCoord.grpc.clientMaxRecvSize")
assert.Nil(t, err)
Params.initClientMaxRecvSize()
assert.Equal(t, Params.ClientMaxRecvSize, grpcconfigs.DefaultClientMaxRecvSize)
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcdatacoord
import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"go.uber.org/zap"
)
// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by
// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration.
type ParamTable struct {
paramtable.BaseTable
IP string
Port int
Address string
ServerMaxSendSize int
ServerMaxRecvSize int
}
// Params is a package scoped variable of type ParamTable.
var Params ParamTable
var once sync.Once
// Init is an override method of BaseTable's Init. It mainly calls the
// Init of BaseTable and do some other initialization.
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initParams()
pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10)
})
}
func (pt *ParamTable) initParams() {
pt.loadFromEnv()
pt.loadFromArgs()
pt.initPort()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
}
func (pt *ParamTable) loadFromEnv() {
Params.IP = funcutil.GetLocalIP()
}
// LoadFromArgs is used to initialize configuration items from args.
func (pt *ParamTable) loadFromArgs() {
}
func (pt *ParamTable) initPort() {
pt.Port = pt.ParseInt("dataCoord.port")
}
func (pt *ParamTable) initServerMaxSendSize() {
var err error
valueStr, err := pt.Load("dataCoord.grpc.serverMaxSendSize")
if err != nil { // not set
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse dataCoord.grpc.serverMaxSendSize, set to default",
zap.String("dataCoord.grpc.serverMaxSendSize", valueStr),
zap.Error(err))
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
} else {
pt.ServerMaxSendSize = value
}
log.Debug("initServerMaxSendSize",
zap.Int("dataCoord.grpc.serverMaxSendSize", pt.ServerMaxSendSize))
}
func (pt *ParamTable) initServerMaxRecvSize() {
var err error
valueStr, err := pt.Load("dataCoord.grpc.serverMaxRecvSize")
if err != nil { // not set
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse dataCoord.grpc.serverMaxRecvSize, set to default",
zap.String("dataCoord.grpc.serverMaxRecvSize", valueStr),
zap.Error(err))
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
} else {
pt.ServerMaxRecvSize = value
}
log.Debug("initServerMaxRecvSize",
zap.Int("dataCoord.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize))
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcdatacoord
import (
"testing"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/stretchr/testify/assert"
)
func TestParamTable(t *testing.T) {
Params.Init()
assert.NotEqual(t, Params.Port, 0)
t.Logf("DataCoord Port:%d", Params.Port)
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))
err := Params.Remove("dataCoord.grpc.ServerMaxSendSize")
assert.Nil(t, err)
Params.initServerMaxSendSize()
assert.Equal(t, Params.ServerMaxSendSize, grpcconfigs.DefaultServerMaxSendSize)
err = Params.Remove("dataCoord.grpc.ServerMaxRecvSize")
assert.Nil(t, err)
Params.initServerMaxRecvSize()
assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize)
}
......@@ -25,28 +25,29 @@ import (
"sync"
"time"
"github.com/milvus-io/milvus/internal/logutil"
"github.com/milvus-io/milvus/internal/types"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/milvus-io/milvus/internal/datacoord"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/logutil"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/trace"
"google.golang.org/grpc/keepalive"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
var Params paramtable.GrpcServerConfig
// Server is the grpc server of datacoord
type Server struct {
ctx context.Context
......@@ -78,7 +79,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory, opts ...datacoord
}
func (s *Server) init() error {
Params.Init()
Params.InitOnce(typeutil.DataCoordRole)
closer := trace.InitTracing("datacoord")
s.closer = closer
......@@ -86,7 +87,7 @@ func (s *Server) init() error {
datacoord.Params.InitOnce()
datacoord.Params.IP = Params.IP
datacoord.Params.Port = Params.Port
datacoord.Params.Address = Params.Address
datacoord.Params.Address = Params.GetAddress()
err := s.startGrpc()
if err != nil {
......@@ -170,7 +171,7 @@ func (s *Server) start() error {
// Stop stops the DataCoord server gracefully.
// Need to call the GracefulStop interface of grpc server and call the stop method of the inner DataCoord object.
func (s *Server) Stop() error {
log.Debug("Datacoord stop", zap.String("Address", Params.Address))
log.Debug("Datacoord stop", zap.String("Address", Params.GetAddress()))
var err error
if s.closer != nil {
if err = s.closer.Close(); err != nil {
......
......@@ -20,7 +20,6 @@ import (
"context"
"fmt"
"github.com/milvus-io/milvus/internal/util/typeutil"
"google.golang.org/grpc"
"github.com/milvus-io/milvus/internal/proto/commonpb"
......@@ -29,8 +28,12 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
var Params paramtable.GrpcClientConfig
// Client is the grpc client for DataNode
type Client struct {
grpcClient grpcclient.GrpcClient
......@@ -42,7 +45,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) {
if addr == "" {
return nil, fmt.Errorf("address is empty")
}
Params.Init()
Params.InitOnce(typeutil.DataNodeRole)
client := &Client{
addr: addr,
grpcClient: &grpcclient.ClientBase{
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcdatanodeclient
import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by
// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration.
type ParamTable struct {
paramtable.BaseTable
ClientMaxSendSize int
ClientMaxRecvSize int
}
// Params is a package scoped variable of type ParamTable.
var Params ParamTable
var once sync.Once
// Init is an override method of BaseTable's Init. It mainly calls the
// Init of BaseTable and do some other initialization.
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initClientMaxSendSize()
pt.initClientMaxRecvSize()
})
}
func (pt *ParamTable) initClientMaxSendSize() {
var err error
valueStr, err := pt.Load("dataNode.grpc.clientMaxSendSize")
if err != nil { // not set
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse dataNode.grpc.clientMaxSendSize, set to default",
zap.String("dataNode.grpc.clientMaxSendSize", valueStr),
zap.Error(err))
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
} else {
pt.ClientMaxSendSize = value
}
log.Debug("initClientMaxSendSize",
zap.Int("dataNode.grpc.clientMaxSendSize", pt.ClientMaxSendSize))
}
func (pt *ParamTable) initClientMaxRecvSize() {
var err error
valueStr, err := pt.Load("dataNode.grpc.clientMaxRecvSize")
if err != nil { // not set
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse dataNode.grpc.clientMaxRecvSize, set to default",
zap.String("dataNode.grpc.clientMaxRecvSize", valueStr),
zap.Error(err))
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
} else {
pt.ClientMaxRecvSize = value
}
log.Debug("initClientMaxRecvSize",
zap.Int("dataNode.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize))
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcdatanodeclient
import (
"testing"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize))
log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize))
Params.Remove("dataNode.grpc.clientMaxSendSize")
Params.initClientMaxSendSize()
assert.Equal(t, Params.ClientMaxSendSize, grpcconfigs.DefaultClientMaxSendSize)
Params.Remove("dataNode.grpc.clientMaxRecvSize")
Params.initClientMaxRecvSize()
assert.Equal(t, Params.ClientMaxRecvSize, grpcconfigs.DefaultClientMaxRecvSize)
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcdatanode
import (
"net"
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"go.uber.org/zap"
)
// Params is a package scoped variable of type ParamTable.
var Params ParamTable
var once sync.Once
// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by
// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration.
type ParamTable struct {
paramtable.BaseTable
IP string
Port int
Address string
listener net.Listener
ServerMaxSendSize int
ServerMaxRecvSize int
}
// Init is an override method of BaseTable's Init. It mainly calls the
// Init of BaseTable and do some other initialization.
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initParams()
pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10)
listener, err := net.Listen("tcp", pt.Address)
if err != nil {
panic(err)
}
pt.listener = listener
})
}
// initParams initializes params of the configuration items.
func (pt *ParamTable) initParams() {
pt.loadFromEnv()
pt.loadFromArgs()
pt.initPort()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
}
func (pt *ParamTable) loadFromArgs() {
}
func (pt *ParamTable) loadFromEnv() {
Params.IP = funcutil.GetLocalIP()
}
func (pt *ParamTable) initPort() {
port := pt.ParseInt("dataNode.port")
pt.Port = port
if !funcutil.CheckPortAvailable(pt.Port) {
pt.Port = funcutil.GetAvailablePort()
log.Warn("DataNode init", zap.Any("Port", pt.Port))
}
}
func (pt *ParamTable) initServerMaxSendSize() {
var err error
valueStr, err := pt.Load("dataNode.grpc.serverMaxSendSize")
if err != nil { // not set
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse dataNode.grpc.serverMaxSendSize, set to default",
zap.String("dataNode.grpc.serverMaxSendSize", valueStr),
zap.Error(err))
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
} else {
pt.ServerMaxSendSize = value
}
log.Debug("initServerMaxSendSize",
zap.Int("dataNode.grpc.serverMaxSendSize", pt.ServerMaxSendSize))
}
func (pt *ParamTable) initServerMaxRecvSize() {
var err error
valueStr, err := pt.Load("dataNode.grpc.serverMaxRecvSize")
if err != nil { // not set
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // invalid format
log.Warn("Failed to parse dataNode.grpc.serverMaxRecvSize, set to default",
zap.String("dataNode.grpc.serverMaxRecvSize", valueStr),
zap.Error(err))
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
} else {
pt.ServerMaxRecvSize = value
}
log.Debug("initServerMaxRecvSize",
zap.Int("dataNode.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize))
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcdatanode
import (
"testing"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/stretchr/testify/assert"
)
func TestParamTable(t *testing.T) {
Params.Init()
Params.loadFromEnv()
assert.NotEqual(t, Params.IP, "")
t.Logf("DataNode IP:%s", Params.IP)
assert.NotEqual(t, Params.Port, 0)
t.Logf("DataNode Port:%d", Params.Port)
assert.NotNil(t, Params.listener)
t.Logf("DataNode listener:%d", Params.listener)
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))
Params.Remove("dataNode.grpc.serverMaxSendSize")
Params.initServerMaxSendSize()
assert.Equal(t, Params.ServerMaxSendSize, grpcconfigs.DefaultServerMaxSendSize)
Params.Remove("dataNode.grpc.serverMaxRecvSize")
Params.initServerMaxRecvSize()
assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize)
}
......@@ -28,6 +28,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
dn "github.com/milvus-io/milvus/internal/datanode"
dsc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
......@@ -42,10 +43,13 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/trace"
"google.golang.org/grpc/keepalive"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
var Params paramtable.GrpcServerConfig
type Server struct {
datanode types.DataNodeComponent
wg sync.WaitGroup
......@@ -88,7 +92,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
func (s *Server) startGrpc() error {
s.wg.Add(1)
go s.startGrpcLoop(Params.listener)
go s.startGrpcLoop(Params.Listener)
// wait for grpc server loop start
err := <-s.grpcErrChan
return err
......@@ -154,7 +158,7 @@ func (s *Server) Run() error {
// Stop stops Datanode's grpc service.
func (s *Server) Stop() error {
log.Debug("Datanode stop", zap.String("Address", Params.Address))
log.Debug("Datanode stop", zap.String("Address", Params.GetAddress()))
if s.closer != nil {
if err := s.closer.Close(); err != nil {
return err
......@@ -191,7 +195,7 @@ func (s *Server) Stop() error {
// init initializes Datanode's grpc service.
func (s *Server) init() error {
ctx := context.Background()
Params.Init()
Params.InitOnce(typeutil.DataNodeRole)
dn.Params.InitOnce()
dn.Params.Port = Params.Port
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcconfigs
import "math"
const (
// DefaultServerMaxSendSize defines the maximum size of data per grpc request can send by server side.
DefaultServerMaxSendSize = math.MaxInt32
// DefaultServerMaxRecvSize defines the maximum size of data per grpc request can receive by server side.
DefaultServerMaxRecvSize = math.MaxInt32
// DefaultClientMaxSendSize defines the maximum size of data per grpc request can send by client side.
DefaultClientMaxSendSize = 100 * 1024 * 1024
// DefaultClientMaxRecvSize defines the maximum size of data per grpc request can receive by client side.
DefaultClientMaxRecvSize = 100 * 1024 * 1024
)
......@@ -30,10 +30,13 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
var Params paramtable.GrpcClientConfig
// Client is the grpc client of IndexCoord.
type Client struct {
grpcClient grpcclient.GrpcClient
......@@ -48,7 +51,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*C
log.Debug("IndexCoordClient NewClient failed", zap.Error(err))
return nil, err
}
Params.Init()
Params.InitOnce(typeutil.IndexCoordRole)
client := &Client{
grpcClient: &grpcclient.ClientBase{
ClientMaxRecvSize: Params.ClientMaxRecvSize,
......
......@@ -26,11 +26,12 @@ import (
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
)
func TestIndexCoordClient(t *testing.T) {
Params.Init()
Params.InitOnce(typeutil.IndexCoordRole)
ctx := context.Background()
server, err := grpcindexcoord.NewServer(ctx)
assert.Nil(t, err)
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
//
package grpcindexcoordclient
import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
// ParamTable is used to record configuration items.
type ParamTable struct {
paramtable.BaseTable
ClientMaxSendSize int
ClientMaxRecvSize int
}
// Params is an alias for ParamTable.
var Params ParamTable
var once sync.Once
// Init is used to initialize configuration items.
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initClientMaxSendSize()
pt.initClientMaxRecvSize()
})
}
func (pt *ParamTable) initClientMaxSendSize() {
var err error
valueStr, err := pt.Load("indexCoord.grpc.clientMaxSendSize")
if err != nil { // not set
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse indexCoord.grpc.clientMaxSendSize, set to default",
zap.String("indexCoord.grpc.clientMaxSendSize", valueStr),
zap.Error(err))
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
} else {
pt.ClientMaxSendSize = value
}
log.Debug("initClientMaxSendSize",
zap.Int("indexCoord.grpc.clientMaxSendSize", pt.ClientMaxSendSize))
}
func (pt *ParamTable) initClientMaxRecvSize() {
var err error
valueStr, err := pt.Load("indexCoord.grpc.clientMaxRecvSize")
if err != nil { // not set
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse indexCoord.grpc.clientMaxRecvSize, set to default",
zap.String("indexCoord.grpc.clientMaxRecvSize", valueStr),
zap.Error(err))
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
} else {
pt.ClientMaxRecvSize = value
}
log.Debug("initClientMaxRecvSize",
zap.Int("indexCoord.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize))
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcindexcoordclient
import (
"testing"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize))
log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize))
Params.Remove("indexCoord.grpc.clientMaxSendSize")
Params.initClientMaxSendSize()
assert.Equal(t, Params.ClientMaxSendSize, grpcconfigs.DefaultClientMaxSendSize)
Params.Remove("indexCoord.grpc.clientMaxRecvSize")
Params.initClientMaxRecvSize()
assert.Equal(t, Params.ClientMaxRecvSize, grpcconfigs.DefaultClientMaxRecvSize)
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcindexcoord
import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"go.uber.org/zap"
)
// ParamTable is used to record configuration items.
type ParamTable struct {
paramtable.BaseTable
IP string
Port int
Address string
ServerMaxSendSize int
ServerMaxRecvSize int
}
// Params is an alias for ParamTable.
var Params ParamTable
var once sync.Once
// Init is used to initialize configuration items.
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initParams()
pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10)
})
}
// initParams initializes params of the configuration items.
func (pt *ParamTable) initParams() {
pt.LoadFromEnv()
pt.LoadFromArgs()
pt.initPort()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
}
// initPort initializes the port of IndexCoord service.
func (pt *ParamTable) initPort() {
pt.Port = pt.ParseInt("indexCoord.port")
}
// LoadFromEnv gets the configuration from environment variables.
func (pt *ParamTable) LoadFromEnv() {
Params.IP = funcutil.GetLocalIP()
}
// LoadFromArgs is used to initialize configuration items from args.
func (pt *ParamTable) loadFromArgs() {
}
// LoadFromArgs is used to initialize configuration items from args.
func (pt *ParamTable) LoadFromArgs() {
}
// initServerMaxSendSize initializes the max send size of IndexCoord service.
func (pt *ParamTable) initServerMaxSendSize() {
var err error
valueStr, err := pt.Load("indexCoord.grpc.serverMaxSendSize")
if err != nil { // not set
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse indexCoord.grpc.serverMaxSendSize, set to default",
zap.String("indexCoord.grpc.serverMaxSendSize", valueStr),
zap.Error(err))
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
} else {
pt.ServerMaxSendSize = value
}
log.Debug("initServerMaxSendSize",
zap.Int("indexCoord.grpc.serverMaxSendSize", pt.ServerMaxSendSize))
}
// initServerMaxSendSize initializes the max receive size of IndexCoord service.
func (pt *ParamTable) initServerMaxRecvSize() {
var err error
valueStr, err := pt.Load("indexCoord.grpc.serverMaxRecvSize")
if err != nil { // not set
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse indexCoord.grpc.serverMaxRecvSize, set to default",
zap.String("indexCoord.grpc.serverMaxRecvSize", valueStr),
zap.Error(err))
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
} else {
pt.ServerMaxRecvSize = value
}
log.Debug("initServerMaxRecvSize",
zap.Int("indexCoord.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize))
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcindexcoord
import (
"testing"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))
Params.Remove("indexCoord.grpc.ServerMaxSendSize")
Params.initServerMaxSendSize()
assert.Equal(t, Params.ServerMaxSendSize, grpcconfigs.DefaultServerMaxSendSize)
Params.Remove("indexCoord.grpc.ServerMaxRecvSize")
Params.initServerMaxRecvSize()
assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize)
}
......@@ -24,8 +24,6 @@ import (
"sync"
"time"
"github.com/milvus-io/milvus/internal/types"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
......@@ -37,11 +35,15 @@ import (
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
var Params paramtable.GrpcServerConfig
// UniqueID is an alias of int64, is used as a unique identifier for the request.
type UniqueID = typeutil.UniqueID
......@@ -75,10 +77,10 @@ func (s *Server) Run() error {
// init initializes IndexCoord's grpc service.
func (s *Server) init() error {
Params.Init()
Params.InitOnce(typeutil.IndexCoordRole)
indexcoord.Params.InitOnce()
indexcoord.Params.Address = Params.Address
indexcoord.Params.Address = Params.GetAddress()
indexcoord.Params.Port = Params.Port
closer := trace.InitTracing("IndexCoord")
......@@ -115,7 +117,7 @@ func (s *Server) start() error {
// Stop stops IndexCoord's grpc service.
func (s *Server) Stop() error {
log.Debug("Indexcoord stop", zap.String("Address", Params.Address))
log.Debug("Indexcoord stop", zap.String("Address", Params.GetAddress()))
if s.closer != nil {
if err := s.closer.Close(); err != nil {
return err
......
......@@ -26,10 +26,13 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/typeutil"
"google.golang.org/grpc"
)
var Params paramtable.GrpcClientConfig
// Client is the grpc client of IndexNode.
type Client struct {
grpcClient grpcclient.GrpcClient
......@@ -41,7 +44,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) {
if addr == "" {
return nil, fmt.Errorf("address is empty")
}
Params.Init()
Params.InitOnce(typeutil.IndexNodeRole)
client := &Client{
addr: addr,
grpcClient: &grpcclient.ClientBase{
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcindexnodeclient
import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
// ParamTable is used to record configuration items.
type ParamTable struct {
paramtable.BaseTable
ClientMaxSendSize int
ClientMaxRecvSize int
}
// Params is an instance of ParamTable.
var Params ParamTable
var once sync.Once
// Init is used to initialize configuration items.
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initClientMaxSendSize()
pt.initClientMaxRecvSize()
})
}
func (pt *ParamTable) initClientMaxSendSize() {
var err error
valueStr, err := pt.Load("indexNode.grpc.clientMaxSendSize")
if err != nil { // not set
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse indexNode.grpc.clientMaxSendSize, set to default",
zap.String("indexNode.grpc.clientMaxSendSize", valueStr),
zap.Error(err))
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
} else {
pt.ClientMaxSendSize = value
}
log.Debug("initClientMaxSendSize",
zap.Int("indexNode.grpc.clientMaxSendSize", pt.ClientMaxSendSize))
}
func (pt *ParamTable) initClientMaxRecvSize() {
var err error
valueStr, err := pt.Load("indexNode.grpc.clientMaxRecvSize")
if err != nil { // not set
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse indexNode.grpc.clientMaxRecvSize, set to default",
zap.String("indexNode.grpc.clientMaxRecvSize", valueStr),
zap.Error(err))
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
} else {
pt.ClientMaxRecvSize = value
}
log.Debug("initClientMaxRecvSize",
zap.Int("indexNode.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize))
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcindexnodeclient
import (
"testing"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize))
log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize))
Params.Remove("indexNode.grpc.clientMaxSendSize")
Params.initClientMaxSendSize()
assert.Equal(t, Params.ClientMaxSendSize, grpcconfigs.DefaultClientMaxSendSize)
Params.Remove("indexNode.grpc.clientMaxRecvSize")
Params.initClientMaxRecvSize()
assert.Equal(t, Params.ClientMaxRecvSize, grpcconfigs.DefaultClientMaxRecvSize)
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcindexnode
import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
// ParamTable is used to record configuration items.
type ParamTable struct {
paramtable.BaseTable
IP string
Port int
Address string
ServerMaxSendSize int
ServerMaxRecvSize int
}
// Params is an alias for ParamTable.
var Params ParamTable
var once sync.Once
// Init is used to initialize configuration items.
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initParams()
})
}
// LoadFromArgs is used to initialize configuration items from args.
func (pt *ParamTable) LoadFromArgs() {
}
// LoadFromEnv is used to initialize configuration items from env.
func (pt *ParamTable) LoadFromEnv() {
Params.IP = funcutil.GetLocalIP()
}
// initParams initializes params of the configuration items.
func (pt *ParamTable) initParams() {
pt.LoadFromEnv()
pt.LoadFromArgs()
pt.initPort()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
}
// initPort initializes the port of IndexNode service.
func (pt *ParamTable) initPort() {
port := pt.ParseInt("indexNode.port")
pt.Port = port
if !funcutil.CheckPortAvailable(pt.Port) {
pt.Port = funcutil.GetAvailablePort()
log.Warn("IndexNode init", zap.Any("Port", pt.Port))
}
}
func (pt *ParamTable) initServerMaxSendSize() {
var err error
valueStr, err := pt.Load("indexNode.grpc.serverMaxSendSize")
if err != nil { // not set
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse indexNode.grpc.serverMaxSendSize, set to default",
zap.String("indexNode.grpc.serverMaxSendSize", valueStr),
zap.Error(err))
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
} else {
pt.ServerMaxSendSize = value
}
log.Debug("initServerMaxSendSize",
zap.Int("indexNode.grpc.serverMaxSendSize", pt.ServerMaxSendSize))
}
func (pt *ParamTable) initServerMaxRecvSize() {
var err error
valueStr, err := pt.Load("indexNode.grpc.serverMaxRecvSize")
if err != nil { // not set
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse indexNode.grpc.serverMaxRecvSize, set to default",
zap.String("indexNode.grpc.serverMaxRecvSize", valueStr),
zap.Error(err))
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
} else {
pt.ServerMaxRecvSize = value
}
log.Debug("initServerMaxRecvSize",
zap.Int("indexNode.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize))
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcindexnode
import (
"testing"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))
Params.Remove("indexNode.grpc.ServerMaxSendSize")
Params.initServerMaxSendSize()
assert.Equal(t, Params.ServerMaxSendSize, grpcconfigs.DefaultServerMaxSendSize)
Params.Remove("indexNode.grpc.ServerMaxRecvSize")
Params.initServerMaxRecvSize()
assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize)
}
......@@ -25,9 +25,8 @@ import (
"sync"
"time"
"github.com/milvus-io/milvus/internal/types"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
......@@ -37,11 +36,15 @@ import (
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/trace"
"google.golang.org/grpc"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
var Params paramtable.GrpcServerConfig
// Server is the grpc wrapper of IndexNode.
type Server struct {
indexnode types.IndexNode
......@@ -73,7 +76,7 @@ func (s *Server) Run() error {
func (s *Server) startGrpcLoop(grpcPort int) {
defer s.loopWg.Done()
log.Debug("IndexNode", zap.String("network address", Params.Address), zap.Int("network port: ", grpcPort))
log.Debug("IndexNode", zap.String("network address", Params.GetAddress()), zap.Int("network port: ", grpcPort))
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
if err != nil {
log.Warn("IndexNode", zap.String("GrpcServer:failed to listen", err.Error()))
......@@ -112,18 +115,16 @@ func (s *Server) startGrpcLoop(grpcPort int) {
// init initializes IndexNode's grpc service.
func (s *Server) init() error {
var err error
Params.Init()
Params.InitOnce(typeutil.IndexNodeRole)
indexnode.Params.InitOnce()
indexnode.Params.Port = Params.Port
indexnode.Params.IP = Params.IP
indexnode.Params.Address = Params.Address
indexnode.Params.Address = Params.GetAddress()
closer := trace.InitTracing(fmt.Sprintf("IndexNode-%d", indexnode.Params.NodeID))
s.closer = closer
Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10)
defer func() {
if err != nil {
err = s.Stop()
......@@ -168,7 +169,7 @@ func (s *Server) start() error {
// Stop stops IndexNode's grpc service.
func (s *Server) Stop() error {
log.Debug("IndexNode stop", zap.String("Address", Params.Address))
log.Debug("IndexNode stop", zap.String("Address", Params.GetAddress()))
if s.closer != nil {
if err := s.closer.Close(); err != nil {
return err
......
......@@ -26,10 +26,13 @@ import (
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/typeutil"
"google.golang.org/grpc"
)
var Params paramtable.GrpcClientConfig
// Client is the grpc client for Proxy
type Client struct {
grpcClient grpcclient.GrpcClient
......@@ -41,7 +44,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) {
if addr == "" {
return nil, fmt.Errorf("address is empty")
}
Params.Init()
Params.InitOnce(typeutil.ProxyRole)
client := &Client{
addr: addr,
grpcClient: &grpcclient.ClientBase{
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcproxyclient
import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by
// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration.
type ParamTable struct {
paramtable.BaseTable
ClientMaxSendSize int
ClientMaxRecvSize int
}
// Params is a package scoped variable of type ParamTable.
var Params ParamTable
var once sync.Once
// Init is an override method of BaseTable's Init. It mainly calls the
// Init of BaseTable and do some other initialization.
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initClientMaxSendSize()
pt.initClientMaxRecvSize()
})
}
func (pt *ParamTable) initClientMaxSendSize() {
var err error
valueStr, err := pt.Load("proxy.grpc.clientMaxSendSize")
if err != nil { // not set
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse proxy.grpc.clientMaxSendSize, set to default",
zap.String("proxy.grpc.clientMaxSendSize", valueStr),
zap.Error(err))
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
} else {
pt.ClientMaxSendSize = value
}
log.Debug("initClientMaxSendSize",
zap.Int("proxy.grpc.clientMaxSendSize", pt.ClientMaxSendSize))
}
func (pt *ParamTable) initClientMaxRecvSize() {
var err error
valueStr, err := pt.Load("proxy.grpc.clientMaxRecvSize")
if err != nil { // not set
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse proxy.grpc.clientMaxRecvSize, set to default",
zap.String("proxy.grpc.clientMaxRecvSize", valueStr),
zap.Error(err))
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
} else {
pt.ClientMaxRecvSize = value
}
log.Debug("initClientMaxRecvSize",
zap.Int("proxy.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize))
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcproxyclient
import (
"testing"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize))
log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize))
Params.Remove("proxy.grpc.clientMaxSendSize")
Params.initClientMaxSendSize()
assert.Equal(t, Params.ClientMaxSendSize, grpcconfigs.DefaultClientMaxSendSize)
Params.Remove("proxy.grpc.clientMaxRecvSize")
Params.initClientMaxRecvSize()
assert.Equal(t, Params.ClientMaxRecvSize, grpcconfigs.DefaultClientMaxRecvSize)
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcproxy
import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by
// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration.
type ParamTable struct {
paramtable.BaseTable
IP string
Port int
Address string
ServerMaxSendSize int
ServerMaxRecvSize int
}
// Params is a package scoped variable of type ParamTable.
var Params ParamTable
var once sync.Once
// Init is an override method of BaseTable's Init. It mainly calls the
// Init of BaseTable and do some other initialization.
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initParams()
pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10)
})
}
func (pt *ParamTable) loadFromArgs() {
}
func (pt *ParamTable) loadFromEnv() {
pt.IP = funcutil.GetLocalIP()
}
func (pt *ParamTable) initParams() {
pt.loadFromEnv()
pt.loadFromArgs()
pt.initPort()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
}
func (pt *ParamTable) initPort() {
port := pt.ParseInt("proxy.port")
pt.Port = port
}
func (pt *ParamTable) initServerMaxSendSize() {
var err error
valueStr, err := pt.Load("proxy.grpc.serverMaxSendSize")
if err != nil { // not set
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse proxy.grpc.serverMaxSendSize, set to default",
zap.String("proxy.grpc.serverMaxSendSize", valueStr),
zap.Error(err))
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
} else {
pt.ServerMaxSendSize = value
}
log.Debug("initServerMaxSendSize",
zap.Int("proxy.grpc.serverMaxSendSize", pt.ServerMaxSendSize))
}
func (pt *ParamTable) initServerMaxRecvSize() {
var err error
valueStr, err := pt.Load("proxy.grpc.serverMaxRecvSize")
if err != nil { // not set
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse proxy.grpc.serverMaxRecvSize, set to default",
zap.String("proxy.grpc.serverMaxRecvSize", valueStr),
zap.Error(err))
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
} else {
pt.ServerMaxRecvSize = value
}
log.Debug("initServerMaxRecvSize",
zap.Int("proxy.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize))
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcproxy
import (
"testing"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))
Params.Remove("proxy.grpc.serverMaxSendSize")
Params.initServerMaxSendSize()
assert.Equal(t, Params.ServerMaxSendSize, grpcconfigs.DefaultServerMaxSendSize)
Params.Remove("proxy.grpc.serverMaxRecvSize")
Params.initServerMaxRecvSize()
assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize)
Params.loadFromEnv()
assert.Equal(t, Params.IP, funcutil.GetLocalIP())
}
......@@ -43,7 +43,9 @@ import (
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proxy"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/opentracing/opentracing-go"
"google.golang.org/grpc/keepalive"
)
......@@ -53,6 +55,8 @@ const (
GRPCMaxMagSize = 2 << 30
)
var Params paramtable.GrpcServerConfig
// Server is the Proxy Server
type Server struct {
ctx context.Context
......@@ -150,11 +154,7 @@ func (s *Server) Run() error {
func (s *Server) init() error {
var err error
Params.Init()
if !funcutil.CheckPortAvailable(Params.Port) {
Params.Port = funcutil.GetAvailablePort()
log.Warn("Proxy init", zap.Any("Port", Params.Port))
}
Params.InitOnce(typeutil.ProxyRole)
proxy.Params.InitOnce()
log.Debug("init params done ...")
......@@ -163,14 +163,14 @@ func (s *Server) init() error {
proxy.Params.NetworkPort = Params.Port
proxy.Params.IP = Params.IP
proxy.Params.NetworkAddress = Params.Address
proxy.Params.NetworkAddress = Params.GetAddress()
closer := trace.InitTracing(fmt.Sprintf("proxy ip: %s, port: %d", Params.IP, Params.Port))
s.closer = closer
log.Debug("proxy", zap.String("proxy host", Params.IP))
log.Debug("proxy", zap.Int("proxy port", Params.Port))
log.Debug("proxy", zap.String("proxy address", Params.Address))
log.Debug("proxy", zap.String("proxy address", Params.GetAddress()))
s.wg.Add(1)
go s.startGrpcLoop(Params.Port)
......@@ -272,7 +272,7 @@ func (s *Server) start() error {
// Stop stop the Proxy Server
func (s *Server) Stop() error {
log.Debug("Proxy stop", zap.String("Address", Params.Address))
log.Debug("Proxy stop", zap.String("Address", Params.GetAddress()))
var err error
if s.closer != nil {
if err = s.closer.Close(); err != nil {
......
......@@ -27,12 +27,15 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)
var Params paramtable.GrpcClientConfig
// Client is the grpc client of QueryCoord.
type Client struct {
grpcClient grpcclient.GrpcClient
......@@ -47,7 +50,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*C
log.Debug("QueryCoordClient NewClient failed", zap.Error(err))
return nil, err
}
Params.Init()
Params.InitOnce(typeutil.QueryCoordRole)
client := &Client{
grpcClient: &grpcclient.ClientBase{
ClientMaxRecvSize: Params.ClientMaxRecvSize,
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcquerycoordclient
import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by
// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration.
type ParamTable struct {
paramtable.BaseTable
ClientMaxSendSize int
ClientMaxRecvSize int
}
// Params is a package scoped variable of type ParamTable.
var Params ParamTable
var once sync.Once
// Init is an override method of BaseTable's Init. It mainly calls the
// Init of BaseTable and do some other initialization.
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initClientMaxSendSize()
pt.initClientMaxRecvSize()
})
}
func (pt *ParamTable) initClientMaxSendSize() {
var err error
valueStr, err := pt.Load("queryCoord.grpc.clientMaxSendSize")
if err != nil { // not set
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse queryCoord.grpc.clientMaxSendSize, set to default",
zap.String("queryCoord.grpc.clientMaxSendSize", valueStr),
zap.Error(err))
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
} else {
pt.ClientMaxSendSize = value
}
log.Debug("initClientMaxSendSize",
zap.Int("queryCoord.grpc.clientMaxSendSize", pt.ClientMaxSendSize))
}
func (pt *ParamTable) initClientMaxRecvSize() {
var err error
valueStr, err := pt.Load("queryCoord.grpc.clientMaxRecvSize")
if err != nil { // not set
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse queryCoord.grpc.clientMaxRecvSize, set to default",
zap.String("queryCoord.grpc.clientMaxRecvSize", valueStr),
zap.Error(err))
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
} else {
pt.ClientMaxRecvSize = value
}
log.Debug("initClientMaxRecvSize",
zap.Int("queryCoord.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize))
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcquerycoordclient
import (
"testing"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize))
log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize))
Params.Remove("queryCoord.grpc.clientMaxSendSize")
Params.initClientMaxSendSize()
assert.Equal(t, Params.ClientMaxSendSize, grpcconfigs.DefaultClientMaxSendSize)
Params.Remove("queryCoord.grpc.clientMaxRecvSize")
Params.initClientMaxRecvSize()
assert.Equal(t, Params.ClientMaxRecvSize, grpcconfigs.DefaultClientMaxRecvSize)
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcquerycoord
import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"go.uber.org/zap"
)
// Params is a package scoped variable of type ParamTable.
var Params ParamTable
var once sync.Once
// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by
// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration.
type ParamTable struct {
paramtable.BaseTable
IP string
Port int
Address string
ServerMaxSendSize int
ServerMaxRecvSize int
}
// Init is an override method of BaseTable's Init. It mainly calls the
// Init of BaseTable and do some other initialization.
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initParams()
pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10)
})
}
// initParams initializes params of the configuration items.
func (pt *ParamTable) initParams() {
pt.LoadFromEnv()
pt.LoadFromArgs()
pt.initPort()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
}
func (pt *ParamTable) initPort() {
pt.Port = pt.ParseInt("queryCoord.port")
}
func (pt *ParamTable) LoadFromEnv() {
pt.IP = funcutil.GetLocalIP()
}
// LoadFromArgs is used to initialize configuration items from args.
func (pt *ParamTable) LoadFromArgs() {
}
func (pt *ParamTable) initServerMaxSendSize() {
var err error
valueStr, err := pt.Load("queryCoord.grpc.serverMaxSendSize")
if err != nil { // not set
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse queryCoord.grpc.serverMaxSendSize, set to default",
zap.String("queryCoord.grpc.serverMaxSendSize", valueStr),
zap.Error(err))
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
} else {
pt.ServerMaxSendSize = value
}
log.Debug("initServerMaxSendSize",
zap.Int("queryCoord.grpc.serverMaxSendSize", pt.ServerMaxSendSize))
}
func (pt *ParamTable) initServerMaxRecvSize() {
var err error
valueStr, err := pt.Load("queryCoord.grpc.serverMaxRecvSize")
if err != nil { // not set
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse queryCoord.grpc.serverMaxRecvSize, set to default",
zap.String("queryCoord.grpc.serverMaxRecvSize", valueStr),
zap.Error(err))
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
} else {
pt.ServerMaxRecvSize = value
}
log.Debug("initServerMaxRecvSize",
zap.Int("queryCoord.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize))
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcquerycoord
import (
"testing"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/stretchr/testify/assert"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))
Params.Remove("queryCoord.grpc.ServerMaxSendSize")
Params.initServerMaxSendSize()
assert.Equal(t, Params.ServerMaxSendSize, grpcconfigs.DefaultServerMaxSendSize)
Params.Remove("queryCoord.grpc.ServerMaxRecvSize")
Params.initServerMaxRecvSize()
assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize)
}
......@@ -40,10 +40,14 @@ import (
qc "github.com/milvus-io/milvus/internal/querycoord"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
"google.golang.org/grpc/keepalive"
)
var Params paramtable.GrpcServerConfig
// Server is the grpc server of QueryCoord.
type Server struct {
wg sync.WaitGroup
......@@ -99,10 +103,10 @@ func (s *Server) Run() error {
// init initializes QueryCoord's grpc service.
func (s *Server) init() error {
Params.Init()
Params.InitOnce(typeutil.QueryCoordRole)
qc.Params.InitOnce()
qc.Params.Address = Params.Address
qc.Params.Address = Params.GetAddress()
qc.Params.Port = Params.Port
closer := trace.InitTracing("querycoord")
......@@ -266,7 +270,7 @@ func (s *Server) start() error {
// Stop stops QueryCoord's grpc service.
func (s *Server) Stop() error {
log.Debug("QueryCoord stop", zap.String("Address", Params.Address))
log.Debug("QueryCoord stop", zap.String("Address", Params.GetAddress()))
if s.closer != nil {
if err := s.closer.Close(); err != nil {
return err
......
......@@ -26,10 +26,13 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/typeutil"
"google.golang.org/grpc"
)
var Params paramtable.GrpcClientConfig
// Client is the grpc client of QueryNode.
type Client struct {
grpcClient grpcclient.GrpcClient
......@@ -41,7 +44,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) {
if addr == "" {
return nil, fmt.Errorf("addr is empty")
}
Params.Init()
Params.InitOnce(typeutil.QueryNodeRole)
client := &Client{
addr: addr,
grpcClient: &grpcclient.ClientBase{
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcquerynodeclient
import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by
// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration.
type ParamTable struct {
paramtable.BaseTable
ClientMaxSendSize int
ClientMaxRecvSize int
}
// Params is a package scoped variable of type ParamTable.
var Params ParamTable
var once sync.Once
// Init is an override method of BaseTable's Init. It mainly calls the
// Init of BaseTable and do some other initialization.
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initClientMaxSendSize()
pt.initClientMaxRecvSize()
})
}
func (pt *ParamTable) initClientMaxSendSize() {
var err error
valueStr, err := pt.Load("queryNode.grpc.clientMaxSendSize")
if err != nil { // not set
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse queryNode.grpc.clientMaxSendSize, set to default",
zap.String("queryNode.grpc.clientMaxSendSize", valueStr),
zap.Error(err))
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
} else {
pt.ClientMaxSendSize = value
}
log.Debug("initClientMaxSendSize",
zap.Int("queryNode.grpc.clientMaxSendSize", pt.ClientMaxSendSize))
}
func (pt *ParamTable) initClientMaxRecvSize() {
var err error
valueStr, err := pt.Load("queryNode.grpc.clientMaxRecvSize")
if err != nil { // not set
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse queryNode.grpc.clientMaxRecvSize, set to default",
zap.String("queryNode.grpc.clientMaxRecvSize", valueStr),
zap.Error(err))
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
} else {
pt.ClientMaxRecvSize = value
}
log.Debug("initClientMaxRecvSize",
zap.Int("queryNode.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize))
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcquerynodeclient
import (
"testing"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize))
log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize))
Params.Remove("queryNode.grpc.clientMaxSendSize")
Params.initClientMaxSendSize()
assert.Equal(t, Params.ClientMaxSendSize, grpcconfigs.DefaultClientMaxSendSize)
Params.Remove("queryNode.grpc.clientMaxRecvSize")
Params.initClientMaxRecvSize()
assert.Equal(t, Params.ClientMaxRecvSize, grpcconfigs.DefaultClientMaxRecvSize)
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcquerynode
import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
// Params is a package scoped variable of type ParamTable.
var Params ParamTable
var once sync.Once
// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by
// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration.
type ParamTable struct {
paramtable.BaseTable
IP string
Port int
Address string
QueryNodeID UniqueID
ServerMaxSendSize int
ServerMaxRecvSize int
}
// Init is used to initialize configuration items.
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initParams()
pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10)
})
}
// initParams initializes params of the configuration items.
func (pt *ParamTable) initParams() {
pt.LoadFromEnv()
pt.LoadFromArgs()
pt.initPort()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
}
// LoadFromArgs is used to initialize configuration items from args.
func (pt *ParamTable) LoadFromArgs() {
}
// LoadFromEnv is used to initialize configuration items from env.
func (pt *ParamTable) LoadFromEnv() {
pt.IP = funcutil.GetLocalIP()
}
func (pt *ParamTable) initPort() {
port := pt.ParseInt("queryNode.port")
pt.Port = port
if !funcutil.CheckPortAvailable(pt.Port) {
pt.Port = funcutil.GetAvailablePort()
log.Warn("QueryNode init", zap.Any("Port", pt.Port))
}
}
func (pt *ParamTable) initServerMaxSendSize() {
var err error
valueStr, err := pt.Load("queryNode.grpc.serverMaxSendSize")
if err != nil { // not set
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse queryNode.grpc.serverMaxSendSize, set to default",
zap.String("queryNode.grpc.serverMaxSendSize", valueStr),
zap.Error(err))
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
} else {
pt.ServerMaxSendSize = value
}
log.Debug("initServerMaxSendSize",
zap.Int("queryNode.grpc.serverMaxSendSize", pt.ServerMaxSendSize))
}
func (pt *ParamTable) initServerMaxRecvSize() {
var err error
valueStr, err := pt.Load("queryNode.grpc.serverMaxRecvSize")
if err != nil { // not set
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse queryNode.grpc.serverMaxRecvSize, set to default",
zap.String("queryNode.grpc.serverMaxRecvSize", valueStr),
zap.Error(err))
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
} else {
pt.ServerMaxRecvSize = value
}
log.Debug("initServerMaxRecvSize",
zap.Int("queryNode.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize))
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcquerynode
import (
"testing"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/stretchr/testify/assert"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))
Params.Remove("queryNode.grpc.ServerMaxSendSize")
Params.initServerMaxSendSize()
assert.Equal(t, Params.ServerMaxSendSize, grpcconfigs.DefaultServerMaxSendSize)
Params.Remove("queryNode.grpc.ServerMaxRecvSize")
Params.initServerMaxRecvSize()
assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize)
Params.LoadFromEnv()
assert.Equal(t, Params.IP, funcutil.GetLocalIP())
}
......@@ -25,10 +25,6 @@ import (
"sync"
"time"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/types"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
......@@ -43,11 +39,16 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/querypb"
qn "github.com/milvus-io/milvus/internal/querynode"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
var Params paramtable.GrpcServerConfig
// UniqueID is an alias for type typeutil.UniqueID, used as a unique identifier for the request.
type UniqueID = typeutil.UniqueID
......@@ -82,12 +83,12 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
// init initializes QueryNode's grpc service.
func (s *Server) init() error {
Params.Init()
Params.InitOnce(typeutil.QueryNodeRole)
qn.Params.InitOnce()
qn.Params.QueryNodeIP = Params.IP
qn.Params.QueryNodePort = int64(Params.Port)
qn.Params.QueryNodeID = Params.QueryNodeID
//qn.Params.QueryNodeID = Params.QueryNodeID
closer := trace.InitTracing(fmt.Sprintf("query_node ip: %s, port: %d", Params.IP, Params.Port))
s.closer = closer
......@@ -256,7 +257,7 @@ func (s *Server) Run() error {
// Stop stops QueryNode's grpc service.
func (s *Server) Stop() error {
log.Debug("QueryNode stop", zap.String("Address", Params.Address))
log.Debug("QueryNode stop", zap.String("Address", Params.GetAddress()))
if s.closer != nil {
if err := s.closer.Close(); err != nil {
return err
......
......@@ -29,12 +29,15 @@ import (
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)
var Params paramtable.GrpcClientConfig
// Client grpc client
type Client struct {
grpcClient grpcclient.GrpcClient
......@@ -53,7 +56,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*C
log.Debug("QueryCoordClient NewClient failed", zap.Error(err))
return nil, err
}
Params.Init()
Params.InitOnce(typeutil.RootCoordRole)
client := &Client{
grpcClient: &grpcclient.ClientBase{
ClientMaxRecvSize: Params.ClientMaxRecvSize,
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcrootcoordclient
import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
// ParamTable structure stored all parameters
type ParamTable struct {
paramtable.BaseTable
ClientMaxSendSize int
ClientMaxRecvSize int
}
// Params rootcoord parameter table
var Params ParamTable
var once sync.Once
// Init initialize param table
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initClientMaxSendSize()
pt.initClientMaxRecvSize()
})
}
func (pt *ParamTable) initClientMaxSendSize() {
var err error
valueStr, err := pt.Load("rootCoord.grpc.clientMaxSendSize")
if err != nil { // not set
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse rootCoord.grpc.clientMaxSendSize, set to default",
zap.String("rootCoord.grpc.clientMaxSendSize", valueStr),
zap.Error(err))
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
} else {
pt.ClientMaxSendSize = value
}
log.Debug("initClientMaxSendSize",
zap.Int("rootCoord.grpc.clientMaxSendSize", pt.ClientMaxSendSize))
}
func (pt *ParamTable) initClientMaxRecvSize() {
var err error
valueStr, err := pt.Load("rootCoord.grpc.clientMaxRecvSize")
if err != nil { // not set
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse rootCoord.grpc.clientMaxRecvSize, set to default",
zap.String("rootCoord.grpc.clientMaxRecvSize", valueStr),
zap.Error(err))
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
} else {
pt.ClientMaxRecvSize = value
}
log.Debug("initClientMaxRecvSize",
zap.Int("rootCoord.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize))
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcrootcoordclient
import (
"testing"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize))
log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize))
Params.Remove("rootCoord.grpc.clientMaxSendSize")
Params.initClientMaxSendSize()
assert.Equal(t, Params.ClientMaxSendSize, grpcconfigs.DefaultClientMaxSendSize)
Params.Remove("rootCoord.grpc.clientMaxRecvSize")
Params.initClientMaxRecvSize()
assert.Equal(t, Params.ClientMaxRecvSize, grpcconfigs.DefaultClientMaxRecvSize)
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcrootcoord
import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"go.uber.org/zap"
)
// Params is a package scoped variable of type ParamTable.
var Params ParamTable
var once sync.Once
// ParamTable is a derived struct of paramtable.BaseTable. It achieves Composition by
// embedding paramtable.BaseTable. It is used to quickly and easily access the system configuration.
type ParamTable struct {
paramtable.BaseTable
IP string
Port int
Address string
ServerMaxSendSize int
ServerMaxRecvSize int
}
// Init is an override method of BaseTable's Init. It mainly calls the
// Init of BaseTable and do some other initialization.
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initParams()
pt.Address = pt.IP + ":" + strconv.FormatInt(int64(pt.Port), 10)
})
}
// initParams initializes params of the configuration items.
func (pt *ParamTable) initParams() {
pt.LoadFromEnv()
pt.LoadFromArgs()
pt.initPort()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
}
// LoadFromEnv is used to initialize configuration items from env.
func (pt *ParamTable) LoadFromEnv() {
pt.IP = funcutil.GetLocalIP()
}
// LoadFromArgs is used to initialize configuration items from args.
func (pt *ParamTable) LoadFromArgs() {
}
func (pt *ParamTable) initPort() {
pt.Port = pt.ParseInt("rootCoord.port")
}
func (pt *ParamTable) initServerMaxSendSize() {
var err error
valueStr, err := pt.Load("rootCoord.grpc.serverMaxSendSize")
if err != nil { // not set
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse rootCoord.grpc.serverMaxSendSize, set to default",
zap.String("rootCoord.grpc.serverMaxSendSize", valueStr),
zap.Error(err))
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
} else {
pt.ServerMaxSendSize = value
}
log.Debug("initServerMaxSendSize",
zap.Int("rootCoord.grpc.serverMaxSendSize", pt.ServerMaxSendSize))
}
func (pt *ParamTable) initServerMaxRecvSize() {
var err error
valueStr, err := pt.Load("rootCoord.grpc.serverMaxRecvSize")
if err != nil { // not set
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil { // not in valid format
log.Warn("Failed to parse rootCoord.grpc.serverMaxRecvSize, set to default",
zap.String("rootCoord.grpc.serverMaxRecvSize", valueStr),
zap.Error(err))
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
} else {
pt.ServerMaxRecvSize = value
}
log.Debug("initServerMaxRecvSize",
zap.Int("rootCoord.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize))
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcrootcoord
import (
"testing"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/stretchr/testify/assert"
)
func TestParamTable(t *testing.T) {
Params.Init()
assert.NotEqual(t, Params.Address, "")
t.Logf("master address = %s", Params.Address)
assert.NotEqual(t, Params.Port, 0)
t.Logf("master port = %d", Params.Port)
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))
Params.Remove("rootCoord.grpc.ServerMaxSendSize")
Params.initServerMaxSendSize()
assert.Equal(t, Params.ServerMaxSendSize, grpcconfigs.DefaultServerMaxSendSize)
Params.Remove("rootCoord.grpc.ServerMaxRecvSize")
Params.initServerMaxRecvSize()
assert.Equal(t, Params.ServerMaxRecvSize, grpcconfigs.DefaultServerMaxRecvSize)
}
......@@ -26,6 +26,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
dsc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
......@@ -43,11 +44,14 @@ import (
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/trace"
"google.golang.org/grpc/keepalive"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
var Params paramtable.GrpcServerConfig
// Server grpc wrapper
type Server struct {
rootCoord types.RootCoordComponent
......@@ -141,10 +145,10 @@ func (s *Server) Run() error {
}
func (s *Server) init() error {
Params.Init()
Params.InitOnce(typeutil.RootCoordRole)
rootcoord.Params.InitOnce()
rootcoord.Params.Address = Params.Address
rootcoord.Params.Address = Params.GetAddress()
rootcoord.Params.Port = Params.Port
log.Debug("grpc init done ...")
......@@ -264,7 +268,7 @@ func (s *Server) start() error {
}
func (s *Server) Stop() error {
log.Debug("Rootcoord stop", zap.String("Address", Params.Address))
log.Debug("Rootcoord stop", zap.String("Address", Params.GetAddress()))
if s.closer != nil {
if err := s.closer.Close(); err != nil {
log.Error("Failed to close opentracing", zap.Error(err))
......
......@@ -22,8 +22,6 @@ import (
"fmt"
"math/rand"
"path"
"strconv"
"strings"
"sync"
"testing"
"time"
......@@ -71,13 +69,9 @@ func TestGrpcService(t *testing.T) {
rand.Seed(time.Now().UnixNano())
randVal := rand.Int()
Params.Init()
Params.InitOnce(typeutil.RootCoordRole)
Params.Port = (randVal % 100) + 10000
parts := strings.Split(Params.Address, ":")
if len(parts) == 2 {
Params.Address = parts[0] + ":" + strconv.Itoa(Params.Port)
t.Log("newParams.Address:", Params.Address)
}
t.Log("newParams.Address:", Params.GetAddress())
ctx := context.Background()
msFactory := msgstream.NewPmsFactory()
......@@ -95,7 +89,7 @@ func TestGrpcService(t *testing.T) {
rootcoord.Params.DefaultPartitionName = "_default"
rootcoord.Params.DefaultIndexName = "_default"
t.Logf("master service port = %d", Params.Port)
t.Logf("service port = %d", Params.Port)
core, ok := (svr.rootCoord).(*rootcoord.Core)
assert.True(t, ok)
......@@ -119,7 +113,7 @@ func TestGrpcService(t *testing.T) {
_, err = etcdCli.Put(ctx, path.Join(sessKey, typeutil.ProxyRole+"-100"), string(pnb))
assert.Nil(t, err)
rootcoord.Params.Address = Params.Address
rootcoord.Params.Address = Params.GetAddress()
err = core.Init()
assert.Nil(t, err)
......@@ -895,7 +889,7 @@ func TestRun(t *testing.T) {
cancel: cancel,
grpcErrChan: make(chan error),
}
Params.Init()
Params.InitOnce(typeutil.RootCoordRole)
Params.Port = 1000000
err := svr.Run()
assert.NotNil(t, err)
......
......@@ -17,9 +17,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"
"time"
"github.com/go-basic/ipv4"
......@@ -41,27 +39,6 @@ func CheckGrpcReady(ctx context.Context, targetCh chan error) {
}
}
// CheckPortAvailable check if a port is available to be listened on
func CheckPortAvailable(port int) bool {
addr := ":" + strconv.Itoa(port)
listener, err := net.Listen("tcp", addr)
if listener != nil {
listener.Close()
}
return err == nil
}
// GetAvailablePort return an available port that can be listened on
func GetAvailablePort() int {
listener, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
defer listener.Close()
return listener.Addr().(*net.TCPAddr).Port
}
// GetLocalIP return the local ip address
func GetLocalIP() string {
return ipv4.LocalIP()
......
......@@ -99,15 +99,6 @@ func Test_CheckGrpcReady(t *testing.T) {
cancel()
}
func Test_CheckPortAvailable(t *testing.T) {
num := 10
for i := 0; i < num; i++ {
port := GetAvailablePort()
assert.Equal(t, CheckPortAvailable(port), true)
}
}
func Test_GetLocalIP(t *testing.T) {
ip := GetLocalIP()
assert.NotNil(t, ip)
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package paramtable
import (
"math"
"net"
"strconv"
"sync"
"go.uber.org/zap"
"github.com/go-basic/ipv4"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const (
// DefaultServerMaxSendSize defines the maximum size of data per grpc request can send by server side.
DefaultServerMaxSendSize = math.MaxInt32
// DefaultServerMaxRecvSize defines the maximum size of data per grpc request can receive by server side.
DefaultServerMaxRecvSize = math.MaxInt32
// DefaultClientMaxSendSize defines the maximum size of data per grpc request can send by client side.
DefaultClientMaxSendSize = 100 * 1024 * 1024
// DefaultClientMaxRecvSize defines the maximum size of data per grpc request can receive by client side.
DefaultClientMaxRecvSize = 100 * 1024 * 1024
)
///////////////////////////////////////////////////////////////////////////////
// -- grpc ---
type grpcConfig struct {
BaseParamTable
once sync.Once
Domain string
IP string
Port int
Listener net.Listener
}
func (p *grpcConfig) init(domain string) {
p.BaseParamTable.Init()
p.Domain = domain
p.LoadFromEnv()
p.LoadFromArgs()
p.initPort()
p.initListener()
}
// LoadFromEnv is used to initialize configuration items from env.
func (p *grpcConfig) LoadFromEnv() {
p.IP = ipv4.LocalIP()
}
// LoadFromArgs is used to initialize configuration items from args.
func (p *grpcConfig) LoadFromArgs() {
}
func (p *grpcConfig) initPort() {
p.Port = p.ParseInt(p.Domain + ".port")
if p.Domain == typeutil.ProxyRole || p.Domain == typeutil.DataNodeRole || p.Domain == typeutil.IndexNodeRole || p.Domain == typeutil.QueryNodeRole {
if !CheckPortAvailable(p.Port) {
p.Port = GetAvailablePort()
log.Warn("get available port when init", zap.String("Domain", p.Domain), zap.Int("Port", p.Port))
}
}
}
// GetAddress return grpc address
func (p *grpcConfig) GetAddress() string {
return p.IP + ":" + strconv.Itoa(p.Port)
}
func (p *grpcConfig) initListener() {
if p.Domain == typeutil.DataNodeRole {
listener, err := net.Listen("tcp", p.GetAddress())
if err != nil {
panic(err)
}
p.Listener = listener
}
}
type GrpcServerConfig struct {
grpcConfig
ServerMaxSendSize int
ServerMaxRecvSize int
}
// InitOnce initialize grpc server config once
func (p *GrpcServerConfig) InitOnce(domain string) {
p.once.Do(func() {
p.init(domain)
})
}
func (p *GrpcServerConfig) init(domain string) {
p.grpcConfig.init(domain)
p.initServerMaxSendSize()
p.initServerMaxRecvSize()
}
func (p *GrpcServerConfig) initServerMaxSendSize() {
var err error
valueStr, err := p.Load(p.Domain + ".grpc.serverMaxSendSize")
if err != nil {
p.ServerMaxSendSize = DefaultServerMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil {
log.Warn("Failed to parse grpc.serverMaxSendSize, set to default",
zap.String("rol", p.Domain), zap.String("grpc.serverMaxSendSize", valueStr),
zap.Error(err))
p.ServerMaxSendSize = DefaultServerMaxSendSize
} else {
p.ServerMaxSendSize = value
}
log.Debug("initServerMaxSendSize",
zap.String("role", p.Domain), zap.Int("grpc.serverMaxSendSize", p.ServerMaxSendSize))
}
func (p *GrpcServerConfig) initServerMaxRecvSize() {
var err error
valueStr, err := p.Load(p.Domain + ".grpc.serverMaxRecvSize")
if err != nil {
p.ServerMaxRecvSize = DefaultServerMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil {
log.Warn("Failed to parse grpc.serverMaxRecvSize, set to default",
zap.String("role", p.Domain), zap.String("grpc.serverMaxRecvSize", valueStr),
zap.Error(err))
p.ServerMaxRecvSize = DefaultServerMaxRecvSize
} else {
p.ServerMaxRecvSize = value
}
log.Debug("initServerMaxRecvSize",
zap.String("role", p.Domain), zap.Int("grpc.serverMaxRecvSize", p.ServerMaxRecvSize))
}
type GrpcClientConfig struct {
grpcConfig
ClientMaxSendSize int
ClientMaxRecvSize int
}
// InitOnce initialize grpc client config once
func (p *GrpcClientConfig) InitOnce(domain string) {
p.once.Do(func() {
p.init(domain)
})
}
func (p *GrpcClientConfig) init(domain string) {
p.grpcConfig.init(domain)
p.initClientMaxSendSize()
p.initClientMaxRecvSize()
}
func (p *GrpcClientConfig) initClientMaxSendSize() {
var err error
valueStr, err := p.Load(p.Domain + ".grpc.clientMaxSendSize")
if err != nil {
p.ClientMaxSendSize = DefaultClientMaxSendSize
}
value, err := strconv.Atoi(valueStr)
if err != nil {
log.Warn("Failed to parse grpc.clientMaxSendSize, set to default",
zap.String("role", p.Domain), zap.String("grpc.clientMaxSendSize", valueStr),
zap.Error(err))
p.ClientMaxSendSize = DefaultClientMaxSendSize
} else {
p.ClientMaxSendSize = value
}
log.Debug("initClientMaxSendSize",
zap.String("role", p.Domain), zap.Int("grpc.clientMaxSendSize", p.ClientMaxSendSize))
}
func (p *GrpcClientConfig) initClientMaxRecvSize() {
var err error
valueStr, err := p.Load(p.Domain + ".grpc.clientMaxRecvSize")
if err != nil {
p.ClientMaxRecvSize = DefaultClientMaxRecvSize
}
value, err := strconv.Atoi(valueStr)
if err != nil {
log.Warn("Failed to parse grpc.clientMaxRecvSize, set to default",
zap.String("role", p.Domain), zap.String("grpc.clientMaxRecvSize", valueStr),
zap.Error(err))
p.ClientMaxRecvSize = DefaultClientMaxRecvSize
} else {
p.ClientMaxRecvSize = value
}
log.Debug("initClientMaxRecvSize",
zap.String("role", p.Domain), zap.Int("grpc.clientMaxRecvSize", p.ClientMaxRecvSize))
}
// CheckPortAvailable check if a port is available to be listened on
func CheckPortAvailable(port int) bool {
addr := ":" + strconv.Itoa(port)
listener, err := net.Listen("tcp", addr)
if listener != nil {
listener.Close()
}
return err == nil
}
// GetAvailablePort return an available port that can be listened on
func GetAvailablePort() int {
listener, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
defer listener.Close()
return listener.Addr().(*net.TCPAddr).Port
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package paramtable
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
func TestGrpcServerParams(t *testing.T) {
role := typeutil.DataNodeRole
var Params GrpcServerConfig
Params.InitOnce(role)
assert.Equal(t, Params.Domain, role)
t.Logf("Domain = %s", Params.Domain)
assert.NotEqual(t, Params.IP, "")
t.Logf("IP = %s", Params.IP)
assert.NotZero(t, Params.Port)
t.Logf("Port = %d", Params.Port)
t.Logf("Address = %s", Params.GetAddress())
assert.NotNil(t, Params.Listener)
t.Logf("Listener = %d", Params.Listener)
assert.NotZero(t, Params.ServerMaxRecvSize)
t.Logf("ServerMaxRecvSize = %d", Params.ServerMaxRecvSize)
Params.Remove(role + ".grpc.serverMaxRecvSize")
Params.initServerMaxRecvSize()
assert.Equal(t, Params.ServerMaxRecvSize, DefaultServerMaxRecvSize)
assert.NotZero(t, Params.ServerMaxSendSize)
t.Logf("ServerMaxSendSize = %d", Params.ServerMaxSendSize)
Params.Remove(role + ".grpc.serverMaxSendSize")
Params.initServerMaxSendSize()
assert.Equal(t, Params.ServerMaxSendSize, DefaultServerMaxSendSize)
}
func TestGrpcClientParams(t *testing.T) {
role := typeutil.DataNodeRole
var Params GrpcClientConfig
Params.InitOnce(role)
assert.Equal(t, Params.Domain, role)
t.Logf("Domain = %s", Params.Domain)
assert.NotEqual(t, Params.IP, "")
t.Logf("IP = %s", Params.IP)
assert.NotZero(t, Params.Port)
t.Logf("Port = %d", Params.Port)
t.Logf("Address = %s", Params.GetAddress())
assert.NotNil(t, Params.Listener)
t.Logf("Listener = %d", Params.Listener)
assert.NotZero(t, Params.ClientMaxRecvSize)
t.Logf("ClientMaxRecvSize = %d", Params.ClientMaxRecvSize)
Params.Remove(role + ".grpc.clientMaxRecvSize")
Params.initClientMaxRecvSize()
assert.Equal(t, Params.ClientMaxRecvSize, DefaultClientMaxRecvSize)
assert.NotZero(t, Params.ClientMaxSendSize)
t.Logf("ClientMaxSendSize = %d", Params.ClientMaxSendSize)
Params.Remove(role + ".grpc.clientMaxSendSize")
Params.initClientMaxSendSize()
assert.Equal(t, Params.ClientMaxSendSize, DefaultClientMaxSendSize)
}
func TestCheckPortAvailable(t *testing.T) {
num := 10
for i := 0; i < num; i++ {
port := GetAvailablePort()
assert.Equal(t, CheckPortAvailable(port), true)
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册