未验证 提交 5f0f8407 编写于 作者: G godchen 提交者: GitHub

Remove datanode client code (#5427)

Remove datanode client code.

Signed-off-by: godchen qingxiang.chen@zilliz.com
上级 a3fb1356
......@@ -76,7 +76,7 @@ type Server struct {
insertChannels []string
msFactory msgstream.Factory
ttBarrier timesync.TimeTickBarrier
createDataNodeClient func(addr string, serverID int64) (types.DataNode, error)
createDataNodeClient func(addr string) (types.DataNode, error)
}
func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
......@@ -87,8 +87,8 @@ func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, erro
msFactory: factory,
}
s.insertChannels = s.getInsertChannels()
s.createDataNodeClient = func(addr string, serverID int64) (types.DataNode, error) {
node, err := grpcdatanodeclient.NewClient(addr, serverID, []string{Params.EtcdAddress}, 10)
s.createDataNodeClient = func(addr string) (types.DataNode, error) {
node, err := grpcdatanodeclient.NewClient(addr, 10)
if err != nil {
return nil, err
}
......@@ -570,7 +570,7 @@ func (s *Server) RegisterNode(ctx context.Context, req *datapb.RegisterNodeReque
}
func (s *Server) newDataNode(ip string, port int64, id UniqueID) (*dataNode, error) {
client, err := s.createDataNodeClient(fmt.Sprintf("%s:%d", ip, port), id)
client, err := s.createDataNodeClient(fmt.Sprintf("%s:%d", ip, port))
if err != nil {
return nil, err
}
......
......@@ -801,7 +801,7 @@ func newTestServer(t *testing.T) *Server {
assert.Nil(t, err)
defer ms.Stop()
svr.SetMasterClient(ms)
svr.createDataNodeClient = func(addr string, serverID int64) (types.DataNode, error) {
svr.createDataNodeClient = func(addr string) (types.DataNode, error) {
return newMockDataNodeClient(0)
}
assert.Nil(t, err)
......
......@@ -14,7 +14,6 @@ package grpcdatanodeclient
import (
"context"
"fmt"
"strconv"
"time"
"github.com/milvus-io/milvus/internal/log"
......@@ -27,8 +26,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)
......@@ -39,37 +36,23 @@ type Client struct {
grpc datapb.DataNodeClient
conn *grpc.ClientConn
address string
serverID int64
address string
sess *sessionutil.Session
timeout time.Duration
reconnTry int
recallTry int
}
func getDataNodeAddress(sess *sessionutil.Session, serverID int64) (string, error) {
key := typeutil.DataNodeRole + "-" + strconv.FormatInt(serverID, 10)
msess, _, err := sess.GetSessions(key)
if err != nil {
return "", err
}
ms, ok := msess[key]
if !ok {
return "", fmt.Errorf("number of master service is incorrect, %d", len(msess))
func NewClient(address string, timeout time.Duration) (*Client, error) {
if address == "" {
return nil, fmt.Errorf("address is empty")
}
return ms.Address, nil
}
func NewClient(address string, serverID int64, etcdAddr []string, timeout time.Duration) (*Client, error) {
sess := sessionutil.NewSession(context.Background(), etcdAddr)
return &Client{
grpc: nil,
conn: nil,
address: address,
ctx: context.Background(),
sess: sess,
timeout: timeout,
recallTry: 3,
reconnTry: 10,
......@@ -78,27 +61,23 @@ func NewClient(address string, serverID int64, etcdAddr []string, timeout time.D
func (c *Client) Init() error {
tracer := opentracing.GlobalTracer()
if c.address != "" {
connectGrpcFunc := func() error {
log.Debug("DataNode connect ", zap.String("address", c.address))
conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
if err != nil {
return err
}
c.conn = conn
return nil
}
err := retry.Retry(c.reconnTry, time.Millisecond*500, connectGrpcFunc)
connectGrpcFunc := func() error {
log.Debug("DataNode connect ", zap.String("address", c.address))
conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
if err != nil {
return err
}
} else {
return c.reconnect()
c.conn = conn
return nil
}
err := retry.Retry(c.reconnTry, time.Millisecond*500, connectGrpcFunc)
if err != nil {
return err
}
c.grpc = datapb.NewDataNodeClient(c.conn)
return nil
......@@ -107,17 +86,6 @@ func (c *Client) Init() error {
func (c *Client) reconnect() error {
tracer := opentracing.GlobalTracer()
var err error
getDataNodeAddressFn := func() error {
c.address, err = getDataNodeAddress(c.sess, c.serverID)
if err != nil {
return err
}
return nil
}
err = retry.Retry(c.reconnTry, 3*time.Second, getDataNodeAddressFn)
if err != nil {
return err
}
connectGrpcFunc := func() error {
log.Debug("DataNode connect ", zap.String("address", c.address))
conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册