From 4f914a2c30f8439696579695a61fd151882390c0 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Wed, 27 Jan 2021 09:50:52 +0800 Subject: [PATCH] Add docker file and main function of query node Signed-off-by: bigsheeper --- Makefile | 8 +- build/docker/deploy/docker-compose.yml | 12 +-- build/docker/deploy/queryservice/DockerFile | 19 ++++- cmd/querynode/{query_node.go => querynode.go} | 19 +++-- cmd/queryservice/queryservice.go | 83 +++++-------------- internal/distributed/querynode/service.go | 41 +++++++-- internal/distributed/queryservice/service.go | 8 +- internal/querynode/query_node.go | 78 +++++++++++++++-- internal/querynode/type_def.go | 5 ++ internal/queryservice/queryservice.go | 24 +++++- internal/queryservice/queryservice_test.go | 4 - internal/util/typeutil/interface.go | 10 --- 12 files changed, 196 insertions(+), 115 deletions(-) rename cmd/querynode/{query_node.go => querynode.go} (66%) diff --git a/Makefile b/Makefile index 45c2564b2..b525ecbed 100644 --- a/Makefile +++ b/Makefile @@ -94,7 +94,7 @@ proxynode: build-cpp querynode: build-cpp @echo "Building each component's binary to './bin'" @echo "Building query node ..." - @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/query_node.go 1>/dev/null + @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/querynode.go 1>/dev/null # Builds various components locally. @@ -134,8 +134,10 @@ build-go: build-cpp @echo "Building proxy node ..." # TODO: fix me, why proxy node need cgo enabled @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/proxynode $(PWD)/cmd/proxy/node/proxy_node.go 1>/dev/null + @echo "Building query service ..." + @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/queryservice $(PWD)/cmd/queryservice/queryservice.go 1>/dev/null @echo "Building query node ..." - @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/query_node.go 1>/dev/null + @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/querynode.go 1>/dev/null @echo "Building write node ..." @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/writenode $(PWD)/cmd/writenode/writenode.go 1>/dev/null @echo "Building binlog ..." @@ -186,6 +188,7 @@ docker: verifiers # Builds each component and installs it to $GOPATH/bin. install: all @echo "Installing binary to './bin'" + @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/queryservice $(GOPATH)/bin/queryservice @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/querynode $(GOPATH)/bin/querynode @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/master $(GOPATH)/bin/master @mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/proxynode $(GOPATH)/bin/proxynode @@ -206,6 +209,7 @@ clean: @rm -rf $(GOPATH)/bin/master @rm -rf $(GOPATH)/bin/proxynode @rm -rf $(GOPATH)/bin/proxyservice + @rm -rf $(GOPATH)/bin/queryservice @rm -rf $(GOPATH)/bin/querynode @rm -rf $(GOPATH)/bin/writenode @rm -rf $(GOPATH)/bin/singlenode diff --git a/build/docker/deploy/docker-compose.yml b/build/docker/deploy/docker-compose.yml index 940af657d..bf855252c 100644 --- a/build/docker/deploy/docker-compose.yml +++ b/build/docker/deploy/docker-compose.yml @@ -40,18 +40,18 @@ services: networks: - milvus - indexbuilder: - image: ${TARGET_REPO}/indexbuilder:${TARGET_TAG} + queryservice: + image: ${TARGET_REPO}/queryservice:${TARGET_TAG} build: context: ../../../ - dockerfile: build/docker/deploy/indexbuilder/DockerFile + dockerfile: build/docker/deploy/queryservice/DockerFile cache_from: - - ${SOURCE_REPO}/indexbuilder:${SOURCE_TAG} + - ${SOURCE_REPO}/queryservice:${SOURCE_TAG} environment: - MASTER_ADDRESS: ${MASTER_ADDRESS} + PULSAR_ADDRESS: ${PULSAR_ADDRESS} ETCD_ADDRESS: ${ETCD_ADDRESS} + MASTER_ADDRESS: ${MASTER_ADDRESS} MINIO_ADDRESS: ${MINIO_ADDRESS} - networks: - milvus diff --git a/build/docker/deploy/queryservice/DockerFile b/build/docker/deploy/queryservice/DockerFile index e871383ee..825be7402 100644 --- a/build/docker/deploy/queryservice/DockerFile +++ b/build/docker/deploy/queryservice/DockerFile @@ -9,8 +9,23 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing permissions and limitations under the License. +FROM milvusdb/milvus-distributed-dev:amd64-ubuntu18.04-latest AS openblas + +#FROM alpine FROM ubuntu:bionic-20200921 +RUN apt-get update && apt-get install -y --no-install-recommends libtbb-dev gfortran + +#RUN echo "http://dl-cdn.alpinelinux.org/alpine/edge/testing" >> /etc/apk/repositories + +#RUN sed -i "s/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g" /etc/apk/repositories \ +# && apk add --no-cache libtbb gfortran + +COPY --from=openblas /usr/lib/libopenblas-r0.3.9.so /usr/lib/ + +RUN ln -s /usr/lib/libopenblas-r0.3.9.so /usr/lib/libopenblas.so.0 && \ + ln -s /usr/lib/libopenblas.so.0 /usr/lib/libopenblas.so + COPY ./bin/queryservice /milvus-distributed/bin/queryservice COPY ./configs/ /milvus-distributed/configs/ @@ -21,6 +36,4 @@ ENV LD_LIBRARY_PATH=/milvus-distributed/lib:$LD_LIBRARY_PATH:/usr/lib WORKDIR /milvus-distributed/ -CMD ["./bin/masterservice"] - -EXPOSE 19531 +CMD ["./bin/queryservice"] diff --git a/cmd/querynode/query_node.go b/cmd/querynode/querynode.go similarity index 66% rename from cmd/querynode/query_node.go rename to cmd/querynode/querynode.go index 12a6639a5..060abd695 100644 --- a/cmd/querynode/query_node.go +++ b/cmd/querynode/querynode.go @@ -2,7 +2,6 @@ package main import ( "context" - "fmt" "log" "os" "os/signal" @@ -10,16 +9,16 @@ import ( "go.uber.org/zap" - "github.com/zilliztech/milvus-distributed/internal/querynode" + grpcquerynode "github.com/zilliztech/milvus-distributed/internal/distributed/querynode" ) func main() { - - querynode.Init() - fmt.Println("QueryNodeID is", querynode.Params.QueryNodeID) // Creates server. ctx, cancel := context.WithCancel(context.Background()) - svr := querynode.NewQueryNode(ctx, 0) + svr := grpcquerynode.NewServer(ctx) + if err := svr.Init(); err != nil { + panic(err) + } sc := make(chan os.Signal, 1) signal.Notify(sc, @@ -34,12 +33,16 @@ func main() { cancel() }() - svr.Start() + if err := svr.Start(); err != nil { + panic(err) + } <-ctx.Done() log.Print("Got signal to exit", zap.String("signal", sig.String())) - svr.Stop() + if err := svr.Stop(); err != nil { + panic(err) + } switch sig { case syscall.SIGTERM: exit(0) diff --git a/cmd/queryservice/queryservice.go b/cmd/queryservice/queryservice.go index 976332173..7ec8a9ce2 100644 --- a/cmd/queryservice/queryservice.go +++ b/cmd/queryservice/queryservice.go @@ -2,83 +2,38 @@ package main import ( "context" - "fmt" "log" + "os" + "os/signal" + "syscall" - ds "github.com/zilliztech/milvus-distributed/internal/dataservice" - dds "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + grpcqueryservice "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice" "github.com/zilliztech/milvus-distributed/internal/queryservice" ) -const reTryCnt = 3 - func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, err := queryservice.NewQueryService(ctx) - if err != nil { + svr := grpcqueryservice.NewServer(ctx) + + if err := svr.Init(); err != nil { panic(err) } - log.Printf("query service address : %s:%d", queryservice.Params.Address, queryservice.Params.Port) - cnt := 0 - // init data service client - ds.Params.Init() - log.Printf("data service address : %s:%d", ds.Params.Address, ds.Params.Port) - dataClient := dds.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port)) - if err = dataClient.Init(); err != nil { + log.Printf("query service address : %s", queryservice.Params.Address) + + if err := svr.Start(); err != nil { panic(err) } - for cnt = 0; cnt < reTryCnt; cnt++ { - dsStates, err := dataClient.GetComponentStates() - if err != nil { - log.Printf("retry cout = %d, error = %s", cnt, err.Error()) - continue - } - if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - log.Printf("retry cout = %d, error = %s", cnt, dsStates.Status.Reason) - continue - } - if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY { - continue - } - break - } - if cnt >= reTryCnt { - panic("connect to data service failed") - } - //// init index service client - //is.Params.Init() - //log.Printf("index service address : %s:%d", is.Params.Address, is.Params.Port) - //indexClient := dis.NewClient(fmt.Sprintf("%s:%d", is.Params.Address, is.Params.Port)) - //// TODO: retry to check index service status - // - //if err = svr(dataService); err != nil { - // panic(err) - //} - // - //log.Printf("index service address : %s", is.Params.Address) - //indexService := isc.NewClient(is.Params.Address) - // - //if err = svr.SetIndexService(indexService); err != nil { - // panic(err) - //} - // - //if err = svr.Start(); err != nil { - // panic(err) - //} - // - //sc := make(chan os.Signal, 1) - //signal.Notify(sc, - // syscall.SIGHUP, - // syscall.SIGINT, - // syscall.SIGTERM, - // syscall.SIGQUIT) - //sig := <-sc - //log.Printf("Got %s signal to exit", sig.String()) - //_ = svr.Stop() + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + sig := <-sc + log.Printf("Got %s signal to exit", sig.String()) + _ = svr.Stop() } diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index dc56f0ba4..01bcdd2fa 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -2,14 +2,22 @@ package grpcquerynode import ( "context" + "fmt" + "log" "net" "google.golang.org/grpc" + "github.com/zilliztech/milvus-distributed/internal/dataservice" + grpcdataserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" + grpcindexserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client" + grpcqueryserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client" + "github.com/zilliztech/milvus-distributed/internal/indexservice" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" "github.com/zilliztech/milvus-distributed/internal/querynode" + "github.com/zilliztech/milvus-distributed/internal/queryservice" ) type Server struct { @@ -17,28 +25,49 @@ type Server struct { node *querynode.QueryNode } -func NewServer(ctx context.Context, queryNodeID uint64) *Server { - return &Server{ - node: querynode.NewQueryNode(ctx, queryNodeID), +func NewServer(ctx context.Context) *Server { + server := &Server{ + node: querynode.NewQueryNodeWithoutID(ctx), } + + queryservice.Params.Init() + queryClient := grpcqueryserviceclient.NewClient(queryservice.Params.Address) + if err := server.node.SetQueryService(queryClient); err != nil { + panic(err) + } + + indexservice.Params.Init() + indexClient := grpcindexserviceclient.NewClient(indexservice.Params.Address) + if err := server.node.SetIndexService(indexClient); err != nil { + panic(err) + } + + dataservice.Params.Init() + log.Println("connect to data service, address =", fmt.Sprint(dataservice.Params.Address, ":", dataservice.Params.Port)) + dataClient := grpcdataserviceclient.NewClient(fmt.Sprint(dataservice.Params.Address, ":", dataservice.Params.Port)) + if err := server.node.SetDataService(dataClient); err != nil { + panic(err) + } + + return server } func (s *Server) StartGrpcServer() { - // TODO: add address - lis, err := net.Listen("tcp", "") + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", querynode.Params.QueryNodePort)) if err != nil { panic(err) } s.grpcServer = grpc.NewServer() querypb.RegisterQueryNodeServer(s.grpcServer, s) + fmt.Println("start query node grpc server...") if err = s.grpcServer.Serve(lis); err != nil { panic(err) } } func (s *Server) Init() error { - return s.Init() + return s.node.Init() } func (s *Server) Start() error { diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index 69fe84766..72a2f801d 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -11,7 +11,7 @@ import ( "google.golang.org/grpc" "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" - masterservice "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" + "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -33,7 +33,9 @@ type Server struct { func (s *Server) Init() error { log.Println("query service init") - s.queryService.Init() + if err := s.queryService.Init(); err != nil { + panic(err) + } s.queryService.SetEnableGrpc(true) return nil } @@ -153,7 +155,7 @@ func (s *Server) CreateQueryChannel(ctx context.Context, req *commonpb.Empty) (* return s.queryService.CreateQueryChannel() } -func (s *Server) NewServer(ctx context.Context) *Server { +func NewServer(ctx context.Context) *Server { ctx1, cancel := context.WithCancel(ctx) service, err := queryservice.NewQueryService(ctx) if err != nil { diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 102d6f8ab..43b9953f2 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -32,7 +32,16 @@ import ( "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) -type Node = typeutil.QueryNodeInterface +type Node interface { + typeutil.Component + + AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) + RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) + WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) + LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) + ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) +} + type QueryService = typeutil.QueryServiceInterface type QueryNode struct { @@ -60,6 +69,7 @@ type QueryNode struct { closer io.Closer // clients + queryClient QueryServiceInterface indexClient IndexServiceInterface dataClient DataServiceInterface } @@ -107,33 +117,77 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode { return node } +func NewQueryNodeWithoutID(ctx context.Context) *QueryNode { + ctx1, cancel := context.WithCancel(ctx) + node := &QueryNode{ + queryNodeLoopCtx: ctx1, + queryNodeLoopCancel: cancel, + + dataSyncService: nil, + metaService: nil, + searchService: nil, + statsService: nil, + segManager: nil, + } + + var err error + cfg := &config.Configuration{ + ServiceName: "query_node", + Sampler: &config.SamplerConfig{ + Type: "const", + Param: 1, + }, + } + node.tracer, node.closer, err = cfg.NewTracer() + if err != nil { + panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) + } + opentracing.SetGlobalTracer(node.tracer) + + segmentsMap := make(map[int64]*Segment) + collections := make([]*Collection, 0) + + tSafe := newTSafe() + + node.replica = &collectionReplicaImpl{ + collections: collections, + segments: segmentsMap, + + tSafe: tSafe, + } + node.stateCode.Store(internalpb2.StateCode_INITIALIZING) + return node +} + // TODO: delete this and call node.Init() func Init() { Params.Init() } func (node *QueryNode) Init() error { + Params.Init() + return nil +} + +func (node *QueryNode) Start() error { registerReq := &queryPb.RegisterNodeRequest{ Address: &commonpb.Address{ Ip: Params.QueryNodeIP, Port: Params.QueryNodePort, }, } - var client QueryService // TODO: init interface - response, err := client.RegisterNode(registerReq) + + response, err := node.queryClient.RegisterNode(registerReq) if err != nil { panic(err) } if response.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { panic(response.Status.Reason) } - // TODO: use response.initParams - Params.Init() - return nil -} + Params.QueryNodeID = response.InitParams.NodeID + fmt.Println("QueryNodeID is", Params.QueryNodeID) -func (node *QueryNode) Start() error { if node.indexClient == nil { log.Println("WARN: null index service detected") } @@ -189,6 +243,14 @@ func (node *QueryNode) Stop() error { return nil } +func (node *QueryNode) SetQueryService(query QueryServiceInterface) error { + if query == nil { + return errors.New("query index service interface") + } + node.queryClient = query + return nil +} + func (node *QueryNode) SetIndexService(index IndexServiceInterface) error { if index == nil { return errors.New("null index service interface") diff --git a/internal/querynode/type_def.go b/internal/querynode/type_def.go index 768cf6919..d6e6b0d19 100644 --- a/internal/querynode/type_def.go +++ b/internal/querynode/type_def.go @@ -3,6 +3,7 @@ package querynode import ( "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" + "github.com/zilliztech/milvus-distributed/internal/proto/querypb" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) @@ -16,6 +17,10 @@ type TimeRange struct { timestampMax Timestamp } +type QueryServiceInterface interface { + RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) +} + type DataServiceInterface interface { GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) } diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index a57a18662..25cb8a386 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -2,6 +2,7 @@ package queryservice import ( "context" + "fmt" "log" "sort" "strconv" @@ -76,6 +77,26 @@ func (qs *QueryService) Stop() error { return nil } +//func (qs *QueryService) SetDataService(d querynode.DataServiceInterface) error { +// for _, v := range qs.queryNodeClient { +// err := v.SetDataService(d) +// if err != nil { +// return err +// } +// } +// return nil +//} +// +//func (qs *QueryService) SetIndexService(i querynode.IndexServiceInterface) error { +// for _, v := range qs.queryNodeClient { +// err := v.SetIndexService(i) +// if err != nil { +// return err +// } +// } +// return nil +//} + func (qs *QueryService) GetComponentStates() (*internalpb2.ComponentStates, error) { serviceComponentInfo := &internalpb2.ComponentInfo{ NodeID: Params.QueryServiceID, @@ -112,6 +133,7 @@ func (qs *QueryService) GetStatisticsChannel() (string, error) { // TODO:: do addWatchDmChannel to query node after registerNode func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { + fmt.Println("register query node =", req.Address) allocatedID := qs.numRegisterNode qs.numRegisterNode++ @@ -134,7 +156,7 @@ func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb nodeID: allocatedID, } } - qs.queryNodes[allocatedID] = node + qs.queryNodes = append(qs.queryNodes, node) return &querypb.RegisterNodeResponse{ Status: &commonpb.Status{ diff --git a/internal/queryservice/queryservice_test.go b/internal/queryservice/queryservice_test.go index 067d308ac..49038ff7a 100644 --- a/internal/queryservice/queryservice_test.go +++ b/internal/queryservice/queryservice_test.go @@ -34,7 +34,3 @@ func TestQueryService_Init(t *testing.T) { service.Stop() } - -//func TestQueryService_Load(t *testing.T) { -// -//} diff --git a/internal/util/typeutil/interface.go b/internal/util/typeutil/interface.go index 6691ff33d..b473f0890 100644 --- a/internal/util/typeutil/interface.go +++ b/internal/util/typeutil/interface.go @@ -49,13 +49,3 @@ type QueryServiceInterface interface { CreateQueryChannel() (*querypb.CreateQueryChannelResponse, error) GetPartitionStates(req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error) } - -type QueryNodeInterface interface { - Component - - AddQueryChannel(in *querypb.AddQueryChannelsRequest) (*commonpb.Status, error) - RemoveQueryChannel(in *querypb.RemoveQueryChannelsRequest) (*commonpb.Status, error) - WatchDmChannels(in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) - LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb.Status, error) - ReleaseSegments(in *querypb.ReleaseSegmentRequest) (*commonpb.Status, error) -} -- GitLab