service.go 2.4 KB
Newer Older
X
XuanYang-cn 已提交
1 2 3
package datanode

import (
X
XuanYang-cn 已提交
4
	"context"
X
XuanYang-cn 已提交
5 6
	"net"
	"strconv"
X
XuanYang-cn 已提交
7 8
	"sync"

X
XuanYang-cn 已提交
9
	dn "github.com/zilliztech/milvus-distributed/internal/datanode"
10
	"github.com/zilliztech/milvus-distributed/internal/errors"
X
XuanYang-cn 已提交
11 12 13 14
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"

X
XuanYang-cn 已提交
15 16 17 18
	"google.golang.org/grpc"
)

type Server struct {
X
XuanYang-cn 已提交
19
	core *dn.DataNode
X
XuanYang-cn 已提交
20 21 22 23 24 25 26 27 28

	grpcServer *grpc.Server
	grpcError  error
	grpcErrMux sync.Mutex

	ctx    context.Context
	cancel context.CancelFunc
}

X
XuanYang-cn 已提交
29
func New(ctx context.Context) (*Server, error) {
30 31 32
	var s = &Server{
		ctx: ctx,
	}
X
XuanYang-cn 已提交
33

X
XuanYang-cn 已提交
34
	s.core = dn.NewDataNode(s.ctx)
X
XuanYang-cn 已提交
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
	s.grpcServer = grpc.NewServer()
	datapb.RegisterDataNodeServer(s.grpcServer, s)
	addr := dn.Params.IP + ":" + strconv.FormatInt(dn.Params.Port, 10)
	lis, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, err
	}

	go func() {
		if err = s.grpcServer.Serve(lis); err != nil {
			s.grpcErrMux.Lock()
			defer s.grpcErrMux.Unlock()
			s.grpcError = err
		}
	}()

	s.grpcErrMux.Lock()
	err = s.grpcError
	s.grpcErrMux.Unlock()

	if err != nil {
		return nil, err
	}
	return s, nil
X
XuanYang-cn 已提交
59 60
}

X
XuanYang-cn 已提交
61 62 63 64 65 66 67 68
func (s *Server) SetMasterServiceInterface(ms dn.MasterServiceInterface) error {
	return s.core.SetMasterServiceInterface(ms)
}

func (s *Server) SetDataServiceInterface(ds dn.DataServiceInterface) error {
	return s.core.SetDataServiceInterface(ds)
}

X
XuanYang-cn 已提交
69 70 71 72 73 74 75 76 77
func (s *Server) Init() error {
	return s.core.Init()
}

func (s *Server) Start() error {
	return s.core.Start()
}

func (s *Server) Stop() error {
78
	err := s.core.Stop()
S
sunby 已提交
79
	s.cancel()
80 81
	s.grpcServer.GracefulStop()
	return err
X
XuanYang-cn 已提交
82 83 84
}

func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
X
XuanYang-cn 已提交
85
	return s.core.GetComponentStates()
X
XuanYang-cn 已提交
86 87
}

X
XuanYang-cn 已提交
88
func (s *Server) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) {
X
XuanYang-cn 已提交
89
	return s.core.WatchDmChannels(in)
X
XuanYang-cn 已提交
90 91
}

X
XuanYang-cn 已提交
92
func (s *Server) FlushSegments(ctx context.Context, in *datapb.FlushSegRequest) (*commonpb.Status, error) {
93 94 95 96 97 98
	if s.core.State != internalpb2.StateCode_HEALTHY {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    "DataNode isn't healthy.",
		}, errors.Errorf("DataNode is not ready yet")
	}
X
XuanYang-cn 已提交
99 100 101
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}, s.core.FlushSegments(in)
X
XuanYang-cn 已提交
102
}