未验证 提交 ad77a6e0 编写于 作者: D dragondriver 提交者: GitHub

Fix #6859, increase the MaxCallRecvMsgSize and MaxCallSendMsgSize of grpc client (#6861)

Signed-off-by: Ndragondriver <jiquan.long@zilliz.com>
上级 0f76c23a
......@@ -41,31 +41,79 @@ rootCoord:
address: localhost
port: 53100
grpc:
serverMaxRecvSize: 2147483647 # math.MaxInt32
serverMaxSendSize: 2147483647 # math.MaxInt32
clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024
proxy:
port: 19530
grpc:
serverMaxRecvSize: 2147483647 # math.MaxInt32
serverMaxSendSize: 2147483647 # math.MaxInt32
clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024
queryCoord:
address: localhost
port: 19531
grpc:
serverMaxRecvSize: 2147483647 # math.MaxInt32
serverMaxSendSize: 2147483647 # math.MaxInt32
clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024
queryNode:
gracefulTime: 1000 # ms, for search
port: 21123
grpc:
serverMaxRecvSize: 2147483647 # math.MaxInt32
serverMaxSendSize: 2147483647 # math.MaxInt32
clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024
indexCoord:
address: localhost
port: 31000
grpc:
serverMaxRecvSize: 2147483647 # math.MaxInt32
serverMaxSendSize: 2147483647 # math.MaxInt32
clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024
indexNode:
port: 21121
grpc:
serverMaxRecvSize: 2147483647 # math.MaxInt32
serverMaxSendSize: 2147483647 # math.MaxInt32
clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024
dataCoord:
address: localhost
port: 13333
grpc:
serverMaxRecvSize: 2147483647 # math.MaxInt32
serverMaxSendSize: 2147483647 # math.MaxInt32
clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024
dataNode:
port: 21124
grpc:
serverMaxRecvSize: 2147483647 # math.MaxInt32
serverMaxSendSize: 2147483647 # math.MaxInt32
clientMaxRecvSize: 104857600 # 100 MB, 100 * 1024 * 1024
clientMaxSendSize: 104857600 # 100 MB, 100 * 1024 * 1024
storage:
path: /var/lib/milvus/data/
......
......@@ -77,6 +77,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*C
}
func (c *Client) Init() error {
Params.Init()
return c.connect(retry.Attempts(20))
}
......@@ -92,6 +93,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error {
log.Debug("DataCoordClient try reconnect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize),
grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcdatacoordclient
import (
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
type ParamTable struct {
paramtable.BaseTable
ClientMaxSendSize int
ClientMaxRecvSize int
}
var Params ParamTable
var once sync.Once
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initClientMaxSendSize()
pt.initClientMaxRecvSize()
})
}
func (pt *ParamTable) initClientMaxSendSize() {
var err error
pt.ClientMaxSendSize, err = pt.ParseIntWithErr("dataCoord.grpc.clientMaxSendSize")
if err != nil {
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
log.Debug("dataCoord.grpc.clientMaxSendSize not set, set to default")
}
log.Debug("initClientMaxSendSize",
zap.Int("dataCoord.grpc.clientMaxSendSize", pt.ClientMaxSendSize))
}
func (pt *ParamTable) initClientMaxRecvSize() {
var err error
pt.ClientMaxRecvSize, err = pt.ParseIntWithErr("dataCoord.grpc.clientMaxRecvSize")
if err != nil {
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
log.Debug("dataCoord.grpc.clientMaxRecvSize not set, set to default")
}
log.Debug("initClientMaxRecvSize",
zap.Int("dataCoord.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize))
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcdatacoordclient
import (
"testing"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize))
log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize))
}
......@@ -14,6 +14,10 @@ package grpcdatacoordclient
import (
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
......@@ -23,6 +27,9 @@ type ParamTable struct {
IP string
Port int
RootCoordAddress string
ServerMaxSendSize int
ServerMaxRecvSize int
}
var Params ParamTable
......@@ -34,6 +41,9 @@ func (pt *ParamTable) Init() {
pt.initPort()
pt.initParams()
pt.LoadFromEnv()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
})
}
......@@ -65,3 +75,25 @@ func (pt *ParamTable) initDataCoordAddress() {
}
pt.IP = ret
}
func (pt *ParamTable) initServerMaxSendSize() {
var err error
pt.ServerMaxSendSize, err = pt.ParseIntWithErr("dataCoord.grpc.serverMaxSendSize")
if err != nil {
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
log.Debug("dataCoord.grpc.serverMaxSendSize not set, set to default")
}
log.Debug("initServerMaxSendSize",
zap.Int("dataCoord.grpc.serverMaxSendSize", pt.ServerMaxSendSize))
}
func (pt *ParamTable) initServerMaxRecvSize() {
var err error
pt.ServerMaxRecvSize, err = pt.ParseIntWithErr("dataCoord.grpc.serverMaxRecvSize")
if err != nil {
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
log.Debug("dataCoord.grpc.serverMaxRecvSize not set, set to default")
}
log.Debug("initServerMaxRecvSize",
zap.Int("dataCoord.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize))
}
......@@ -14,6 +14,9 @@ package grpcdatacoordclient
import (
"testing"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/stretchr/testify/assert"
)
......@@ -25,4 +28,7 @@ func TestParamTable(t *testing.T) {
assert.NotEqual(t, Params.RootCoordAddress, "")
t.Logf("RootCoordAddress:%s", Params.RootCoordAddress)
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))
}
......@@ -14,7 +14,6 @@ package grpcdatacoordclient
import (
"context"
"io"
"math"
"net"
"strconv"
"sync"
......@@ -124,8 +123,8 @@ func (s *Server) startGrpcLoop(grpcPort int) {
opts := trace.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize),
grpc.MaxSendMsgSize(Params.ServerMaxSendSize),
grpc.UnaryInterceptor(
grpc_opentracing.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(
......
......@@ -61,6 +61,7 @@ func NewClient(ctx context.Context, addr string, retryOptions ...retry.Option) (
}
func (c *Client) Init() error {
Params.Init()
return c.connect(retry.Attempts(20))
}
......@@ -70,6 +71,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error {
log.Debug("DataNode connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize),
grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)),
grpc.WithDisableRetry(),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcdatanodeclient
import (
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
type ParamTable struct {
paramtable.BaseTable
ClientMaxSendSize int
ClientMaxRecvSize int
}
var Params ParamTable
var once sync.Once
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initClientMaxSendSize()
pt.initClientMaxRecvSize()
})
}
func (pt *ParamTable) initClientMaxSendSize() {
var err error
pt.ClientMaxSendSize, err = pt.ParseIntWithErr("dataNode.grpc.clientMaxSendSize")
if err != nil {
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
log.Debug("dataNode.grpc.clientMaxSendSize not set, set to default")
}
log.Debug("initClientMaxSendSize",
zap.Int("dataNode.grpc.clientMaxSendSize", pt.ClientMaxSendSize))
}
func (pt *ParamTable) initClientMaxRecvSize() {
var err error
pt.ClientMaxRecvSize, err = pt.ParseIntWithErr("dataNode.grpc.clientMaxRecvSize")
if err != nil {
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
log.Debug("dataNode.grpc.clientMaxRecvSize not set, set to default")
}
log.Debug("initClientMaxRecvSize",
zap.Int("dataNode.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize))
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcdatanodeclient
import (
"testing"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize))
log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize))
}
......@@ -15,6 +15,8 @@ import (
"net"
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
......@@ -33,6 +35,9 @@ type ParamTable struct {
RootCoordAddress string
DataCoordAddress string
ServerMaxSendSize int
ServerMaxRecvSize int
}
func (pt *ParamTable) Init() {
......@@ -41,6 +46,9 @@ func (pt *ParamTable) Init() {
pt.initRootCoordAddress()
pt.initDataCoordAddress()
pt.initPort()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
})
}
......@@ -79,3 +87,25 @@ func (pt *ParamTable) initDataCoordAddress() {
}
pt.DataCoordAddress = ret
}
func (pt *ParamTable) initServerMaxSendSize() {
var err error
pt.ServerMaxSendSize, err = pt.ParseIntWithErr("dataNode.grpc.serverMaxSendSize")
if err != nil {
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
log.Debug("dataNode.grpc.serverMaxSendSize not set, set to default")
}
log.Debug("initServerMaxSendSize",
zap.Int("dataNode.grpc.serverMaxSendSize", pt.ServerMaxSendSize))
}
func (pt *ParamTable) initServerMaxRecvSize() {
var err error
pt.ServerMaxRecvSize, err = pt.ParseIntWithErr("dataNode.grpc.serverMaxRecvSize")
if err != nil {
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
log.Debug("dataNode.grpc.serverMaxRecvSize not set, set to default")
}
log.Debug("initServerMaxRecvSize",
zap.Int("dataNode.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize))
}
......@@ -14,6 +14,9 @@ package grpcdatanode
import (
"testing"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/stretchr/testify/assert"
)
......@@ -35,4 +38,7 @@ func TestParamTable(t *testing.T) {
assert.NotEqual(t, Params.RootCoordAddress, "")
t.Logf("RootCoordAddress:%s", Params.RootCoordAddress)
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))
}
......@@ -16,7 +16,6 @@ import (
"errors"
"fmt"
"io"
"math"
"net"
"strconv"
"sync"
......@@ -94,8 +93,8 @@ func (s *Server) startGrpcLoop(listener net.Listener) {
opts := trace.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize),
grpc.MaxSendMsgSize(Params.ServerMaxSendSize),
grpc.UnaryInterceptor(
grpc_opentracing.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(
......
package grpcconfigs
import "math"
const (
DefaultServerMaxSendSize = math.MaxInt32
DefaultServerMaxRecvSize = math.MaxInt32
DefaultClientMaxSendSize = 100 * 1024 * 1024
DefaultClientMaxRecvSize = 100 * 1024 * 1024
)
......@@ -78,6 +78,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*C
}
func (c *Client) Init() error {
Params.Init()
return c.connect(retry.Attempts(20))
}
......@@ -93,6 +94,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error {
log.Debug("IndexCoordClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize),
grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(3), grpc_retry.WithPerRetryTimeout(time.Second*3)),
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcindexcoordclient
import (
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
type ParamTable struct {
paramtable.BaseTable
ClientMaxSendSize int
ClientMaxRecvSize int
}
var Params ParamTable
var once sync.Once
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initClientMaxSendSize()
pt.initClientMaxRecvSize()
})
}
func (pt *ParamTable) initClientMaxSendSize() {
var err error
pt.ClientMaxSendSize, err = pt.ParseIntWithErr("indexCoord.grpc.clientMaxSendSize")
if err != nil {
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
log.Debug("indexCoord.grpc.clientMaxSendSize not set, set to default")
}
log.Debug("initClientMaxSendSize",
zap.Int("indexCoord.grpc.clientMaxSendSize", pt.ClientMaxSendSize))
}
func (pt *ParamTable) initClientMaxRecvSize() {
var err error
pt.ClientMaxRecvSize, err = pt.ParseIntWithErr("indexCoord.grpc.clientMaxRecvSize")
if err != nil {
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
log.Debug("indexCoord.grpc.clientMaxRecvSize not set, set to default")
}
log.Debug("initClientMaxRecvSize",
zap.Int("indexCoord.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize))
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcindexcoordclient
import (
"testing"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize))
log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize))
}
......@@ -14,6 +14,10 @@ package grpcindexcoord
import (
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
......@@ -22,6 +26,9 @@ type ParamTable struct {
ServiceAddress string
ServicePort int
ServerMaxSendSize int
ServerMaxRecvSize int
}
var Params ParamTable
......@@ -37,6 +44,9 @@ func (pt *ParamTable) Init() {
func (pt *ParamTable) initParams() {
pt.initServicePort()
pt.initServiceAddress()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
}
func (pt *ParamTable) initServicePort() {
......@@ -50,3 +60,25 @@ func (pt *ParamTable) initServiceAddress() {
}
pt.ServiceAddress = ret
}
func (pt *ParamTable) initServerMaxSendSize() {
var err error
pt.ServerMaxSendSize, err = pt.ParseIntWithErr("indexCoord.grpc.serverMaxSendSize")
if err != nil {
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
log.Debug("indexCoord.grpc.serverMaxSendSize not set, set to default")
}
log.Debug("initServerMaxSendSize",
zap.Int("indexCoord.grpc.serverMaxSendSize", pt.ServerMaxSendSize))
}
func (pt *ParamTable) initServerMaxRecvSize() {
var err error
pt.ServerMaxRecvSize, err = pt.ParseIntWithErr("indexCoord.grpc.serverMaxRecvSize")
if err != nil {
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
log.Debug("indexCoord.grpc.serverMaxRecvSize not set, set to default")
}
log.Debug("initServerMaxRecvSize",
zap.Int("indexCoord.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize))
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcindexcoord
import (
"testing"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))
}
......@@ -14,7 +14,6 @@ package grpcindexcoord
import (
"context"
"io"
"math"
"net"
"strconv"
"sync"
......@@ -161,8 +160,8 @@ func (s *Server) startGrpcLoop(grpcPort int) {
opts := trace.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize),
grpc.MaxSendMsgSize(Params.ServerMaxSendSize),
grpc.UnaryInterceptor(ot.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(ot.StreamServerInterceptor(opts...)))
indexpb.RegisterIndexCoordServer(s.grpcServer, s)
......
......@@ -57,6 +57,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) {
}
func (c *Client) Init() error {
Params.Init()
return c.connect(retry.Attempts(20))
}
......@@ -66,6 +67,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error {
log.Debug("IndexNodeClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize),
grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcindexnodeclient
import (
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
type ParamTable struct {
paramtable.BaseTable
ClientMaxSendSize int
ClientMaxRecvSize int
}
var Params ParamTable
var once sync.Once
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initClientMaxSendSize()
pt.initClientMaxRecvSize()
})
}
func (pt *ParamTable) initClientMaxSendSize() {
var err error
pt.ClientMaxSendSize, err = pt.ParseIntWithErr("indexNode.grpc.clientMaxSendSize")
if err != nil {
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
log.Debug("indexNode.grpc.clientMaxSendSize not set, set to default")
}
log.Debug("initClientMaxSendSize",
zap.Int("indexNode.grpc.clientMaxSendSize", pt.ClientMaxSendSize))
}
func (pt *ParamTable) initClientMaxRecvSize() {
var err error
pt.ClientMaxRecvSize, err = pt.ParseIntWithErr("indexNode.grpc.clientMaxRecvSize")
if err != nil {
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
log.Debug("indexNode.grpc.clientMaxRecvSize not set, set to default")
}
log.Debug("initClientMaxRecvSize",
zap.Int("indexNode.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize))
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcindexnodeclient
import (
"testing"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize))
log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize))
}
......@@ -14,6 +14,10 @@ package grpcindexnode
import (
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
......@@ -26,6 +30,9 @@ type ParamTable struct {
IP string
Port int
Address string
ServerMaxSendSize int
ServerMaxRecvSize int
}
var Params ParamTable
......@@ -35,6 +42,9 @@ func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initParams()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
})
}
......@@ -64,3 +74,25 @@ func (pt *ParamTable) initPort() {
port := pt.ParseInt("indexNode.port")
pt.Port = port
}
func (pt *ParamTable) initServerMaxSendSize() {
var err error
pt.ServerMaxSendSize, err = pt.ParseIntWithErr("indexNode.grpc.serverMaxSendSize")
if err != nil {
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
log.Debug("indexNode.grpc.serverMaxSendSize not set, set to default")
}
log.Debug("initServerMaxSendSize",
zap.Int("indexNode.grpc.serverMaxSendSize", pt.ServerMaxSendSize))
}
func (pt *ParamTable) initServerMaxRecvSize() {
var err error
pt.ServerMaxRecvSize, err = pt.ParseIntWithErr("indexNode.grpc.serverMaxRecvSize")
if err != nil {
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
log.Debug("indexNode.grpc.serverMaxRecvSize not set, set to default")
}
log.Debug("initServerMaxRecvSize",
zap.Int("indexNode.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize))
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcindexnode
import (
"testing"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))
}
......@@ -15,7 +15,6 @@ import (
"context"
"fmt"
"io"
"math"
"net"
"strconv"
"sync"
......@@ -79,8 +78,8 @@ func (s *Server) startGrpcLoop(grpcPort int) {
opts := trace.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize),
grpc.MaxSendMsgSize(Params.ServerMaxSendSize),
grpc.UnaryInterceptor(grpc_opentracing.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(grpc_opentracing.StreamServerInterceptor(opts...)))
indexpb.RegisterIndexNodeServer(s.grpcServer, s)
......
......@@ -56,6 +56,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) {
}
func (c *Client) Init() error {
Params.Init()
return c.connect(retry.Attempts(20))
}
......@@ -65,6 +66,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error {
log.Debug("ProxyClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize),
grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcproxyclient
import (
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
type ParamTable struct {
paramtable.BaseTable
ClientMaxSendSize int
ClientMaxRecvSize int
}
var Params ParamTable
var once sync.Once
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initClientMaxSendSize()
pt.initClientMaxRecvSize()
})
}
func (pt *ParamTable) initClientMaxSendSize() {
var err error
pt.ClientMaxSendSize, err = pt.ParseIntWithErr("proxy.grpc.clientMaxSendSize")
if err != nil {
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
log.Debug("proxy.grpc.clientMaxSendSize not set, set to default")
}
log.Debug("initClientMaxSendSize",
zap.Int("proxy.grpc.clientMaxSendSize", pt.ClientMaxSendSize))
}
func (pt *ParamTable) initClientMaxRecvSize() {
var err error
pt.ClientMaxRecvSize, err = pt.ParseIntWithErr("proxy.grpc.clientMaxRecvSize")
if err != nil {
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
log.Debug("proxy.grpc.clientMaxRecvSize not set, set to default")
}
log.Debug("initClientMaxRecvSize",
zap.Int("proxy.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize))
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcproxyclient
import (
"testing"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize))
log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize))
}
......@@ -14,6 +14,10 @@ package grpcproxy
import (
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
......@@ -29,6 +33,9 @@ type ParamTable struct {
IP string
Port int
Address string
ServerMaxSendSize int
ServerMaxRecvSize int
}
var Params ParamTable
......@@ -38,6 +45,9 @@ func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initParams()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
})
}
......@@ -97,3 +107,25 @@ func (pt *ParamTable) initPort() {
port := pt.ParseInt("proxy.port")
pt.Port = port
}
func (pt *ParamTable) initServerMaxSendSize() {
var err error
pt.ServerMaxSendSize, err = pt.ParseIntWithErr("proxy.grpc.serverMaxSendSize")
if err != nil {
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
log.Debug("proxy.grpc.serverMaxSendSize not set, set to default")
}
log.Debug("initServerMaxSendSize",
zap.Int("proxy.grpc.serverMaxSendSize", pt.ServerMaxSendSize))
}
func (pt *ParamTable) initServerMaxRecvSize() {
var err error
pt.ServerMaxRecvSize, err = pt.ParseIntWithErr("proxy.grpc.serverMaxRecvSize")
if err != nil {
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
log.Debug("proxy.grpc.serverMaxRecvSize not set, set to default")
}
log.Debug("initServerMaxRecvSize",
zap.Int("proxy.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize))
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcproxy
import (
"testing"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))
}
......@@ -15,7 +15,6 @@ import (
"context"
"fmt"
"io"
"math"
"net"
"strconv"
"sync"
......@@ -95,8 +94,8 @@ func (s *Server) startGrpcLoop(grpcPort int) {
opts := trace.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize),
grpc.MaxSendMsgSize(Params.ServerMaxSendSize),
grpc.MaxRecvMsgSize(GRPCMaxMagSize),
grpc.UnaryInterceptor(
grpc_opentracing.UnaryServerInterceptor(opts...)),
......
......@@ -78,6 +78,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*C
}
func (c *Client) Init() error {
Params.Init()
return c.connect(retry.Attempts(20))
}
......@@ -93,6 +94,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error {
log.Debug("QueryCoordClient try reconnect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize),
grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcquerycoordclient
import (
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
type ParamTable struct {
paramtable.BaseTable
ClientMaxSendSize int
ClientMaxRecvSize int
}
var Params ParamTable
var once sync.Once
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initClientMaxSendSize()
pt.initClientMaxRecvSize()
})
}
func (pt *ParamTable) initClientMaxSendSize() {
var err error
pt.ClientMaxSendSize, err = pt.ParseIntWithErr("queryCoord.grpc.clientMaxSendSize")
if err != nil {
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
log.Debug("queryCoord.grpc.clientMaxSendSize not set, set to default")
}
log.Debug("initClientMaxSendSize",
zap.Int("queryCoord.grpc.clientMaxSendSize", pt.ClientMaxSendSize))
}
func (pt *ParamTable) initClientMaxRecvSize() {
var err error
pt.ClientMaxRecvSize, err = pt.ParseIntWithErr("queryCoord.grpc.clientMaxRecvSize")
if err != nil {
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
log.Debug("queryCoord.grpc.clientMaxRecvSize not set, set to default")
}
log.Debug("initClientMaxRecvSize",
zap.Int("queryCoord.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize))
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcquerycoordclient
import (
"testing"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize))
log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize))
}
......@@ -14,6 +14,10 @@ package grpcquerycoord
import (
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
......@@ -24,9 +28,11 @@ type ParamTable struct {
paramtable.BaseTable
Port int
IndexCoordAddress string
RootCoordAddress string
DataCoordAddress string
RootCoordAddress string
DataCoordAddress string
ServerMaxSendSize int
ServerMaxRecvSize int
}
func (pt *ParamTable) Init() {
......@@ -34,8 +40,10 @@ func (pt *ParamTable) Init() {
pt.BaseTable.Init()
pt.initPort()
pt.initRootCoordAddress()
pt.initIndexCoordAddress()
pt.initDataCoordAddress()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
})
}
......@@ -47,14 +55,6 @@ func (pt *ParamTable) initRootCoordAddress() {
pt.RootCoordAddress = ret
}
func (pt *ParamTable) initIndexCoordAddress() {
ret, err := pt.Load("IndexCoordAddress")
if err != nil {
panic(err)
}
pt.IndexCoordAddress = ret
}
func (pt *ParamTable) initDataCoordAddress() {
ret, err := pt.Load("_DataCoordAddress")
if err != nil {
......@@ -66,3 +66,25 @@ func (pt *ParamTable) initDataCoordAddress() {
func (pt *ParamTable) initPort() {
pt.Port = pt.ParseInt("queryCoord.port")
}
func (pt *ParamTable) initServerMaxSendSize() {
var err error
pt.ServerMaxSendSize, err = pt.ParseIntWithErr("queryCoord.grpc.serverMaxSendSize")
if err != nil {
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
log.Debug("queryCoord.grpc.serverMaxSendSize not set, set to default")
}
log.Debug("initServerMaxSendSize",
zap.Int("queryCoord.grpc.serverMaxSendSize", pt.ServerMaxSendSize))
}
func (pt *ParamTable) initServerMaxRecvSize() {
var err error
pt.ServerMaxRecvSize, err = pt.ParseIntWithErr("queryCoord.grpc.serverMaxRecvSize")
if err != nil {
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
log.Debug("queryCoord.grpc.serverMaxRecvSize not set, set to default")
}
log.Debug("initServerMaxRecvSize",
zap.Int("queryCoord.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize))
}
......@@ -11,23 +11,24 @@
package grpcquerycoord
/*
import (
"testing"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/stretchr/testify/assert"
)
func TestParamTable(t *testing.T) {
Params.Init()
assert.NotEqual(t, Params.IndexCoordAddress, "")
t.Logf("IndexCoordAddress:%s", Params.IndexCoordAddress)
assert.NotEqual(t, Params.DataCoordAddress, "")
t.Logf("DataCoordAddress:%s", Params.DataCoordAddress)
assert.NotEqual(t, Params.RootCoordAddress, "")
t.Logf("RootCoordAddress:%s", Params.RootCoordAddress)
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))
}
*/
......@@ -14,7 +14,6 @@ package grpcquerycoord
import (
"context"
"io"
"math"
"net"
"strconv"
"sync"
......@@ -187,8 +186,8 @@ func (s *Server) startGrpcLoop(grpcPort int) {
opts := trace.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize),
grpc.MaxSendMsgSize(Params.ServerMaxSendSize),
grpc.UnaryInterceptor(
grpc_opentracing.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(
......
......@@ -57,6 +57,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) {
}
func (c *Client) Init() error {
Params.Init()
return c.connect(retry.Attempts(20))
}
......@@ -66,6 +67,9 @@ func (c *Client) connect(retryOptions ...retry.Option) error {
log.Debug("QueryNodeClient try connect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize),
grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcquerynodeclient
import (
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
type ParamTable struct {
paramtable.BaseTable
ClientMaxSendSize int
ClientMaxRecvSize int
}
var Params ParamTable
var once sync.Once
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initClientMaxSendSize()
pt.initClientMaxRecvSize()
})
}
func (pt *ParamTable) initClientMaxSendSize() {
var err error
pt.ClientMaxSendSize, err = pt.ParseIntWithErr("queryNode.grpc.clientMaxSendSize")
if err != nil {
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
log.Debug("queryNode.grpc.clientMaxSendSize not set, set to default")
}
log.Debug("initClientMaxSendSize",
zap.Int("queryNode.grpc.clientMaxSendSize", pt.ClientMaxSendSize))
}
func (pt *ParamTable) initClientMaxRecvSize() {
var err error
pt.ClientMaxRecvSize, err = pt.ParseIntWithErr("queryNode.grpc.clientMaxRecvSize")
if err != nil {
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
log.Debug("queryNode.grpc.clientMaxRecvSize not set, set to default")
}
log.Debug("initClientMaxRecvSize",
zap.Int("queryNode.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize))
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcquerynodeclient
import (
"testing"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize))
log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize))
}
......@@ -14,6 +14,10 @@ package grpcquerynode
import (
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
......@@ -32,6 +36,9 @@ type ParamTable struct {
IndexCoordAddress string
DataCoordAddress string
QueryCoordAddress string
ServerMaxSendSize int
ServerMaxRecvSize int
}
func (pt *ParamTable) Init() {
......@@ -42,6 +49,9 @@ func (pt *ParamTable) Init() {
pt.initIndexCoordAddress()
pt.initDataCoordAddress()
pt.initQueryCoordAddress()
pt.initServerMaxSendSize()
pt.initServerMaxRecvSize()
})
}
......@@ -89,3 +99,25 @@ func (pt *ParamTable) initPort() {
port := pt.ParseInt("queryNode.port")
pt.QueryNodePort = port
}
func (pt *ParamTable) initServerMaxSendSize() {
var err error
pt.ServerMaxSendSize, err = pt.ParseIntWithErr("queryNode.grpc.serverMaxSendSize")
if err != nil {
pt.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
log.Debug("queryNode.grpc.serverMaxSendSize not set, set to default")
}
log.Debug("initServerMaxSendSize",
zap.Int("queryNode.grpc.serverMaxSendSize", pt.ServerMaxSendSize))
}
func (pt *ParamTable) initServerMaxRecvSize() {
var err error
pt.ServerMaxRecvSize, err = pt.ParseIntWithErr("queryNode.grpc.serverMaxRecvSize")
if err != nil {
pt.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
log.Debug("queryNode.grpc.serverMaxRecvSize not set, set to default")
}
log.Debug("initServerMaxRecvSize",
zap.Int("queryNode.grpc.serverMaxRecvSize", pt.ServerMaxRecvSize))
}
......@@ -14,6 +14,9 @@ package grpcquerynode
import (
"testing"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/stretchr/testify/assert"
)
......@@ -31,4 +34,7 @@ func TestParamTable(t *testing.T) {
assert.NotEqual(t, Params.QueryCoordAddress, "")
t.Logf("QueryCoordAddress:%s", Params.QueryCoordAddress)
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))
}
......@@ -15,7 +15,6 @@ import (
"context"
"fmt"
"io"
"math"
"net"
"strconv"
"sync"
......@@ -200,8 +199,8 @@ func (s *Server) startGrpcLoop(grpcPort int) {
opts := trace.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize),
grpc.MaxSendMsgSize(Params.ServerMaxSendSize),
grpc.UnaryInterceptor(
grpc_opentracing.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(
......
......@@ -86,6 +86,7 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*G
}
func (c *GrpcClient) Init() error {
Params.Init()
return c.connect(retry.Attempts(20))
}
......@@ -101,6 +102,9 @@ func (c *GrpcClient) connect(retryOptions ...retry.Option) error {
log.Debug("RootCoordClient try reconnect ", zap.String("address", c.addr))
conn, err := grpc.DialContext(c.ctx, c.addr,
grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize),
grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)),
grpc.WithUnaryInterceptor(
grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcrootcoordclient
import (
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
type ParamTable struct {
paramtable.BaseTable
ClientMaxSendSize int
ClientMaxRecvSize int
}
var Params ParamTable
var once sync.Once
func (pt *ParamTable) Init() {
once.Do(func() {
pt.BaseTable.Init()
pt.initClientMaxSendSize()
pt.initClientMaxRecvSize()
})
}
func (pt *ParamTable) initClientMaxSendSize() {
var err error
pt.ClientMaxSendSize, err = pt.ParseIntWithErr("rootCoord.grpc.clientMaxSendSize")
if err != nil {
pt.ClientMaxSendSize = grpcconfigs.DefaultClientMaxSendSize
log.Debug("rootCoord.grpc.clientMaxSendSize not set, set to default")
}
log.Debug("initClientMaxSendSize",
zap.Int("rootCoord.grpc.clientMaxSendSize", pt.ClientMaxSendSize))
}
func (pt *ParamTable) initClientMaxRecvSize() {
var err error
pt.ClientMaxRecvSize, err = pt.ParseIntWithErr("rootCoord.grpc.clientMaxRecvSize")
if err != nil {
pt.ClientMaxRecvSize = grpcconfigs.DefaultClientMaxRecvSize
log.Debug("rootCoord.grpc.clientMaxRecvSize not set, set to default")
}
log.Debug("initClientMaxRecvSize",
zap.Int("rootCoord.grpc.clientMaxRecvSize", pt.ClientMaxRecvSize))
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcrootcoordclient
import (
"testing"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
func TestParamTable(t *testing.T) {
Params.Init()
log.Info("TestParamTable", zap.Int("ClientMaxSendSize", Params.ClientMaxSendSize))
log.Info("TestParamTable", zap.Int("ClientMaxRecvSize", Params.ClientMaxRecvSize))
}
......@@ -14,6 +14,10 @@ package grpcrootcoord
import (
"sync"
"github.com/milvus-io/milvus/internal/distributed/grpcconfigs"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
......@@ -29,6 +33,9 @@ type ParamTable struct {
IndexCoordAddress string
QueryCoordAddress string
DataCoordAddress string
ServerMaxSendSize int
ServerMaxRecvSize int
}
func (p *ParamTable) Init() {
......@@ -43,6 +50,9 @@ func (p *ParamTable) Init() {
p.initIndexCoordAddress()
p.initQueryCoordAddress()
p.initDataCoordAddress()
p.initServerMaxSendSize()
p.initServerMaxRecvSize()
})
}
......@@ -81,3 +91,25 @@ func (p *ParamTable) initDataCoordAddress() {
}
p.DataCoordAddress = ret
}
func (p *ParamTable) initServerMaxSendSize() {
var err error
p.ServerMaxSendSize, err = p.ParseIntWithErr("rootCoord.grpc.serverMaxSendSize")
if err != nil {
p.ServerMaxSendSize = grpcconfigs.DefaultServerMaxSendSize
log.Debug("rootCoord.grpc.serverMaxSendSize not set, set to default")
}
log.Debug("initServerMaxSendSize",
zap.Int("rootCoord.grpc.serverMaxSendSize", p.ServerMaxSendSize))
}
func (p *ParamTable) initServerMaxRecvSize() {
var err error
p.ServerMaxRecvSize, err = p.ParseIntWithErr("rootCoord.grpc.serverMaxRecvSize")
if err != nil {
p.ServerMaxRecvSize = grpcconfigs.DefaultServerMaxRecvSize
log.Debug("rootCoord.grpc.serverMaxRecvSize not set, set to default")
}
log.Debug("initServerMaxRecvSize",
zap.Int("rootCoord.grpc.serverMaxRecvSize", p.ServerMaxRecvSize))
}
......@@ -14,6 +14,9 @@ package grpcrootcoord
import (
"testing"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/stretchr/testify/assert"
)
......@@ -34,4 +37,7 @@ func TestParamTable(t *testing.T) {
assert.NotEqual(t, Params.QueryCoordAddress, "")
t.Logf("QueryCoordAddress:%s", Params.QueryCoordAddress)
log.Info("TestParamTable", zap.Int("ServerMaxSendSize", Params.ServerMaxSendSize))
log.Info("TestParamTable", zap.Int("ServerMaxRecvSize", Params.ServerMaxRecvSize))
}
......@@ -14,7 +14,6 @@ package grpcrootcoord
import (
"context"
"io"
"math"
"net"
"strconv"
"sync"
......@@ -207,8 +206,8 @@ func (s *Server) startGrpcLoop(grpcPort int) {
opts := trace.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize),
grpc.MaxSendMsgSize(Params.ServerMaxSendSize),
grpc.UnaryInterceptor(grpc_opentracing.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(grpc_opentracing.StreamServerInterceptor(opts...)))
rootcoordpb.RegisterRootCoordServer(s.grpcServer, s)
......
......@@ -316,6 +316,18 @@ func (gp *BaseTable) ParseInt(key string) int {
return value
}
func (gp *BaseTable) ParseIntWithErr(key string) (int, error) {
valueStr, err := gp.Load(key)
if err != nil {
return 0, err
}
value, err := strconv.Atoi(valueStr)
if err != nil {
return 0, err
}
return value, nil
}
// package methods
func ConvertRangeToIntRange(rangeStr, sep string) []int {
......
......@@ -111,3 +111,20 @@ func TestGlobalParamsTable_LoadYaml(t *testing.T) {
_, err = baseParams.Load("pulsar.port")
assert.Nil(t, err)
}
func TestBaseTable_ParseIntWithErr(t *testing.T) {
var err error
key1 := "ParseIntWithErrInt"
err = baseParams.Save(key1, "10")
assert.Nil(t, err)
ten, err := baseParams.ParseIntWithErr(key1)
assert.Nil(t, err)
assert.Equal(t, 10, ten)
key2 := "ParseIntWithErrInvalidInt"
err = baseParams.Save(key2, "invalid")
assert.Nil(t, err)
_, err = baseParams.ParseIntWithErr(key2)
assert.NotNil(t, err)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册