service.go 2.0 KB
Newer Older
X
XuanYang-cn 已提交
1 2 3
package datanode

import (
X
XuanYang-cn 已提交
4
	"context"
X
XuanYang-cn 已提交
5 6
	"net"
	"strconv"
X
XuanYang-cn 已提交
7 8
	"sync"

X
XuanYang-cn 已提交
9
	dn "github.com/zilliztech/milvus-distributed/internal/datanode"
X
XuanYang-cn 已提交
10 11 12 13
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"

X
XuanYang-cn 已提交
14 15 16 17
	"google.golang.org/grpc"
)

type Server struct {
X
XuanYang-cn 已提交
18
	core *dn.DataNode
X
XuanYang-cn 已提交
19 20 21 22 23 24 25 26 27

	grpcServer *grpc.Server
	grpcError  error
	grpcErrMux sync.Mutex

	ctx    context.Context
	cancel context.CancelFunc
}

X
XuanYang-cn 已提交
28
func New(ctx context.Context) (*Server, error) {
29 30 31
	var s = &Server{
		ctx: ctx,
	}
X
XuanYang-cn 已提交
32

X
XuanYang-cn 已提交
33
	s.core = dn.NewDataNode(s.ctx)
X
XuanYang-cn 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
	s.grpcServer = grpc.NewServer()
	datapb.RegisterDataNodeServer(s.grpcServer, s)
	addr := dn.Params.IP + ":" + strconv.FormatInt(dn.Params.Port, 10)
	lis, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, err
	}

	go func() {
		if err = s.grpcServer.Serve(lis); err != nil {
			s.grpcErrMux.Lock()
			defer s.grpcErrMux.Unlock()
			s.grpcError = err
		}
	}()

	s.grpcErrMux.Lock()
	err = s.grpcError
	s.grpcErrMux.Unlock()

	if err != nil {
		return nil, err
	}
	return s, nil
X
XuanYang-cn 已提交
58 59
}

X
XuanYang-cn 已提交
60 61 62 63 64 65 66 67
// SetMasterServiceInterface forwards ms to the wrapped core data node and
// returns the error the core reports, if any.
func (s *Server) SetMasterServiceInterface(ms dn.MasterServiceInterface) error {
	err := s.core.SetMasterServiceInterface(ms)
	return err
}

// SetDataServiceInterface forwards ds to the wrapped core data node and
// returns the error the core reports, if any.
func (s *Server) SetDataServiceInterface(ds dn.DataServiceInterface) error {
	err := s.core.SetDataServiceInterface(ds)
	return err
}

X
XuanYang-cn 已提交
68 69 70 71 72 73 74 75 76 77 78 79 80
// Init delegates to the core data node's Init and returns its result.
func (s *Server) Init() error {
	err := s.core.Init()
	return err
}

// Start delegates to the core data node's Start and returns its result.
func (s *Server) Start() error {
	err := s.core.Start()
	return err
}

// Stop shuts the server down and returns the error reported by the core data
// node's Stop.
//
// The gRPC server is drained first so that no RPC reaches the core after it
// has stopped; GracefulStop blocks until pending RPCs have finished. The
// server's cancel function, when set, is invoked last. Both are nil-guarded
// so Stop is safe on a Server whose gRPC server or cancel function was never
// initialised.
func (s *Server) Stop() error {
	if s.grpcServer != nil {
		s.grpcServer.GracefulStop()
	}

	err := s.core.Stop()

	if s.cancel != nil {
		s.cancel()
	}
	return err
}

func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
X
XuanYang-cn 已提交
81
	return s.core.GetComponentStates()
X
XuanYang-cn 已提交
82 83
}

X
XuanYang-cn 已提交
84
func (s *Server) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) {
X
XuanYang-cn 已提交
85
	return s.core.WatchDmChannels(in)
X
XuanYang-cn 已提交
86 87
}

X
XuanYang-cn 已提交
88 89 90 91
func (s *Server) FlushSegments(ctx context.Context, in *datapb.FlushSegRequest) (*commonpb.Status, error) {
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}, s.core.FlushSegments(in)
X
XuanYang-cn 已提交
92
}