提交 4f914a2c 编写于 作者: B bigsheeper 提交者: yefu.chen

Add docker file and main function of query node

Signed-off-by: Nbigsheeper <yihao.dai@zilliz.com>
上级 4d37a329
......@@ -94,7 +94,7 @@ proxynode: build-cpp
querynode: build-cpp
@echo "Building each component's binary to './bin'"
@echo "Building query node ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/query_node.go 1>/dev/null
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/querynode.go 1>/dev/null
# Builds various components locally.
......@@ -134,8 +134,10 @@ build-go: build-cpp
@echo "Building proxy node ..."
# TODO: fix me, why proxy node need cgo enabled
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/proxynode $(PWD)/cmd/proxy/node/proxy_node.go 1>/dev/null
@echo "Building query service ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/queryservice $(PWD)/cmd/queryservice/queryservice.go 1>/dev/null
@echo "Building query node ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/query_node.go 1>/dev/null
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/querynode.go 1>/dev/null
@echo "Building write node ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/writenode $(PWD)/cmd/writenode/writenode.go 1>/dev/null
@echo "Building binlog ..."
......@@ -186,6 +188,7 @@ docker: verifiers
# Builds each component and installs it to $GOPATH/bin.
install: all
@echo "Installing binary to './bin'"
@mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/queryservice $(GOPATH)/bin/queryservice
@mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/querynode $(GOPATH)/bin/querynode
@mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/master $(GOPATH)/bin/master
@mkdir -p $(GOPATH)/bin && cp -f $(PWD)/bin/proxynode $(GOPATH)/bin/proxynode
......@@ -206,6 +209,7 @@ clean:
@rm -rf $(GOPATH)/bin/master
@rm -rf $(GOPATH)/bin/proxynode
@rm -rf $(GOPATH)/bin/proxyservice
@rm -rf $(GOPATH)/bin/queryservice
@rm -rf $(GOPATH)/bin/querynode
@rm -rf $(GOPATH)/bin/writenode
@rm -rf $(GOPATH)/bin/singlenode
......
......@@ -40,18 +40,18 @@ services:
networks:
- milvus
indexbuilder:
image: ${TARGET_REPO}/indexbuilder:${TARGET_TAG}
queryservice:
image: ${TARGET_REPO}/queryservice:${TARGET_TAG}
build:
context: ../../../
dockerfile: build/docker/deploy/indexbuilder/DockerFile
dockerfile: build/docker/deploy/queryservice/DockerFile
cache_from:
- ${SOURCE_REPO}/indexbuilder:${SOURCE_TAG}
- ${SOURCE_REPO}/queryservice:${SOURCE_TAG}
environment:
MASTER_ADDRESS: ${MASTER_ADDRESS}
PULSAR_ADDRESS: ${PULSAR_ADDRESS}
ETCD_ADDRESS: ${ETCD_ADDRESS}
MASTER_ADDRESS: ${MASTER_ADDRESS}
MINIO_ADDRESS: ${MINIO_ADDRESS}
networks:
- milvus
......
......@@ -9,8 +9,23 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
FROM milvusdb/milvus-distributed-dev:amd64-ubuntu18.04-latest AS openblas
#FROM alpine
FROM ubuntu:bionic-20200921
RUN apt-get update && apt-get install -y --no-install-recommends libtbb-dev gfortran
#RUN echo "http://dl-cdn.alpinelinux.org/alpine/edge/testing" >> /etc/apk/repositories
#RUN sed -i "s/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g" /etc/apk/repositories \
# && apk add --no-cache libtbb gfortran
COPY --from=openblas /usr/lib/libopenblas-r0.3.9.so /usr/lib/
RUN ln -s /usr/lib/libopenblas-r0.3.9.so /usr/lib/libopenblas.so.0 && \
ln -s /usr/lib/libopenblas.so.0 /usr/lib/libopenblas.so
COPY ./bin/queryservice /milvus-distributed/bin/queryservice
COPY ./configs/ /milvus-distributed/configs/
......@@ -21,6 +36,4 @@ ENV LD_LIBRARY_PATH=/milvus-distributed/lib:$LD_LIBRARY_PATH:/usr/lib
WORKDIR /milvus-distributed/
CMD ["./bin/masterservice"]
EXPOSE 19531
CMD ["./bin/queryservice"]
......@@ -2,7 +2,6 @@ package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
......@@ -10,16 +9,16 @@ import (
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/querynode"
grpcquerynode "github.com/zilliztech/milvus-distributed/internal/distributed/querynode"
)
func main() {
querynode.Init()
fmt.Println("QueryNodeID is", querynode.Params.QueryNodeID)
// Creates server.
ctx, cancel := context.WithCancel(context.Background())
svr := querynode.NewQueryNode(ctx, 0)
svr := grpcquerynode.NewServer(ctx)
if err := svr.Init(); err != nil {
panic(err)
}
sc := make(chan os.Signal, 1)
signal.Notify(sc,
......@@ -34,12 +33,16 @@ func main() {
cancel()
}()
svr.Start()
if err := svr.Start(); err != nil {
panic(err)
}
<-ctx.Done()
log.Print("Got signal to exit", zap.String("signal", sig.String()))
svr.Stop()
if err := svr.Stop(); err != nil {
panic(err)
}
switch sig {
case syscall.SIGTERM:
exit(0)
......
......@@ -2,83 +2,38 @@ package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
dds "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
grpcqueryservice "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice"
"github.com/zilliztech/milvus-distributed/internal/queryservice"
)
const reTryCnt = 3
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, err := queryservice.NewQueryService(ctx)
if err != nil {
svr := grpcqueryservice.NewServer(ctx)
if err := svr.Init(); err != nil {
panic(err)
}
log.Printf("query service address : %s:%d", queryservice.Params.Address, queryservice.Params.Port)
cnt := 0
// init data service client
ds.Params.Init()
log.Printf("data service address : %s:%d", ds.Params.Address, ds.Params.Port)
dataClient := dds.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port))
if err = dataClient.Init(); err != nil {
log.Printf("query service address : %s", queryservice.Params.Address)
if err := svr.Start(); err != nil {
panic(err)
}
for cnt = 0; cnt < reTryCnt; cnt++ {
dsStates, err := dataClient.GetComponentStates()
if err != nil {
log.Printf("retry cout = %d, error = %s", cnt, err.Error())
continue
}
if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
log.Printf("retry cout = %d, error = %s", cnt, dsStates.Status.Reason)
continue
}
if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if cnt >= reTryCnt {
panic("connect to data service failed")
}
//// init index service client
//is.Params.Init()
//log.Printf("index service address : %s:%d", is.Params.Address, is.Params.Port)
//indexClient := dis.NewClient(fmt.Sprintf("%s:%d", is.Params.Address, is.Params.Port))
//// TODO: retry to check index service status
//
//if err = svr(dataService); err != nil {
// panic(err)
//}
//
//log.Printf("index service address : %s", is.Params.Address)
//indexService := isc.NewClient(is.Params.Address)
//
//if err = svr.SetIndexService(indexService); err != nil {
// panic(err)
//}
//
//if err = svr.Start(); err != nil {
// panic(err)
//}
//
//sc := make(chan os.Signal, 1)
//signal.Notify(sc,
// syscall.SIGHUP,
// syscall.SIGINT,
// syscall.SIGTERM,
// syscall.SIGQUIT)
//sig := <-sc
//log.Printf("Got %s signal to exit", sig.String())
//_ = svr.Stop()
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
sig := <-sc
log.Printf("Got %s signal to exit", sig.String())
_ = svr.Stop()
}
......@@ -2,14 +2,22 @@ package grpcquerynode
import (
"context"
"fmt"
"log"
"net"
"google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/dataservice"
grpcdataserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
grpcindexserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
grpcqueryserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
"github.com/zilliztech/milvus-distributed/internal/indexservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/querynode"
"github.com/zilliztech/milvus-distributed/internal/queryservice"
)
type Server struct {
......@@ -17,28 +25,49 @@ type Server struct {
node *querynode.QueryNode
}
func NewServer(ctx context.Context, queryNodeID uint64) *Server {
return &Server{
node: querynode.NewQueryNode(ctx, queryNodeID),
func NewServer(ctx context.Context) *Server {
server := &Server{
node: querynode.NewQueryNodeWithoutID(ctx),
}
queryservice.Params.Init()
queryClient := grpcqueryserviceclient.NewClient(queryservice.Params.Address)
if err := server.node.SetQueryService(queryClient); err != nil {
panic(err)
}
indexservice.Params.Init()
indexClient := grpcindexserviceclient.NewClient(indexservice.Params.Address)
if err := server.node.SetIndexService(indexClient); err != nil {
panic(err)
}
dataservice.Params.Init()
log.Println("connect to data service, address =", fmt.Sprint(dataservice.Params.Address, ":", dataservice.Params.Port))
dataClient := grpcdataserviceclient.NewClient(fmt.Sprint(dataservice.Params.Address, ":", dataservice.Params.Port))
if err := server.node.SetDataService(dataClient); err != nil {
panic(err)
}
return server
}
func (s *Server) StartGrpcServer() {
// TODO: add address
lis, err := net.Listen("tcp", "")
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", querynode.Params.QueryNodePort))
if err != nil {
panic(err)
}
s.grpcServer = grpc.NewServer()
querypb.RegisterQueryNodeServer(s.grpcServer, s)
fmt.Println("start query node grpc server...")
if err = s.grpcServer.Serve(lis); err != nil {
panic(err)
}
}
func (s *Server) Init() error {
return s.Init()
return s.node.Init()
}
func (s *Server) Start() error {
......
......@@ -11,7 +11,7 @@ import (
"google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
masterservice "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
"github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
......@@ -33,7 +33,9 @@ type Server struct {
func (s *Server) Init() error {
log.Println("query service init")
s.queryService.Init()
if err := s.queryService.Init(); err != nil {
panic(err)
}
s.queryService.SetEnableGrpc(true)
return nil
}
......@@ -153,7 +155,7 @@ func (s *Server) CreateQueryChannel(ctx context.Context, req *commonpb.Empty) (*
return s.queryService.CreateQueryChannel()
}
func (s *Server) NewServer(ctx context.Context) *Server {
func NewServer(ctx context.Context) *Server {
ctx1, cancel := context.WithCancel(ctx)
service, err := queryservice.NewQueryService(ctx)
if err != nil {
......
......@@ -32,7 +32,16 @@ import (
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type Node = typeutil.QueryNodeInterface
type Node interface {
typeutil.Component
AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error)
RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error)
WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error)
LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error)
ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error)
}
type QueryService = typeutil.QueryServiceInterface
type QueryNode struct {
......@@ -60,6 +69,7 @@ type QueryNode struct {
closer io.Closer
// clients
queryClient QueryServiceInterface
indexClient IndexServiceInterface
dataClient DataServiceInterface
}
......@@ -107,33 +117,77 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
return node
}
func NewQueryNodeWithoutID(ctx context.Context) *QueryNode {
ctx1, cancel := context.WithCancel(ctx)
node := &QueryNode{
queryNodeLoopCtx: ctx1,
queryNodeLoopCancel: cancel,
dataSyncService: nil,
metaService: nil,
searchService: nil,
statsService: nil,
segManager: nil,
}
var err error
cfg := &config.Configuration{
ServiceName: "query_node",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
node.tracer, node.closer, err = cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(node.tracer)
segmentsMap := make(map[int64]*Segment)
collections := make([]*Collection, 0)
tSafe := newTSafe()
node.replica = &collectionReplicaImpl{
collections: collections,
segments: segmentsMap,
tSafe: tSafe,
}
node.stateCode.Store(internalpb2.StateCode_INITIALIZING)
return node
}
// TODO: delete this and call node.Init()
func Init() {
Params.Init()
}
func (node *QueryNode) Init() error {
Params.Init()
return nil
}
func (node *QueryNode) Start() error {
registerReq := &queryPb.RegisterNodeRequest{
Address: &commonpb.Address{
Ip: Params.QueryNodeIP,
Port: Params.QueryNodePort,
},
}
var client QueryService // TODO: init interface
response, err := client.RegisterNode(registerReq)
response, err := node.queryClient.RegisterNode(registerReq)
if err != nil {
panic(err)
}
if response.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
panic(response.Status.Reason)
}
// TODO: use response.initParams
Params.Init()
return nil
}
Params.QueryNodeID = response.InitParams.NodeID
fmt.Println("QueryNodeID is", Params.QueryNodeID)
func (node *QueryNode) Start() error {
if node.indexClient == nil {
log.Println("WARN: null index service detected")
}
......@@ -189,6 +243,14 @@ func (node *QueryNode) Stop() error {
return nil
}
func (node *QueryNode) SetQueryService(query QueryServiceInterface) error {
if query == nil {
return errors.New("query index service interface")
}
node.queryClient = query
return nil
}
func (node *QueryNode) SetIndexService(index IndexServiceInterface) error {
if index == nil {
return errors.New("null index service interface")
......
......@@ -3,6 +3,7 @@ package querynode
import (
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
......@@ -16,6 +17,10 @@ type TimeRange struct {
timestampMax Timestamp
}
type QueryServiceInterface interface {
RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error)
}
type DataServiceInterface interface {
GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error)
}
......
......@@ -2,6 +2,7 @@ package queryservice
import (
"context"
"fmt"
"log"
"sort"
"strconv"
......@@ -76,6 +77,26 @@ func (qs *QueryService) Stop() error {
return nil
}
//func (qs *QueryService) SetDataService(d querynode.DataServiceInterface) error {
// for _, v := range qs.queryNodeClient {
// err := v.SetDataService(d)
// if err != nil {
// return err
// }
// }
// return nil
//}
//
//func (qs *QueryService) SetIndexService(i querynode.IndexServiceInterface) error {
// for _, v := range qs.queryNodeClient {
// err := v.SetIndexService(i)
// if err != nil {
// return err
// }
// }
// return nil
//}
func (qs *QueryService) GetComponentStates() (*internalpb2.ComponentStates, error) {
serviceComponentInfo := &internalpb2.ComponentInfo{
NodeID: Params.QueryServiceID,
......@@ -112,6 +133,7 @@ func (qs *QueryService) GetStatisticsChannel() (string, error) {
// TODO:: do addWatchDmChannel to query node after registerNode
func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
fmt.Println("register query node =", req.Address)
allocatedID := qs.numRegisterNode
qs.numRegisterNode++
......@@ -134,7 +156,7 @@ func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb
nodeID: allocatedID,
}
}
qs.queryNodes[allocatedID] = node
qs.queryNodes = append(qs.queryNodes, node)
return &querypb.RegisterNodeResponse{
Status: &commonpb.Status{
......
......@@ -34,7 +34,3 @@ func TestQueryService_Init(t *testing.T) {
service.Stop()
}
//func TestQueryService_Load(t *testing.T) {
//
//}
......@@ -49,13 +49,3 @@ type QueryServiceInterface interface {
CreateQueryChannel() (*querypb.CreateQueryChannelResponse, error)
GetPartitionStates(req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error)
}
type QueryNodeInterface interface {
Component
AddQueryChannel(in *querypb.AddQueryChannelsRequest) (*commonpb.Status, error)
RemoveQueryChannel(in *querypb.RemoveQueryChannelsRequest) (*commonpb.Status, error)
WatchDmChannels(in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error)
LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb.Status, error)
ReleaseSegments(in *querypb.ReleaseSegmentRequest) (*commonpb.Status, error)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册