diff --git a/configs/milvus.yaml b/configs/milvus.yaml index b29a1d437f0f6ff5dea000b6e2e294a215744046..092d5dda90bc4bb99efba5beb62462eee300dca4 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -41,31 +41,79 @@ rootCoord: address: localhost port: 53100 + grpc: + serverMaxRecvSize: 2147483647 # math.MaxInt32 + serverMaxSendSize: 2147483647 # math.MaxInt32 + clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024 + clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024 + proxy: port: 19530 + grpc: + serverMaxRecvSize: 2147483647 # math.MaxInt32 + serverMaxSendSize: 2147483647 # math.MaxInt32 + clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024 + clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024 + queryCoord: address: localhost port: 19531 + grpc: + serverMaxRecvSize: 2147483647 # math.MaxInt32 + serverMaxSendSize: 2147483647 # math.MaxInt32 + clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024 + clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024 + queryNode: gracefulTime: 1000 # ms, for search port: 21123 + grpc: + serverMaxRecvSize: 2147483647 # math.MaxInt32 + serverMaxSendSize: 2147483647 # math.MaxInt32 + clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024 + clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024 + indexCoord: address: localhost port: 31000 + grpc: + serverMaxRecvSize: 2147483647 # math.MaxInt32 + serverMaxSendSize: 2147483647 # math.MaxInt32 + clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024 + clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024 + indexNode: port: 21121 + grpc: + serverMaxRecvSize: 2147483647 # math.MaxInt32 + serverMaxSendSize: 2147483647 # math.MaxInt32 + clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024 + clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024 + dataCoord: address: localhost port: 13333 + grpc: + serverMaxRecvSize: 2147483647 # math.MaxInt32 + serverMaxSendSize: 2147483647 # math.MaxInt32 + clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024 + clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024 + dataNode: port: 21124 + grpc: + serverMaxRecvSize: 2147483647 # math.MaxInt32 + serverMaxSendSize: 2147483647 # math.MaxInt32 + clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024 + clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024 + storage: path: /var/lib/milvus/data/ diff --git a/go.mod b/go.mod index 424a1aa12ab68861d19a3c2a870a280067f47d42..9c1eea7c8106f47f59296d74b0581c5ea3e8a8ae 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/jarcoal/httpmock v1.0.8 github.com/klauspost/compress v1.10.11 // indirect + github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 github.com/minio/minio-go/v7 v7.0.10 github.com/mitchellh/mapstructure v1.1.2 github.com/opentracing/opentracing-go v1.2.0 diff --git a/go.sum b/go.sum index 97790bbb788e5f676e7574ce505608988f4d1744..55294c67f60586efc9d07d40b7d3190094f1aaf8 100644 --- a/go.sum +++ b/go.sum @@ -223,6 +223,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76 h1:IVlcvV0CjvfBYYod5ePe89l+3LBAl//6n9kJ9Vr2i0k= +github.com/lingdor/stackerror v0.0.0-20191119040541-976d8885ed76/go.mod h1:Iu9BHUvTh8/KpbuSoKx/CaJEdJvFxSverxIy7I+nq7s= github.com/linkedin/goavro/v2 v2.9.8 h1:jN50elxBsGBDGVDEKqUlDuU1cFwJ11K/yrJCBMe/7Wg= github.com/linkedin/goavro/v2 v2.9.8/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= github.com/lucasb-eyer/go-colorful v1.0.2/go.mod h1:0MS4r+7BZKSJ5mw4/S5MPN+qHFF1fYclkSPilDOKW0s= diff --git a/internal/distributed/datacoord/client/client.go b/internal/distributed/datacoord/client/client.go index e23276c8e8deb066646fb4a052978bcd3b3b01e1..783e93e82aecdda63627a3aef744b26a8692c6f8 100644 --- a/internal/distributed/datacoord/client/client.go +++ b/internal/distributed/datacoord/client/client.go @@ -77,6 +77,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*C } func (c *Client) Init() error { + Params.Init() return c.connect(retry.Attempts(20)) } @@ -92,6 +93,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error { log.Debug("DataCoordClient try reconnect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(c.ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize), + grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)), grpc.WithUnaryInterceptor( grpc_middleware.ChainUnaryClient( grpc_retry.UnaryClientInterceptor( diff --git a/internal/distributed/datacoord/client/paramtable.go b/internal/distributed/datacoord/client/paramtable.go new file mode 100644 index 0000000000000000000000000000000000000000..b7a583fae98d11fa1cc8f3443bfccadb3e26fbe3 --- /dev/null +++ b/internal/distributed/datacoord/client/paramtable.go @@ -0,0 +1,63 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcdatacoordclient + +import ( + "sync" + + "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/util/paramtable" +) + +type ParamTable struct { + paramtable.BaseTable + + ClientMaxSendSize int + ClientMaxRecvSize int +} + +var Params ParamTable +var once sync.Once + +func (pt *ParamTable) Init() { + once.Do(func() { + pt.BaseTable.Init() + + pt.initClientMaxSendSize() + pt.initClientMaxRecvSize() + }) +} + +func (pt *ParamTable) initClientMaxSendSize() { + var err error + pt.ClientMaxSendSize, err = pt.ParseIntWithErr("dataCoord.grpc.clientMaxSendSize") + if err != nil { + pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize + log.Debug("dataCoord.grpc.clientMaxSendSize not set, set to default") + } + log.Debug("initClientMaxSendSize", + zap.Int("dataCoord.grpc.clientMaxSendSize", pt.ClientMaxSendSize)) +} + +func (pt *ParamTable) initClientMaxRecvSize() { + var err error + pt.ClientMaxRecvSize, err = pt.ParseIntWithErr("dataCoord.grpc.clientMaxRecvSize") + if err != nil { + pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize + log.Debug("dataCoord.grpc.clientMaxRecvSize not set, set to default") + } + log.Debug("initClientMaxRecvSize", + zap.Int("dataCoord.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize)) +} diff --git a/internal/distributed/datacoord/client/paramtable_test.go b/internal/distributed/datacoord/client/paramtable_test.go new file mode 100644 index 0000000000000000000000000000000000000000..f3e131fdd4cf46734480e7705d696e47d95747c8 --- /dev/null +++ b/internal/distributed/datacoord/client/paramtable_test.go @@ -0,0 +1,26 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcdatacoordclient + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize)) + log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize)) +} diff --git a/internal/distributed/datacoord/paramtable.go b/internal/distributed/datacoord/paramtable.go index bba5b4fd724c456340e931aad399e56e578c7b4a..6398f5301c774613290322a70108f779279ed74a 100644 --- a/internal/distributed/datacoord/paramtable.go +++ b/internal/distributed/datacoord/paramtable.go @@ -14,6 +14,10 @@ package grpcdatacoordclient import ( "sync" + "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/util/paramtable" ) @@ -23,6 +27,9 @@ type ParamTable struct { IP string Port int RootCoordAddress string + + ServerMaxSendSize int + ServerMaxRecvSize int } var Params ParamTable @@ -34,6 +41,9 @@ func (pt *ParamTable) Init() { pt.initPort() pt.initParams() pt.LoadFromEnv() + + pt.initServerMaxSendSize() + pt.initServerMaxRecvSize() }) } @@ -65,3 +75,25 @@ func (pt *ParamTable) initDataCoordAddress() { } pt.IP = ret } + +func (pt *ParamTable) initServerMaxSendSize() { + var err error + pt.ServerMaxSendSize, err = pt.ParseIntWithErr("dataCoord.grpc.serverMaxSendSize") + if err != nil { + pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize + log.Debug("dataCoord.grpc.serverMaxSendSize not set, set to default") + } + log.Debug("initServerMaxSendSize", + zap.Int("dataCoord.grpc.serverMaxSendSize", pt.ServerMaxSendSize)) +} + +func (pt *ParamTable) initServerMaxRecvSize() { + var err error + pt.ServerMaxRecvSize, err = pt.ParseIntWithErr("dataCoord.grpc.serverMaxRecvSize") + if err != nil { + pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize + log.Debug("dataCoord.grpc.serverMaxRecvSize not set, set to default") + } + log.Debug("initServerMaxRecvSize", + zap.Int("dataCoord.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize)) +} diff --git a/internal/distributed/datacoord/paramtable_test.go b/internal/distributed/datacoord/paramtable_test.go index e769605bf83878ef96424bc05e77f22c9cd1a205..c093d96f0d5c7a1e47d776287127f0a300c220da 100644 --- a/internal/distributed/datacoord/paramtable_test.go +++ b/internal/distributed/datacoord/paramtable_test.go @@ -14,6 +14,9 @@ package grpcdatacoordclient import ( "testing" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + "github.com/stretchr/testify/assert" ) @@ -25,4 +28,7 @@ func TestParamTable(t *testing.T) { assert.NotEqual(t, Params.RootCoordAddress, "") t.Logf("RootCoordAddress:%s", Params.RootCoordAddress) + + log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) + log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) } diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index 2503eac673091123f0f122a991cf2875d46c7d4f..80d672e8c2a3ee296bf2844ea6137c03d690e261 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -14,7 +14,6 @@ package grpcdatacoordclient import ( "context" "io" - "math" "net" "strconv" "sync" @@ -124,8 +123,8 @@ func (s *Server) startGrpcLoop(grpcPort int) { opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( - grpc.MaxRecvMsgSize(math.MaxInt32), - grpc.MaxSendMsgSize(math.MaxInt32), + grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize), + grpc.MaxSendMsgSize(Params.ServerMaxSendSize), grpc.UnaryInterceptor( grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index 3b277bcc8c393894b3f8d77a0307890ede721dfb..90ba2a35420ac65b08ba4a0cbfac4edf162eb943 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -61,6 +61,7 @@ func NewClient(ctx context.Context, addr string, retryOptions ...retry.Option) ( } func (c *Client) Init() error { + Params.Init() return c.connect(retry.Attempts(20)) } @@ -70,6 +71,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error { log.Debug("DataNode connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(c.ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize), + grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)), grpc.WithDisableRetry(), grpc.WithUnaryInterceptor( grpc_middleware.ChainUnaryClient( diff --git a/internal/distributed/datanode/client/paramtable.go b/internal/distributed/datanode/client/paramtable.go new file mode 100644 index 0000000000000000000000000000000000000000..363ec16815f214e6bb0e30ac3dfa16f2a0fd88a9 --- /dev/null +++ b/internal/distributed/datanode/client/paramtable.go @@ -0,0 +1,63 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcdatanodeclient + +import ( + "sync" + + "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/util/paramtable" +) + +type ParamTable struct { + paramtable.BaseTable + + ClientMaxSendSize int + ClientMaxRecvSize int +} + +var Params ParamTable +var once sync.Once + +func (pt *ParamTable) Init() { + once.Do(func() { + pt.BaseTable.Init() + + pt.initClientMaxSendSize() + pt.initClientMaxRecvSize() + }) +} + +func (pt *ParamTable) initClientMaxSendSize() { + var err error + pt.ClientMaxSendSize, err = pt.ParseIntWithErr("dataNode.grpc.clientMaxSendSize") + if err != nil { + pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize + log.Debug("dataNode.grpc.clientMaxSendSize not set, set to default") + } + log.Debug("initClientMaxSendSize", + zap.Int("dataNode.grpc.clientMaxSendSize", pt.ClientMaxSendSize)) +} + +func (pt *ParamTable) initClientMaxRecvSize() { + var err error + pt.ClientMaxRecvSize, err = pt.ParseIntWithErr("dataNode.grpc.clientMaxRecvSize") + if err != nil { + pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize + log.Debug("dataNode.grpc.clientMaxRecvSize not set, set to default") + } + log.Debug("initClientMaxRecvSize", + zap.Int("dataNode.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize)) +} diff --git a/internal/distributed/datanode/client/paramtable_test.go b/internal/distributed/datanode/client/paramtable_test.go new file mode 100644 index 0000000000000000000000000000000000000000..31e5c213a532f2cd1c2cb69591dbfd968350b054 --- /dev/null +++ b/internal/distributed/datanode/client/paramtable_test.go @@ -0,0 +1,26 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcdatanodeclient + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize)) + log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize)) +} diff --git a/internal/distributed/datanode/param_table.go b/internal/distributed/datanode/param_table.go index 3468d3bbc4cec840a1da804f7878489541fcc27c..6a036edf472d6ab6a9e49d8cbde7bc2007373adb 100644 --- a/internal/distributed/datanode/param_table.go +++ b/internal/distributed/datanode/param_table.go @@ -15,6 +15,8 @@ import ( "net" "sync" + "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/paramtable" @@ -33,6 +35,9 @@ type ParamTable struct { RootCoordAddress string DataCoordAddress string + + ServerMaxSendSize int + ServerMaxRecvSize int } func (pt *ParamTable) Init() { @@ -41,6 +46,9 @@ func (pt *ParamTable) Init() { pt.initRootCoordAddress() pt.initDataCoordAddress() pt.initPort() + + pt.initServerMaxSendSize() + pt.initServerMaxRecvSize() }) } @@ -79,3 +87,25 @@ func (pt *ParamTable) initDataCoordAddress() { } pt.DataCoordAddress = ret } + +func (pt *ParamTable) initServerMaxSendSize() { + var err error + pt.ServerMaxSendSize, err = pt.ParseIntWithErr("dataNode.grpc.serverMaxSendSize") + if err != nil { + pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize + log.Debug("dataNode.grpc.serverMaxSendSize not set, set to default") + } + log.Debug("initServerMaxSendSize", + zap.Int("dataNode.grpc.serverMaxSendSize", pt.ServerMaxSendSize)) +} + +func (pt *ParamTable) initServerMaxRecvSize() { + var err error + pt.ServerMaxRecvSize, err = pt.ParseIntWithErr("dataNode.grpc.serverMaxRecvSize") + if err != nil { + pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize + log.Debug("dataNode.grpc.serverMaxRecvSize not set, set to default") + } + log.Debug("initServerMaxRecvSize", + zap.Int("dataNode.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize)) +} diff --git a/internal/distributed/datanode/param_table_test.go b/internal/distributed/datanode/param_table_test.go index e68223e40650cd049b76daef554dc837bb7a4a43..6d46974a1cfd1b10ea2971d6ccf64fea106dbb64 100644 --- a/internal/distributed/datanode/param_table_test.go +++ b/internal/distributed/datanode/param_table_test.go @@ -14,6 +14,9 @@ package grpcdatanode import ( "testing" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + "github.com/stretchr/testify/assert" ) @@ -35,4 +38,7 @@ func TestParamTable(t *testing.T) { assert.NotEqual(t, Params.RootCoordAddress, "") t.Logf("RootCoordAddress:%s", Params.RootCoordAddress) + + log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) + log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) } diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 7c13b3c173a33ae779c6f479380b1b4e4dc2c651..0d493cb122a4c6736c5b3553246ab27308ae5229 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -16,7 +16,6 @@ import ( "errors" "fmt" "io" - "math" "net" "strconv" "sync" @@ -94,8 +93,8 @@ func (s *Server) startGrpcLoop(listener net.Listener) { opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( - grpc.MaxRecvMsgSize(math.MaxInt32), - grpc.MaxSendMsgSize(math.MaxInt32), + grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize), + grpc.MaxSendMsgSize(Params.ServerMaxSendSize), grpc.UnaryInterceptor( grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( diff --git a/internal/distributed/grpcconfigs/configs.go b/internal/distributed/grpcconfigs/configs.go new file mode 100644 index 0000000000000000000000000000000000000000..9d36e24f6da9c29f71b03303c69d8bb88da0f6d4 --- /dev/null +++ b/internal/distributed/grpcconfigs/configs.go @@ -0,0 +1,10 @@ +package grpcconfigs + +import "math" + +const ( + DefaultServerMaxSendSize = math.MaxInt32 + DefaultServerMaxRecvSize = math.MaxInt32 + DefaultClientMaxSendSize = 100 * 1024 * 1024 + DefaultClientMaxRecvSize = 100 * 1024 * 1024 +) diff --git a/internal/distributed/indexcoord/client/client.go b/internal/distributed/indexcoord/client/client.go index c0d606c7973a9a347ad93058aa8866b3c4e6361b..a0c57bd1a078b94ac147281d4336fcf69fcc520a 100644 --- a/internal/distributed/indexcoord/client/client.go +++ b/internal/distributed/indexcoord/client/client.go @@ -78,6 +78,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*C } func (c *Client) Init() error { + Params.Init() return c.connect(retry.Attempts(20)) } @@ -93,6 +94,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error { log.Debug("IndexCoordClient try connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(c.ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize), + grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)), grpc.WithUnaryInterceptor( grpc_middleware.ChainUnaryClient( grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(3), grpc_retry.WithPerRetryTimeout(time.Second*3)), diff --git a/internal/distributed/indexcoord/client/paramtable.go b/internal/distributed/indexcoord/client/paramtable.go new file mode 100644 index 0000000000000000000000000000000000000000..cfd3a73904233d84d8da62479fb4e2350bac9a30 --- /dev/null +++ b/internal/distributed/indexcoord/client/paramtable.go @@ -0,0 +1,63 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcindexcoordclient + +import ( + "sync" + + "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/util/paramtable" +) + +type ParamTable struct { + paramtable.BaseTable + + ClientMaxSendSize int + ClientMaxRecvSize int +} + +var Params ParamTable +var once sync.Once + +func (pt *ParamTable) Init() { + once.Do(func() { + pt.BaseTable.Init() + + pt.initClientMaxSendSize() + pt.initClientMaxRecvSize() + }) +} + +func (pt *ParamTable) initClientMaxSendSize() { + var err error + pt.ClientMaxSendSize, err = pt.ParseIntWithErr("indexCoord.grpc.clientMaxSendSize") + if err != nil { + pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize + log.Debug("indexCoord.grpc.clientMaxSendSize not set, set to default") + } + log.Debug("initClientMaxSendSize", + zap.Int("indexCoord.grpc.clientMaxSendSize", pt.ClientMaxSendSize)) +} + +func (pt *ParamTable) initClientMaxRecvSize() { + var err error + pt.ClientMaxRecvSize, err = pt.ParseIntWithErr("indexCoord.grpc.clientMaxRecvSize") + if err != nil { + pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize + log.Debug("indexCoord.grpc.clientMaxRecvSize not set, set to default") + } + log.Debug("initClientMaxRecvSize", + zap.Int("indexCoord.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize)) +} diff --git a/internal/distributed/indexcoord/client/paramtable_test.go b/internal/distributed/indexcoord/client/paramtable_test.go new file mode 100644 index 0000000000000000000000000000000000000000..96ec408e997982d22e4c3b41fc0407c4da6675cf --- /dev/null +++ b/internal/distributed/indexcoord/client/paramtable_test.go @@ -0,0 +1,26 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcindexcoordclient + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize)) + log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize)) +} diff --git a/internal/distributed/indexcoord/paramtable.go b/internal/distributed/indexcoord/paramtable.go index 8831ce7335ef841fa731f66a243cd2519ee05659..fc8b58a27ba421eb16c88c010ed60bdacddfca25 100644 --- a/internal/distributed/indexcoord/paramtable.go +++ b/internal/distributed/indexcoord/paramtable.go @@ -14,6 +14,10 @@ package grpcindexcoord import ( "sync" + "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/util/paramtable" ) @@ -22,6 +26,9 @@ type ParamTable struct { ServiceAddress string ServicePort int + + ServerMaxSendSize int + ServerMaxRecvSize int } var Params ParamTable @@ -37,6 +44,9 @@ func (pt *ParamTable) Init() { func (pt *ParamTable) initParams() { pt.initServicePort() pt.initServiceAddress() + + pt.initServerMaxSendSize() + pt.initServerMaxRecvSize() } func (pt *ParamTable) initServicePort() { @@ -50,3 +60,25 @@ func (pt *ParamTable) initServiceAddress() { } pt.ServiceAddress = ret } + +func (pt *ParamTable) initServerMaxSendSize() { + var err error + pt.ServerMaxSendSize, err = pt.ParseIntWithErr("indexCoord.grpc.serverMaxSendSize") + if err != nil { + pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize + log.Debug("indexCoord.grpc.serverMaxSendSize not set, set to default") + } + log.Debug("initServerMaxSendSize", + zap.Int("indexCoord.grpc.serverMaxSendSize", pt.ServerMaxSendSize)) +} + +func (pt *ParamTable) initServerMaxRecvSize() { + var err error + pt.ServerMaxRecvSize, err = pt.ParseIntWithErr("indexCoord.grpc.serverMaxRecvSize") + if err != nil { + pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize + log.Debug("indexCoord.grpc.serverMaxRecvSize not set, set to default") + } + log.Debug("initServerMaxRecvSize", + zap.Int("indexCoord.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize)) +} diff --git a/internal/distributed/indexcoord/paramtable_test.go b/internal/distributed/indexcoord/paramtable_test.go new file mode 100644 index 0000000000000000000000000000000000000000..ea97dd1fae78016871a2a69fa39174fd3e129a0d --- /dev/null +++ b/internal/distributed/indexcoord/paramtable_test.go @@ -0,0 +1,26 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcindexcoord + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) + log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) +} diff --git a/internal/distributed/indexcoord/service.go b/internal/distributed/indexcoord/service.go index e726284e4072c9e20ec9f13975380c3cb5de9cf9..890da8a208bc911bb089bb2cf527ab53c86f78f1 100644 --- a/internal/distributed/indexcoord/service.go +++ b/internal/distributed/indexcoord/service.go @@ -14,7 +14,6 @@ package grpcindexcoord import ( "context" "io" - "math" "net" "strconv" "sync" @@ -161,8 +160,8 @@ func (s *Server) startGrpcLoop(grpcPort int) { opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( - grpc.MaxRecvMsgSize(math.MaxInt32), - grpc.MaxSendMsgSize(math.MaxInt32), + grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize), + grpc.MaxSendMsgSize(Params.ServerMaxSendSize), grpc.UnaryInterceptor(ot.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor(ot.StreamServerInterceptor(opts...))) indexpb.RegisterIndexCoordServer(s.grpcServer, s) diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go index 9f4dc7ee66e4ac80b6638c286a46808a5f22e4ed..35afe93c9a1480b7ac357345d879a3f6df5da219 100644 --- a/internal/distributed/indexnode/client/client.go +++ b/internal/distributed/indexnode/client/client.go @@ -57,6 +57,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) { } func (c *Client) Init() error { + Params.Init() return c.connect(retry.Attempts(20)) } @@ -66,6 +67,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error { log.Debug("IndexNodeClient try connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(c.ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize), + grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)), grpc.WithUnaryInterceptor( grpc_middleware.ChainUnaryClient( grpc_retry.UnaryClientInterceptor( diff --git a/internal/distributed/indexnode/client/paramtable.go b/internal/distributed/indexnode/client/paramtable.go new file mode 100644 index 0000000000000000000000000000000000000000..d02691ffd5b0dd74a0a8564e898fb75d0bf31776 --- /dev/null +++ b/internal/distributed/indexnode/client/paramtable.go @@ -0,0 +1,63 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcindexnodeclient + +import ( + "sync" + + "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/util/paramtable" +) + +type ParamTable struct { + paramtable.BaseTable + + ClientMaxSendSize int + ClientMaxRecvSize int +} + +var Params ParamTable +var once sync.Once + +func (pt *ParamTable) Init() { + once.Do(func() { + pt.BaseTable.Init() + + pt.initClientMaxSendSize() + pt.initClientMaxRecvSize() + }) +} + +func (pt *ParamTable) initClientMaxSendSize() { + var err error + pt.ClientMaxSendSize, err = pt.ParseIntWithErr("indexNode.grpc.clientMaxSendSize") + if err != nil { + pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize + log.Debug("indexNode.grpc.clientMaxSendSize not set, set to default") + } + log.Debug("initClientMaxSendSize", + zap.Int("indexNode.grpc.clientMaxSendSize", pt.ClientMaxSendSize)) +} + +func (pt *ParamTable) initClientMaxRecvSize() { + var err error + pt.ClientMaxRecvSize, err = pt.ParseIntWithErr("indexNode.grpc.clientMaxRecvSize") + if err != nil { + pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize + log.Debug("indexNode.grpc.clientMaxRecvSize not set, set to default") + } + log.Debug("initClientMaxRecvSize", + zap.Int("indexNode.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize)) +} diff --git a/internal/distributed/indexnode/client/paramtable_test.go b/internal/distributed/indexnode/client/paramtable_test.go new file mode 100644 index 0000000000000000000000000000000000000000..547a2e498c44b6612d2e68a8b4285eac20d47c05 --- /dev/null +++ b/internal/distributed/indexnode/client/paramtable_test.go @@ -0,0 +1,26 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcindexnodeclient + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize)) + log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize)) +} diff --git a/internal/distributed/indexnode/paramtable.go b/internal/distributed/indexnode/paramtable.go index 9bc4de9699892871d4895c0b544b9f883030a934..f68b2fe1a40ad24924c327fb7261d33881e3ea12 100644 --- a/internal/distributed/indexnode/paramtable.go +++ b/internal/distributed/indexnode/paramtable.go @@ -14,6 +14,10 @@ package grpcindexnode import ( "sync" + "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/paramtable" ) @@ -26,6 +30,9 @@ type ParamTable struct { IP string Port int Address string + + ServerMaxSendSize int + ServerMaxRecvSize int } var Params ParamTable @@ -35,6 +42,9 @@ func (pt *ParamTable) Init() { once.Do(func() { pt.BaseTable.Init() pt.initParams() + + pt.initServerMaxSendSize() + pt.initServerMaxRecvSize() }) } @@ -64,3 +74,25 @@ func (pt *ParamTable) initPort() { port := pt.ParseInt("indexNode.port") pt.Port = port } + +func (pt *ParamTable) initServerMaxSendSize() { + var err error + pt.ServerMaxSendSize, err = pt.ParseIntWithErr("indexNode.grpc.serverMaxSendSize") + if err != nil { + pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize + log.Debug("indexNode.grpc.serverMaxSendSize not set, set to default") + } + log.Debug("initServerMaxSendSize", + zap.Int("indexNode.grpc.serverMaxSendSize", pt.ServerMaxSendSize)) +} + +func (pt *ParamTable) initServerMaxRecvSize() { + var err error + pt.ServerMaxRecvSize, err = pt.ParseIntWithErr("indexNode.grpc.serverMaxRecvSize") + if err != nil { + pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize + log.Debug("indexNode.grpc.serverMaxRecvSize not set, set to default") + } + log.Debug("initServerMaxRecvSize", + zap.Int("indexNode.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize)) +} diff --git a/internal/distributed/indexnode/paramtable_test.go b/internal/distributed/indexnode/paramtable_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a84b532540b3b5b54f297e28a628a1b856fa6b3b --- /dev/null +++ b/internal/distributed/indexnode/paramtable_test.go @@ -0,0 +1,26 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcindexnode + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) + log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) +} diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 143966440f86237513dd66596c3f39929e235928..f0236ec54390e711b1a97e5ca6a0c9c0c309ff7f 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -15,7 +15,6 @@ import ( "context" "fmt" "io" - "math" "net" "strconv" "sync" @@ -79,8 +78,8 @@ func (s *Server) startGrpcLoop(grpcPort int) { opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( - grpc.MaxRecvMsgSize(math.MaxInt32), - grpc.MaxSendMsgSize(math.MaxInt32), + grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize), + grpc.MaxSendMsgSize(Params.ServerMaxSendSize), grpc.UnaryInterceptor(grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor(grpc_opentracing.StreamServerInterceptor(opts...))) indexpb.RegisterIndexNodeServer(s.grpcServer, s) diff --git a/internal/distributed/proxy/client/client.go b/internal/distributed/proxy/client/client.go index 2508d8614eae484677fd1561f27c47719e833578..b68122ccea2b4c8c3dff2c87a84c3a86a685208f 100644 --- a/internal/distributed/proxy/client/client.go +++ b/internal/distributed/proxy/client/client.go @@ -56,6 +56,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) { } func (c *Client) Init() error { + Params.Init() return c.connect(retry.Attempts(20)) } @@ -65,6 +66,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error { log.Debug("ProxyClient try connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(c.ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize), + grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)), grpc.WithUnaryInterceptor( grpc_middleware.ChainUnaryClient( grpc_retry.UnaryClientInterceptor( diff --git a/internal/distributed/proxy/client/paramtable.go b/internal/distributed/proxy/client/paramtable.go new file mode 100644 index 0000000000000000000000000000000000000000..1c89f1fdedca76dce97decc856f35266b72da9f7 --- /dev/null +++ b/internal/distributed/proxy/client/paramtable.go @@ -0,0 +1,63 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcproxyclient + +import ( + "sync" + + "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/util/paramtable" +) + +type ParamTable struct { + paramtable.BaseTable + + ClientMaxSendSize int + ClientMaxRecvSize int +} + +var Params ParamTable +var once sync.Once + +func (pt *ParamTable) Init() { + once.Do(func() { + pt.BaseTable.Init() + + pt.initClientMaxSendSize() + pt.initClientMaxRecvSize() + }) +} + +func (pt *ParamTable) initClientMaxSendSize() { + var err error + pt.ClientMaxSendSize, err = pt.ParseIntWithErr("proxy.grpc.clientMaxSendSize") + if err != nil { + pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize + log.Debug("proxy.grpc.clientMaxSendSize not set, set to default") + } + log.Debug("initClientMaxSendSize", + zap.Int("proxy.grpc.clientMaxSendSize", pt.ClientMaxSendSize)) +} + +func (pt *ParamTable) initClientMaxRecvSize() { + var err error + pt.ClientMaxRecvSize, err = pt.ParseIntWithErr("proxy.grpc.clientMaxRecvSize") + if err != nil { + pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize + log.Debug("proxy.grpc.clientMaxRecvSize not set, set to default") + } + log.Debug("initClientMaxRecvSize", + zap.Int("proxy.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize)) +} diff --git a/internal/distributed/proxy/client/paramtable_test.go b/internal/distributed/proxy/client/paramtable_test.go new file mode 100644 index 0000000000000000000000000000000000000000..6001b94fdcc45f493996586f65694d86220cec9a --- /dev/null +++ b/internal/distributed/proxy/client/paramtable_test.go @@ -0,0 +1,26 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcproxyclient + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize)) + log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize)) +} diff --git a/internal/distributed/proxy/paramtable.go b/internal/distributed/proxy/paramtable.go index d5bc896f77b37078e5277f052b43036ff30a5484..c9b7fc8b6c1b4aa8936de54303273c6ec7575b39 100644 --- a/internal/distributed/proxy/paramtable.go +++ b/internal/distributed/proxy/paramtable.go @@ -14,6 +14,10 @@ package grpcproxy import ( "sync" + "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/paramtable" ) @@ -29,6 +33,9 @@ type ParamTable struct { IP string Port int Address string + + ServerMaxSendSize int + ServerMaxRecvSize int } var Params ParamTable @@ -38,6 +45,9 @@ func (pt *ParamTable) Init() { once.Do(func() { pt.BaseTable.Init() pt.initParams() + + pt.initServerMaxSendSize() + pt.initServerMaxRecvSize() }) } @@ -97,3 +107,25 @@ func (pt *ParamTable) initPort() { port := pt.ParseInt("proxy.port") pt.Port = port } + +func (pt *ParamTable) initServerMaxSendSize() { + var err error + pt.ServerMaxSendSize, err = pt.ParseIntWithErr("proxy.grpc.serverMaxSendSize") + if err != nil { + pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize + log.Debug("proxy.grpc.serverMaxSendSize not set, set to default") + } + log.Debug("initServerMaxSendSize", + zap.Int("proxy.grpc.serverMaxSendSize", pt.ServerMaxSendSize)) +} + +func (pt *ParamTable) initServerMaxRecvSize() { + var err error + pt.ServerMaxRecvSize, err = pt.ParseIntWithErr("proxy.grpc.serverMaxRecvSize") + if err != nil { + pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize + log.Debug("proxy.grpc.serverMaxRecvSize not set, set to default") + } + log.Debug("initServerMaxRecvSize", + zap.Int("proxy.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize)) +} diff --git a/internal/distributed/proxy/paramtable_test.go b/internal/distributed/proxy/paramtable_test.go new file mode 100644 index 0000000000000000000000000000000000000000..da5595aa9739fc76926e7e2da66eb6fbb3cb7079 --- /dev/null +++ b/internal/distributed/proxy/paramtable_test.go @@ -0,0 +1,26 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcproxy + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) + log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) +} diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index 5f18d4a1cff5afe6a9c8dc679f21acc0d2379bf9..2ac384ae5441b19731e0221197d91cff94608739 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -15,7 +15,6 @@ import ( "context" "fmt" "io" - "math" "net" "strconv" "sync" @@ -95,8 +94,8 @@ func (s *Server) startGrpcLoop(grpcPort int) { opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( - grpc.MaxRecvMsgSize(math.MaxInt32), - grpc.MaxSendMsgSize(math.MaxInt32), + grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize), + grpc.MaxSendMsgSize(Params.ServerMaxSendSize), grpc.MaxRecvMsgSize(GRPCMaxMagSize), grpc.UnaryInterceptor( grpc_opentracing.UnaryServerInterceptor(opts...)), diff --git a/internal/distributed/querycoord/client/client.go b/internal/distributed/querycoord/client/client.go index fbf0483eaf3b19bdcd1bc090fdaf444db9373615..b616fbfc9f35cde393fcd27af0c9374bd4f9eba3 100644 --- a/internal/distributed/querycoord/client/client.go +++ b/internal/distributed/querycoord/client/client.go @@ -78,6 +78,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*C } func (c *Client) Init() error { + Params.Init() return c.connect(retry.Attempts(20)) } @@ -93,6 +94,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error { log.Debug("QueryCoordClient try reconnect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(c.ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize), + grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)), grpc.WithUnaryInterceptor( grpc_middleware.ChainUnaryClient( grpc_retry.UnaryClientInterceptor( diff --git a/internal/distributed/querycoord/client/paramtable.go b/internal/distributed/querycoord/client/paramtable.go new file mode 100644 index 0000000000000000000000000000000000000000..4c6d3b052eb4ecc3784c4b34cb30c2349bbce196 --- /dev/null +++ b/internal/distributed/querycoord/client/paramtable.go @@ -0,0 +1,63 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcquerycoordclient + +import ( + "sync" + + "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/util/paramtable" +) + +type ParamTable struct { + paramtable.BaseTable + + ClientMaxSendSize int + ClientMaxRecvSize int +} + +var Params ParamTable +var once sync.Once + +func (pt *ParamTable) Init() { + once.Do(func() { + pt.BaseTable.Init() + + pt.initClientMaxSendSize() + pt.initClientMaxRecvSize() + }) +} + +func (pt *ParamTable) initClientMaxSendSize() { + var err error + pt.ClientMaxSendSize, err = pt.ParseIntWithErr("queryCoord.grpc.clientMaxSendSize") + if err != nil { + pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize + log.Debug("queryCoord.grpc.clientMaxSendSize not set, set to default") + } + log.Debug("initClientMaxSendSize", + zap.Int("queryCoord.grpc.clientMaxSendSize", pt.ClientMaxSendSize)) +} + +func (pt *ParamTable) initClientMaxRecvSize() { + var err error + pt.ClientMaxRecvSize, err = pt.ParseIntWithErr("queryCoord.grpc.clientMaxRecvSize") + if err != nil { + pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize + log.Debug("queryCoord.grpc.clientMaxRecvSize not set, set to default") + } + log.Debug("initClientMaxRecvSize", + zap.Int("queryCoord.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize)) +} diff --git a/internal/distributed/querycoord/client/paramtable_test.go b/internal/distributed/querycoord/client/paramtable_test.go new file mode 100644 index 0000000000000000000000000000000000000000..e9d32c1f727f638d2f0131eeb7c9a0b53a6c5623 --- /dev/null +++ b/internal/distributed/querycoord/client/paramtable_test.go @@ -0,0 +1,26 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcquerycoordclient + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize)) + log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize)) +} diff --git a/internal/distributed/querycoord/param_table.go b/internal/distributed/querycoord/param_table.go index 5dde4ee98db3b8711748b8fb76d90d92f37e2767..e3016b710cb5a4b996483a82a42bb77ef801fbe4 100644 --- a/internal/distributed/querycoord/param_table.go +++ b/internal/distributed/querycoord/param_table.go @@ -14,6 +14,10 @@ package grpcquerycoord import ( "sync" + "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/util/paramtable" ) @@ -24,9 +28,11 @@ type ParamTable struct { paramtable.BaseTable Port int - IndexCoordAddress string - RootCoordAddress string - DataCoordAddress string + RootCoordAddress string + DataCoordAddress string + + ServerMaxSendSize int + ServerMaxRecvSize int } func (pt *ParamTable) Init() { @@ -34,8 +40,10 @@ func (pt *ParamTable) Init() { pt.BaseTable.Init() pt.initPort() pt.initRootCoordAddress() - pt.initIndexCoordAddress() pt.initDataCoordAddress() + + pt.initServerMaxSendSize() + pt.initServerMaxRecvSize() }) } @@ -47,14 +55,6 @@ func (pt *ParamTable) initRootCoordAddress() { pt.RootCoordAddress = ret } -func (pt *ParamTable) initIndexCoordAddress() { - ret, err := pt.Load("IndexCoordAddress") - if err != nil { - panic(err) - } - pt.IndexCoordAddress = ret -} - func (pt *ParamTable) initDataCoordAddress() { ret, err := pt.Load("_DataCoordAddress") if err != nil { @@ -66,3 +66,25 @@ func (pt *ParamTable) initDataCoordAddress() { func (pt *ParamTable) initPort() { pt.Port = pt.ParseInt("queryCoord.port") } + +func (pt *ParamTable) initServerMaxSendSize() { + var err error + pt.ServerMaxSendSize, err = pt.ParseIntWithErr("queryCoord.grpc.serverMaxSendSize") + if err != nil { + pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize + log.Debug("queryCoord.grpc.serverMaxSendSize not set, set to default") + } + log.Debug("initServerMaxSendSize", + zap.Int("queryCoord.grpc.serverMaxSendSize", pt.ServerMaxSendSize)) +} + +func (pt *ParamTable) initServerMaxRecvSize() { + var err error + pt.ServerMaxRecvSize, err = pt.ParseIntWithErr("queryCoord.grpc.serverMaxRecvSize") + if err != nil { + pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize + log.Debug("queryCoord.grpc.serverMaxRecvSize not set, set to default") + } + log.Debug("initServerMaxRecvSize", + zap.Int("queryCoord.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize)) +} diff --git a/internal/distributed/querycoord/param_table_test.go b/internal/distributed/querycoord/param_table_test.go index bb3fecbd85a44628e55a63af946fb5df4507efe0..250b4a6015cdf419228bbd6210eb902a5cf09061 100644 --- a/internal/distributed/querycoord/param_table_test.go +++ b/internal/distributed/querycoord/param_table_test.go @@ -11,23 +11,24 @@ package grpcquerycoord -/* import ( "testing" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/log" "github.com/stretchr/testify/assert" ) func TestParamTable(t *testing.T) { Params.Init() - assert.NotEqual(t, Params.IndexCoordAddress, "") - t.Logf("IndexCoordAddress:%s", Params.IndexCoordAddress) - assert.NotEqual(t, Params.DataCoordAddress, "") t.Logf("DataCoordAddress:%s", Params.DataCoordAddress) assert.NotEqual(t, Params.RootCoordAddress, "") t.Logf("RootCoordAddress:%s", Params.RootCoordAddress) + + log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) + log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) } -*/ diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index 9ded1300c47e6d37bb443818de94ccbf1c30996b..6cc5fc67d58d3237a6f71266af06c0aff01d1d92 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -14,7 +14,6 @@ package grpcquerycoord import ( "context" "io" - "math" "net" "strconv" "sync" @@ -187,8 +186,8 @@ func (s *Server) startGrpcLoop(grpcPort int) { opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( - grpc.MaxRecvMsgSize(math.MaxInt32), - grpc.MaxSendMsgSize(math.MaxInt32), + grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize), + grpc.MaxSendMsgSize(Params.ServerMaxSendSize), grpc.UnaryInterceptor( grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index f5b4b0d0059db64f474f7e70a0d20f8797ba7d03..9ccdefe56f72b6718d1658e9b5a93b390559431a 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -57,6 +57,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) { } func (c *Client) Init() error { + Params.Init() return c.connect(retry.Attempts(20)) } @@ -66,6 +67,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error { log.Debug("QueryNodeClient try connect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(c.ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize), + grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)), grpc.WithUnaryInterceptor( grpc_middleware.ChainUnaryClient( grpc_retry.UnaryClientInterceptor( diff --git a/internal/distributed/querynode/client/paramtable.go b/internal/distributed/querynode/client/paramtable.go new file mode 100644 index 0000000000000000000000000000000000000000..934d9c5bb1e5381fa553428e89860f779cdb094b --- /dev/null +++ b/internal/distributed/querynode/client/paramtable.go @@ -0,0 +1,63 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcquerynodeclient + +import ( + "sync" + + "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/util/paramtable" +) + +type ParamTable struct { + paramtable.BaseTable + + ClientMaxSendSize int + ClientMaxRecvSize int +} + +var Params ParamTable +var once sync.Once + +func (pt *ParamTable) Init() { + once.Do(func() { + pt.BaseTable.Init() + + pt.initClientMaxSendSize() + pt.initClientMaxRecvSize() + }) +} + +func (pt *ParamTable) initClientMaxSendSize() { + var err error + pt.ClientMaxSendSize, err = pt.ParseIntWithErr("queryNode.grpc.clientMaxSendSize") + if err != nil { + pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize + log.Debug("queryNode.grpc.clientMaxSendSize not set, set to default") + } + log.Debug("initClientMaxSendSize", + zap.Int("queryNode.grpc.clientMaxSendSize", pt.ClientMaxSendSize)) +} + +func (pt *ParamTable) initClientMaxRecvSize() { + var err error + pt.ClientMaxRecvSize, err = pt.ParseIntWithErr("queryNode.grpc.clientMaxRecvSize") + if err != nil { + pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize + log.Debug("queryNode.grpc.clientMaxRecvSize not set, set to default") + } + log.Debug("initClientMaxRecvSize", + zap.Int("queryNode.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize)) +} diff --git a/internal/distributed/querynode/client/paramtable_test.go b/internal/distributed/querynode/client/paramtable_test.go new file mode 100644 index 0000000000000000000000000000000000000000..4574f55fa407460644d30d48d9bd38331c69306d --- /dev/null +++ b/internal/distributed/querynode/client/paramtable_test.go @@ -0,0 +1,26 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcquerynodeclient + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize)) + log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize)) +} diff --git a/internal/distributed/querynode/param_table.go b/internal/distributed/querynode/param_table.go index 3b501ad0c410f195039ee1f6e8d53e15fc6a1219..96f29c7eac1ee1f8e63ea85289e3a3970a4310c7 100644 --- a/internal/distributed/querynode/param_table.go +++ b/internal/distributed/querynode/param_table.go @@ -14,6 +14,10 @@ package grpcquerynode import ( "sync" + "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/paramtable" ) @@ -32,6 +36,9 @@ type ParamTable struct { IndexCoordAddress string DataCoordAddress string QueryCoordAddress string + + ServerMaxSendSize int + ServerMaxRecvSize int } func (pt *ParamTable) Init() { @@ -42,6 +49,9 @@ func (pt *ParamTable) Init() { pt.initIndexCoordAddress() pt.initDataCoordAddress() pt.initQueryCoordAddress() + + pt.initServerMaxSendSize() + pt.initServerMaxRecvSize() }) } @@ -89,3 +99,25 @@ func (pt *ParamTable) initPort() { port := pt.ParseInt("queryNode.port") pt.QueryNodePort = port } + +func (pt *ParamTable) initServerMaxSendSize() { + var err error + pt.ServerMaxSendSize, err = pt.ParseIntWithErr("queryNode.grpc.serverMaxSendSize") + if err != nil { + pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize + log.Debug("queryNode.grpc.serverMaxSendSize not set, set to default") + } + log.Debug("initServerMaxSendSize", + zap.Int("queryNode.grpc.serverMaxSendSize", pt.ServerMaxSendSize)) +} + +func (pt *ParamTable) initServerMaxRecvSize() { + var err error + pt.ServerMaxRecvSize, err = pt.ParseIntWithErr("queryNode.grpc.serverMaxRecvSize") + if err != nil { + pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize + log.Debug("queryNode.grpc.serverMaxRecvSize not set, set to default") + } + log.Debug("initServerMaxRecvSize", + zap.Int("queryNode.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize)) +} diff --git a/internal/distributed/querynode/param_table_test.go b/internal/distributed/querynode/param_table_test.go index d3a4cc82013587bb8757e55563b1b88a06ec109d..b1932d3d9483664084a943db7ab0a22abd00137a 100644 --- a/internal/distributed/querynode/param_table_test.go +++ b/internal/distributed/querynode/param_table_test.go @@ -14,6 +14,9 @@ package grpcquerynode import ( "testing" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/log" "github.com/stretchr/testify/assert" ) @@ -31,4 +34,7 @@ func TestParamTable(t *testing.T) { assert.NotEqual(t, Params.QueryCoordAddress, "") t.Logf("QueryCoordAddress:%s", Params.QueryCoordAddress) + + log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) + log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) } diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 645b7d74b33df45666923c1e423e4ebd9519c5f6..cfec20eadcb04c23a28ff07859eae491b76a8394 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -15,7 +15,6 @@ import ( "context" "fmt" "io" - "math" "net" "strconv" "sync" @@ -200,8 +199,8 @@ func (s *Server) startGrpcLoop(grpcPort int) { opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( - grpc.MaxRecvMsgSize(math.MaxInt32), - grpc.MaxSendMsgSize(math.MaxInt32), + grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize), + grpc.MaxSendMsgSize(Params.ServerMaxSendSize), grpc.UnaryInterceptor( grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor( diff --git a/internal/distributed/rootcoord/client/client.go b/internal/distributed/rootcoord/client/client.go index 88d8fd2116c7bae94182ecc1cc51e550dbe36724..0afce2c3076b73faff483845c1084d9fe46b4ad2 100644 --- a/internal/distributed/rootcoord/client/client.go +++ b/internal/distributed/rootcoord/client/client.go @@ -86,6 +86,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*G } func (c *GrpcClient) Init() error { + Params.Init() return c.connect(retry.Attempts(20)) } @@ -101,6 +102,9 @@ func (c *GrpcClient) connect(retryOptions ...retry.Option) error { log.Debug("RootCoordClient try reconnect ", zap.String("address", c.addr)) conn, err := grpc.DialContext(c.ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize), + grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)), grpc.WithUnaryInterceptor( grpc_middleware.ChainUnaryClient( grpc_retry.UnaryClientInterceptor( diff --git a/internal/distributed/rootcoord/client/paramtable.go b/internal/distributed/rootcoord/client/paramtable.go new file mode 100644 index 0000000000000000000000000000000000000000..37cca304344e2648e1d71c9125d063dfbad1221e --- /dev/null +++ b/internal/distributed/rootcoord/client/paramtable.go @@ -0,0 +1,63 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcrootcoordclient + +import ( + "sync" + + "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/util/paramtable" +) + +type ParamTable struct { + paramtable.BaseTable + + ClientMaxSendSize int + ClientMaxRecvSize int +} + +var Params ParamTable +var once sync.Once + +func (pt *ParamTable) Init() { + once.Do(func() { + pt.BaseTable.Init() + + pt.initClientMaxSendSize() + pt.initClientMaxRecvSize() + }) +} + +func (pt *ParamTable) initClientMaxSendSize() { + var err error + pt.ClientMaxSendSize, err = pt.ParseIntWithErr("rootCoord.grpc.clientMaxSendSize") + if err != nil { + pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize + log.Debug("rootCoord.grpc.clientMaxSendSize not set, set to default") + } + log.Debug("initClientMaxSendSize", + zap.Int("rootCoord.grpc.clientMaxSendSize", pt.ClientMaxSendSize)) +} + +func (pt *ParamTable) initClientMaxRecvSize() { + var err error + pt.ClientMaxRecvSize, err = pt.ParseIntWithErr("rootCoord.grpc.clientMaxRecvSize") + if err != nil { + pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize + log.Debug("rootCoord.grpc.clientMaxRecvSize not set, set to default") + } + log.Debug("initClientMaxRecvSize", + zap.Int("rootCoord.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize)) +} diff --git a/internal/distributed/rootcoord/client/paramtable_test.go b/internal/distributed/rootcoord/client/paramtable_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b4f0900e8305f8679ffc3c97634a33f5f609427a --- /dev/null +++ b/internal/distributed/rootcoord/client/paramtable_test.go @@ -0,0 +1,26 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package grpcrootcoordclient + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + +func TestParamTable(t *testing.T) { + Params.Init() + + log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize)) + log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize)) +} diff --git a/internal/distributed/rootcoord/param_table.go b/internal/distributed/rootcoord/param_table.go index e4c3370c38e5f9de2bb51a6b0cc2721913cfa1ad..69ad64d449d28f9c44d16ebf3230034f49fa0040 100644 --- a/internal/distributed/rootcoord/param_table.go +++ b/internal/distributed/rootcoord/param_table.go @@ -14,6 +14,10 @@ package grpcrootcoord import ( "sync" + "github.com/milvus-io/milvus/internal/distributed/grpcconfigs" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/util/paramtable" ) @@ -29,6 +33,9 @@ type ParamTable struct { IndexCoordAddress string QueryCoordAddress string DataCoordAddress string + + ServerMaxSendSize int + ServerMaxRecvSize int } func (p *ParamTable) Init() { @@ -43,6 +50,9 @@ func (p *ParamTable) Init() { p.initIndexCoordAddress() p.initQueryCoordAddress() p.initDataCoordAddress() + + p.initServerMaxSendSize() + p.initServerMaxRecvSize() }) } @@ -81,3 +91,25 @@ func (p *ParamTable) initDataCoordAddress() { } p.DataCoordAddress = ret } + +func (p *ParamTable) initServerMaxSendSize() { + var err error + p.ServerMaxSendSize, err = p.ParseIntWithErr("rootCoord.grpc.serverMaxSendSize") + if err != nil { + p.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize + log.Debug("rootCoord.grpc.serverMaxSendSize not set, set to default") + } + log.Debug("initServerMaxSendSize", + zap.Int("rootCoord.grpc.serverMaxSendSize", p.ServerMaxSendSize)) +} + +func (p *ParamTable) initServerMaxRecvSize() { + var err error + p.ServerMaxRecvSize, err = p.ParseIntWithErr("rootCoord.grpc.serverMaxRecvSize") + if err != nil { + p.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize + log.Debug("rootCoord.grpc.serverMaxRecvSize not set, set to default") + } + log.Debug("initServerMaxRecvSize", + zap.Int("rootCoord.grpc.serverMaxRecvSize", p.ServerMaxRecvSize)) +} diff --git a/internal/distributed/rootcoord/param_table_test.go b/internal/distributed/rootcoord/param_table_test.go index c4c739d96017f6232fac80600031662aa00b816b..c340ea54bc84c2876d4926450e46bbc70394b6ae 100644 --- a/internal/distributed/rootcoord/param_table_test.go +++ b/internal/distributed/rootcoord/param_table_test.go @@ -14,6 +14,9 @@ package grpcrootcoord import ( "testing" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + "github.com/stretchr/testify/assert" ) @@ -34,4 +37,7 @@ func TestParamTable(t *testing.T) { assert.NotEqual(t, Params.QueryCoordAddress, "") t.Logf("QueryCoordAddress:%s", Params.QueryCoordAddress) + + log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize)) + log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize)) } diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index 1c91b7b8414f486a853868dad010857c845192e6..7e89229586d29697303b5d39b591de79005b9733 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -14,7 +14,6 @@ package grpcrootcoord import ( "context" "io" - "math" "net" "strconv" "sync" @@ -207,8 +206,8 @@ func (s *Server) startGrpcLoop(grpcPort int) { opts := trace.GetInterceptorOpts() s.grpcServer = grpc.NewServer( - grpc.MaxRecvMsgSize(math.MaxInt32), - grpc.MaxSendMsgSize(math.MaxInt32), + grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize), + grpc.MaxSendMsgSize(Params.ServerMaxSendSize), grpc.UnaryInterceptor(grpc_opentracing.UnaryServerInterceptor(opts...)), grpc.StreamInterceptor(grpc_opentracing.StreamServerInterceptor(opts...))) rootcoordpb.RegisterRootCoordServer(s.grpcServer, s) diff --git a/internal/util/paramtable/basetable.go b/internal/util/paramtable/basetable.go index 7c55af6fabf055b2bf0c4cfcd6f8759666bc9072..61990d79928c2acf9d4a09591f2852f31094fb1e 100644 --- a/internal/util/paramtable/basetable.go +++ b/internal/util/paramtable/basetable.go @@ -316,6 +316,18 @@ func (gp *BaseTable) ParseInt(key string) int { return value } +func (gp *BaseTable) ParseIntWithErr(key string) (int, error) { + valueStr, err := gp.Load(key) + if err != nil { + return 0, err + } + value, err := strconv.Atoi(valueStr) + if err != nil { + return 0, err + } + return value, nil +} + // package methods func ConvertRangeToIntRange(rangeStr, sep string) []int { diff --git a/internal/util/paramtable/basetable_test.go b/internal/util/paramtable/basetable_test.go index 25f067a7836da0c808497556a44c399a6a9c0904..8e531695373cf8f7098a29fae566a756f23c1e4e 100644 --- a/internal/util/paramtable/basetable_test.go +++ b/internal/util/paramtable/basetable_test.go @@ -111,3 +111,20 @@ func TestGlobalParamsTable_LoadYaml(t *testing.T) { _, err = baseParams.Load("pulsar.port") assert.Nil(t, err) } + +func TestBaseTable_ParseIntWithErr(t *testing.T) { + var err error + + key1 := "ParseIntWithErrInt" + err = baseParams.Save(key1, "10") + assert.Nil(t, err) + ten, err := baseParams.ParseIntWithErr(key1) + assert.Nil(t, err) + assert.Equal(t, 10, ten) + + key2 := "ParseIntWithErrInvalidInt" + err = baseParams.Save(key2, "invalid") + assert.Nil(t, err) + _, err = baseParams.ParseIntWithErr(key2) + assert.NotNil(t, err) +}