提交 7a3223dc 编写于 作者: T ThreadDao 提交者: yefu.chen

tags collection stats

Signed-off-by: NThreadDao <yufen.zong@zilliz.com>
上级 3ef18e86
......@@ -11,21 +11,23 @@ import (
"github.com/opentracing/opentracing-go"
grpcindexserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
"github.com/zilliztech/milvus-distributed/internal/indexnode"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"google.golang.org/grpc"
)
type Server struct {
impl *indexnode.IndexNode
indexnode *indexnode.IndexNode
grpcServer *grpc.Server
grpcErrChan chan error
indexServiceClient *grpcindexserviceclient.Client
indexServiceClient types.IndexService
loopCtx context.Context
loopCancel func()
loopWg sync.WaitGroup
......@@ -105,16 +107,16 @@ func (s *Server) init() error {
if err != nil {
return err
}
s.impl.SetIndexServiceClient(s.indexServiceClient)
s.indexnode.SetIndexServiceClient(s.indexServiceClient)
indexnode.Params.Init()
indexnode.Params.Port = Params.Port
indexnode.Params.IP = Params.IP
indexnode.Params.Address = Params.Address
s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING)
s.indexnode.UpdateStateCode(internalpb2.StateCode_INITIALIZING)
err = s.impl.Init()
err = s.indexnode.Init()
if err != nil {
return err
}
......@@ -122,7 +124,7 @@ func (s *Server) init() error {
}
func (s *Server) start() error {
err := s.impl.Start()
err := s.indexnode.Start()
if err != nil {
return err
}
......@@ -131,8 +133,8 @@ func (s *Server) start() error {
func (s *Server) Stop() error {
s.loopCancel()
if s.impl != nil {
s.impl.Stop()
if s.indexnode != nil {
s.indexnode.Stop()
}
if s.grpcServer != nil {
s.grpcServer.GracefulStop()
......@@ -143,23 +145,23 @@ func (s *Server) Stop() error {
}
func (s *Server) BuildIndex(ctx context.Context, req *indexpb.BuildIndexCmd) (*commonpb.Status, error) {
return s.impl.BuildIndex(ctx, req)
return s.indexnode.BuildIndex(ctx, req)
}
func (s *Server) DropIndex(ctx context.Context, request *indexpb.DropIndexRequest) (*commonpb.Status, error) {
return s.impl.DropIndex(ctx, request)
return s.indexnode.DropIndex(ctx, request)
}
func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
return s.impl.GetComponentStates(ctx)
return s.indexnode.GetComponentStates(ctx)
}
func (s *Server) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
return s.impl.GetTimeTickChannel(ctx)
return s.indexnode.GetTimeTickChannel(ctx)
}
func (s *Server) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
return s.impl.GetStatisticsChannel(ctx)
return s.indexnode.GetStatisticsChannel(ctx)
}
func NewServer(ctx context.Context) (*Server, error) {
......@@ -173,7 +175,7 @@ func NewServer(ctx context.Context) (*Server, error) {
return &Server{
loopCtx: ctx1,
loopCancel: cancel,
impl: node,
indexnode: node,
grpcErrChan: make(chan error),
}, nil
}
......@@ -26,7 +26,7 @@ type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
type Server struct {
impl *indexservice.IndexService
indexservice *indexservice.IndexService
grpcServer *grpc.Server
grpcErrChan chan error
......@@ -56,20 +56,20 @@ func (s *Server) init() error {
s.loopWg.Add(1)
go s.startGrpcLoop(Params.ServicePort)
// wait for grpc impl loop start
// wait for grpc indexservice loop start
if err := <-s.grpcErrChan; err != nil {
return err
}
s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING)
s.indexservice.UpdateStateCode(internalpb2.StateCode_INITIALIZING)
if err := s.impl.Init(); err != nil {
if err := s.indexservice.Init(); err != nil {
return err
}
return nil
}
func (s *Server) start() error {
if err := s.impl.Start(); err != nil {
if err := s.indexservice.Start(); err != nil {
return err
}
log.Println("indexService started")
......@@ -80,8 +80,8 @@ func (s *Server) Stop() error {
if err := s.closer.Close(); err != nil {
return err
}
if s.impl != nil {
s.impl.Stop()
if s.indexservice != nil {
s.indexservice.Stop()
}
s.loopCancel()
......@@ -94,27 +94,27 @@ func (s *Server) Stop() error {
}
func (s *Server) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) {
return s.impl.RegisterNode(ctx, req)
return s.indexservice.RegisterNode(ctx, req)
}
func (s *Server) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
return s.impl.BuildIndex(ctx, req)
return s.indexservice.BuildIndex(ctx, req)
}
func (s *Server) GetIndexStates(ctx context.Context, req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error) {
return s.impl.GetIndexStates(ctx, req)
return s.indexservice.GetIndexStates(ctx, req)
}
func (s *Server) DropIndex(ctx context.Context, request *indexpb.DropIndexRequest) (*commonpb.Status, error) {
return s.impl.DropIndex(ctx, request)
return s.indexservice.DropIndex(ctx, request)
}
func (s *Server) GetIndexFilePaths(ctx context.Context, req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) {
return s.impl.GetIndexFilePaths(ctx, req)
return s.indexservice.GetIndexFilePaths(ctx, req)
}
func (s *Server) NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIndexNotification) (*commonpb.Status, error) {
return s.impl.NotifyBuildIndex(ctx, nty)
return s.indexservice.NotifyBuildIndex(ctx, nty)
}
func (s *Server) startGrpcLoop(grpcPort int) {
......@@ -147,15 +147,15 @@ func (s *Server) startGrpcLoop(grpcPort int) {
}
func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
return s.impl.GetComponentStates(ctx)
return s.indexservice.GetComponentStates(ctx)
}
func (s *Server) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
return s.impl.GetTimeTickChannel(ctx)
return s.indexservice.GetTimeTickChannel(ctx)
}
func (s *Server) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
return s.impl.GetStatisticsChannel(ctx)
return s.indexservice.GetStatisticsChannel(ctx)
}
func NewServer(ctx context.Context) (*Server, error) {
......@@ -167,10 +167,10 @@ func NewServer(ctx context.Context) (*Server, error) {
return nil, err
}
s := &Server{
loopCtx: ctx1,
loopCancel: cancel,
impl: serverImp,
grpcErrChan: make(chan error),
loopCtx: ctx1,
loopCancel: cancel,
indexservice: serverImp,
grpcErrChan: make(chan error),
}
cfg := &config.Configuration{
......
......@@ -6,10 +6,13 @@ import (
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
)
type Client struct {
......@@ -18,6 +21,16 @@ type Client struct {
ctx context.Context
}
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
//TODO
panic("implement me")
}
func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
//TODO
panic("implement me")
}
func (c *Client) Init() error {
tracer := opentracing.GlobalTracer()
connectGrpcFunc := func() error {
......
......@@ -9,6 +9,8 @@ import (
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/types"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
......@@ -268,20 +270,20 @@ func (s *Server) Stop() error {
return nil
}
func (s *Server) SetMasterService(master qn.MasterServiceInterface) error {
return s.impl.SetMasterService(master)
func (s *Server) SetMasterService(masterService types.MasterService) error {
return s.impl.SetMasterService(masterService)
}
func (s *Server) SetQueryService(query qn.QueryServiceInterface) error {
return s.impl.SetQueryService(query)
func (s *Server) SetQueryService(queryService types.QueryService) error {
return s.impl.SetQueryService(queryService)
}
func (s *Server) SetIndexService(index qn.IndexServiceInterface) error {
return s.impl.SetIndexService(index)
func (s *Server) SetIndexService(indexService types.IndexService) error {
return s.impl.SetIndexService(indexService)
}
func (s *Server) SetDataService(data qn.DataServiceInterface) error {
return s.impl.SetDataService(data)
func (s *Server) SetDataService(dataService types.DataService) error {
return s.impl.SetDataService(dataService)
}
func (s *Server) GetTimeTickChannel(ctx context.Context, in *commonpb.Empty) (*milvuspb.StringResponse, error) {
......
......@@ -2,24 +2,24 @@ package indexnode
import (
"context"
"errors"
"fmt"
"io"
"log"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"errors"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"github.com/zilliztech/milvus-distributed/internal/kv"
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
)
const (
......@@ -39,7 +39,7 @@ type IndexNode struct {
kv kv.Base
serviceClient typeutil.IndexServiceInterface // method factory
serviceClient types.IndexService // method factory
// Add callback functions at different stages
startCallbacks []func()
......@@ -155,7 +155,7 @@ func (i *IndexNode) UpdateStateCode(code internalpb2.StateCode) {
i.stateCode = code
}
func (i *IndexNode) SetIndexServiceClient(serviceClient typeutil.IndexServiceInterface) {
func (i *IndexNode) SetIndexServiceClient(serviceClient types.IndexService) {
i.serviceClient = serviceClient
}
......
......@@ -2,20 +2,18 @@ package indexnode
import (
"context"
"errors"
"fmt"
"log"
"strconv"
"errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/storage"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/storage"
)
const (
......@@ -75,7 +73,7 @@ type IndexBuildTask struct {
kv kv.Base
savePaths []string
cmd *indexpb.BuildIndexCmd
serviceClient typeutil.IndexServiceInterface
serviceClient types.IndexService
nodeID UniqueID
}
......
......@@ -3,11 +3,10 @@ package indexnode
import (
"container/list"
"context"
"errors"
"log"
"sync"
"errors"
"github.com/opentracing/opentracing-go"
oplog "github.com/opentracing/opentracing-go/log"
"github.com/zilliztech/milvus-distributed/internal/kv"
......
......@@ -2,28 +2,26 @@ package indexservice
import (
"context"
"errors"
"fmt"
"log"
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/tso"
"go.etcd.io/etcd/clientv3"
"errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
"github.com/zilliztech/milvus-distributed/internal/tso"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
const (
......
......@@ -19,6 +19,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
)
......
......@@ -7,6 +7,7 @@ import (
"errors"
grpcindexnodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexnode/client"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
......
......@@ -5,12 +5,13 @@ import (
"sync"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/types"
)
// An Item is something we manage in a priority queue.
type PQItem struct {
value typeutil.IndexNodeInterface // The value of the item; arbitrary.
value types.IndexNode // The value of the item; arbitrary.
key UniqueID
addr *commonpb.Address
priority int // The priority of the item in the queue.
......@@ -124,7 +125,7 @@ func (pq *PriorityQueue) Peek() interface{} {
//return item.value
}
func (pq *PriorityQueue) PeekClient() (UniqueID, typeutil.IndexNodeInterface) {
func (pq *PriorityQueue) PeekClient() (UniqueID, types.IndexNode) {
item := pq.Peek()
if item == nil {
return UniqueID(-1), nil
......@@ -132,11 +133,11 @@ func (pq *PriorityQueue) PeekClient() (UniqueID, typeutil.IndexNodeInterface) {
return item.(*PQItem).key, item.(*PQItem).value
}
func (pq *PriorityQueue) PeekAllClients() []typeutil.IndexNodeInterface {
func (pq *PriorityQueue) PeekAllClients() []types.IndexNode {
pq.lock.RLock()
defer pq.lock.RUnlock()
var ret []typeutil.IndexNodeInterface
var ret []types.IndexNode
for _, item := range pq.items {
ret = append(ret, item.value)
}
......
......@@ -2,16 +2,14 @@ package indexservice
import (
"context"
"errors"
"log"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
)
......@@ -67,7 +65,7 @@ type IndexAddTask struct {
idAllocator *allocator.GlobalIDAllocator
buildQueue TaskQueue
kv kv.Base
builderClient typeutil.IndexNodeInterface
builderClient types.IndexNode
nodeClients *PriorityQueue
buildClientNodeID UniqueID
}
......
......@@ -41,8 +41,8 @@ func (node *ProxyNode) CreateCollection(ctx context.Context, request *milvuspb.C
ctx: ctx,
Condition: NewTaskCondition(ctx),
CreateCollectionRequest: request,
masterClient: node.masterClient,
dataServiceClient: node.dataServiceClient,
masterService: node.masterService,
dataServiceClient: node.dataService,
}
err := node.sched.DdQueue.Enqueue(cct)
......@@ -71,7 +71,7 @@ func (node *ProxyNode) DropCollection(ctx context.Context, request *milvuspb.Dro
ctx: ctx,
Condition: NewTaskCondition(ctx),
DropCollectionRequest: request,
masterClient: node.masterClient,
masterService: node.masterService,
}
err := node.sched.DdQueue.Enqueue(dct)
......@@ -100,7 +100,7 @@ func (node *ProxyNode) HasCollection(ctx context.Context, request *milvuspb.HasC
ctx: ctx,
Condition: NewTaskCondition(ctx),
HasCollectionRequest: request,
masterClient: node.masterClient,
masterService: node.masterService,
}
err := node.sched.DdQueue.Enqueue(hct)
......@@ -135,7 +135,7 @@ func (node *ProxyNode) LoadCollection(ctx context.Context, request *milvuspb.Loa
ctx: ctx,
Condition: NewTaskCondition(ctx),
LoadCollectionRequest: request,
queryserviceClient: node.queryServiceClient,
queryService: node.queryService,
}
err := node.sched.DdQueue.Enqueue(lct)
......@@ -164,7 +164,7 @@ func (node *ProxyNode) ReleaseCollection(ctx context.Context, request *milvuspb.
ctx: ctx,
Condition: NewTaskCondition(ctx),
ReleaseCollectionRequest: request,
queryserviceClient: node.queryServiceClient,
queryService: node.queryService,
}
err := node.sched.DdQueue.Enqueue(rct)
......@@ -193,7 +193,7 @@ func (node *ProxyNode) DescribeCollection(ctx context.Context, request *milvuspb
ctx: ctx,
Condition: NewTaskCondition(ctx),
DescribeCollectionRequest: request,
masterClient: node.masterClient,
masterService: node.masterService,
}
err := node.sched.DdQueue.Enqueue(dct)
......@@ -225,7 +225,7 @@ func (node *ProxyNode) GetCollectionStatistics(ctx context.Context, request *mil
ctx: ctx,
Condition: NewTaskCondition(ctx),
CollectionStatsRequest: request,
dataServiceClient: node.dataServiceClient,
dataService: node.dataService,
}
err := node.sched.DdQueue.Enqueue(g)
......@@ -257,7 +257,7 @@ func (node *ProxyNode) ShowCollections(ctx context.Context, request *milvuspb.Sh
ctx: ctx,
Condition: NewTaskCondition(ctx),
ShowCollectionRequest: request,
masterClient: node.masterClient,
masterService: node.masterService,
}
err := node.sched.DdQueue.Enqueue(sct)
......@@ -289,7 +289,7 @@ func (node *ProxyNode) CreatePartition(ctx context.Context, request *milvuspb.Cr
ctx: ctx,
Condition: NewTaskCondition(ctx),
CreatePartitionRequest: request,
masterClient: node.masterClient,
masterService: node.masterService,
result: nil,
}
......@@ -316,7 +316,7 @@ func (node *ProxyNode) DropPartition(ctx context.Context, request *milvuspb.Drop
ctx: ctx,
Condition: NewTaskCondition(ctx),
DropPartitionRequest: request,
masterClient: node.masterClient,
masterService: node.masterService,
result: nil,
}
......@@ -344,7 +344,7 @@ func (node *ProxyNode) HasPartition(ctx context.Context, request *milvuspb.HasPa
ctx: ctx,
Condition: NewTaskCondition(ctx),
HasPartitionRequest: request,
masterClient: node.masterClient,
masterService: node.masterService,
result: nil,
}
......@@ -379,7 +379,7 @@ func (node *ProxyNode) LoadPartitions(ctx context.Context, request *milvuspb.Loa
ctx: ctx,
Condition: NewTaskCondition(ctx),
LoadPartitonRequest: request,
queryserviceClient: node.queryServiceClient,
queryService: node.queryService,
}
err := node.sched.DdQueue.Enqueue(lpt)
......@@ -408,7 +408,7 @@ func (node *ProxyNode) ReleasePartitions(ctx context.Context, request *milvuspb.
ctx: ctx,
Condition: NewTaskCondition(ctx),
ReleasePartitionRequest: request,
queryserviceClient: node.queryServiceClient,
queryService: node.queryService,
}
err := node.sched.DdQueue.Enqueue(rpt)
......@@ -440,7 +440,7 @@ func (node *ProxyNode) ShowPartitions(ctx context.Context, request *milvuspb.Sho
ctx: ctx,
Condition: NewTaskCondition(ctx),
ShowPartitionRequest: request,
masterClient: node.masterClient,
masterService: node.masterService,
result: nil,
}
......@@ -473,7 +473,7 @@ func (node *ProxyNode) CreateIndex(ctx context.Context, request *milvuspb.Create
ctx: ctx,
Condition: NewTaskCondition(ctx),
CreateIndexRequest: request,
masterClient: node.masterClient,
masterService: node.masterService,
}
err := node.sched.DdQueue.Enqueue(cit)
......@@ -501,7 +501,7 @@ func (node *ProxyNode) DescribeIndex(ctx context.Context, request *milvuspb.Desc
ctx: ctx,
Condition: NewTaskCondition(ctx),
DescribeIndexRequest: request,
masterClient: node.masterClient,
masterService: node.masterService,
}
err := node.sched.DdQueue.Enqueue(dit)
......@@ -533,7 +533,7 @@ func (node *ProxyNode) DropIndex(ctx context.Context, request *milvuspb.DropInde
ctx: ctx,
Condition: NewTaskCondition(ctx),
DropIndexRequest: request,
masterClient: node.masterClient,
masterService: node.masterService,
}
err := node.sched.DdQueue.Enqueue(dit)
if err != nil {
......@@ -555,11 +555,11 @@ func (node *ProxyNode) DropIndex(ctx context.Context, request *milvuspb.DropInde
func (node *ProxyNode) GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) {
// log.Println("Describe index progress for: ", request)
dipt := &GetIndexStateTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
IndexStateRequest: request,
indexServiceClient: node.indexServiceClient,
masterClient: node.masterClient,
ctx: ctx,
Condition: NewTaskCondition(ctx),
IndexStateRequest: request,
indexService: node.indexService,
masterService: node.masterService,
}
err := node.sched.DdQueue.Enqueue(dipt)
......@@ -587,9 +587,9 @@ func (node *ProxyNode) GetIndexState(ctx context.Context, request *milvuspb.Inde
func (node *ProxyNode) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error) {
it := &InsertTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
dataServiceClient: node.dataServiceClient,
ctx: ctx,
Condition: NewTaskCondition(ctx),
dataService: node.dataService,
BaseInsertTask: BaseInsertTask{
BaseMsg: msgstream.BaseMsg{
HashValues: request.HashKeys,
......@@ -676,10 +676,10 @@ func (node *ProxyNode) Search(ctx context.Context, request *milvuspb.SearchReque
func (node *ProxyNode) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error) {
log.Println("AA Flush collections: ", request.CollectionNames)
ft := &FlushTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
FlushRequest: request,
dataServiceClient: node.dataServiceClient,
ctx: ctx,
Condition: NewTaskCondition(ctx),
FlushRequest: request,
dataService: node.dataService,
}
err := node.sched.DdQueue.Enqueue(ft)
......@@ -716,7 +716,7 @@ func (node *ProxyNode) GetPersistentSegmentInfo(ctx context.Context, req *milvus
resp.Status.Reason = err.Error()
return resp, nil
}
infoResp, err := node.dataServiceClient.GetSegmentInfo(ctx, &datapb.SegmentInfoRequest{
infoResp, err := node.dataService.GetSegmentInfo(ctx, &datapb.SegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kSegmentInfo,
MsgID: 0,
......@@ -763,7 +763,7 @@ func (node *ProxyNode) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.Qu
resp.Status.Reason = err.Error()
return resp, nil
}
infoResp, err := node.queryServiceClient.GetSegmentInfo(ctx, &querypb.SegmentInfoRequest{
infoResp, err := node.queryService.GetSegmentInfo(ctx, &querypb.SegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kSegmentInfo,
MsgID: 0,
......@@ -798,7 +798,7 @@ func (node *ProxyNode) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.Qu
}
func (node *ProxyNode) getSegmentsOfCollection(ctx context.Context, dbName string, collectionName string) ([]UniqueID, error) {
describeCollectionResponse, err := node.masterClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
describeCollectionResponse, err := node.masterService.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDescribeCollection,
MsgID: 0,
......@@ -815,7 +815,7 @@ func (node *ProxyNode) getSegmentsOfCollection(ctx context.Context, dbName strin
return nil, errors.New(describeCollectionResponse.Status.Reason)
}
collectionID := describeCollectionResponse.CollectionID
showPartitionsResp, err := node.masterClient.ShowPartitions(ctx, &milvuspb.ShowPartitionRequest{
showPartitionsResp, err := node.masterService.ShowPartitions(ctx, &milvuspb.ShowPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowPartitions,
MsgID: 0,
......@@ -835,7 +835,7 @@ func (node *ProxyNode) getSegmentsOfCollection(ctx context.Context, dbName strin
ret := make([]UniqueID, 0)
for _, partitionID := range showPartitionsResp.PartitionIDs {
showSegmentResponse, err := node.masterClient.ShowSegments(ctx, &milvuspb.ShowSegmentRequest{
showSegmentResponse, err := node.masterService.ShowSegments(ctx, &milvuspb.ShowSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowSegment,
MsgID: 0,
......
package proxynode
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type MasterClient interface {
CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error)
DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error)
HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error)
DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error)
CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error)
DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error)
HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error)
ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) (*commonpb.Status, error)
DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error)
DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*commonpb.Status, error)
ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error)
DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error)
}
type IndexServiceClient interface {
GetIndexStates(ctx context.Context, req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error)
GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
}
type QueryServiceClient interface {
ShowCollections(ctx context.Context, req *querypb.ShowCollectionRequest) (*querypb.ShowCollectionResponse, error)
LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error)
ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error)
ShowPartitions(ctx context.Context, req *querypb.ShowPartitionRequest) (*querypb.ShowPartitionResponse, error)
LoadPartitions(ctx context.Context, req *querypb.LoadPartitionRequest) (*commonpb.Status, error)
ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionRequest) (*commonpb.Status, error)
CreateQueryChannel(ctx context.Context) (*querypb.CreateQueryChannelResponse, error)
GetPartitionStates(ctx context.Context, req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error)
//GetSearchChannelNames() ([]string, error)
//GetSearchResultChannels() ([]string, error)
GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
GetSegmentInfo(ctx context.Context, req *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error)
}
type DataServiceClient interface {
AssignSegmentID(ctx context.Context, req *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error)
GetInsertChannels(ctx context.Context, req *datapb.InsertChannelRequest) (*internalpb2.StringList, error)
Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb.Status, error)
GetCollectionStatistics(ctx context.Context, req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error)
GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
GetSegmentInfo(ctx context.Context, req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error)
}
type ProxyServiceClient interface {
GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error)
RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error)
GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
}
type Service interface {
typeutil.Service
InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error)
DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error)
HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error)
LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error)
ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error)
DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
GetCollectionStatistics(ctx context.Context, request *milvuspb.CollectionStatsRequest) (*milvuspb.CollectionStatsResponse, error)
ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error)
CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error)
DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error)
HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error)
LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitonRequest) (*commonpb.Status, error)
ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error)
GetPartitionStatistics(ctx context.Context, request *milvuspb.PartitionStatsRequest) (*milvuspb.PartitionStatsResponse, error)
ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error)
DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error)
GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error)
DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error)
Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error)
Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error)
Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error)
GetDdChannel(ctx context.Context, request *commonpb.Empty) (*milvuspb.StringResponse, error)
GetQuerySegmentInfo(ctx context.Context, req *milvuspb.QuerySegmentInfoRequest) (*milvuspb.QuerySegmentInfoResponse, error)
GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error)
}
......@@ -9,14 +9,10 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type MasterClientInterface interface {
DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
}
type Cache interface {
GetCollectionID(ctx context.Context, collectionName string) (typeutil.UniqueID, error)
GetPartitionID(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error)
......@@ -32,7 +28,7 @@ type collectionInfo struct {
}
type MetaCache struct {
client MasterClientInterface
client types.MasterService
collInfo map[string]*collectionInfo
mu sync.RWMutex
......@@ -40,7 +36,7 @@ type MetaCache struct {
var globalMetaCache Cache
func InitMetaCache(client MasterClientInterface) error {
func InitMetaCache(client types.MasterService) error {
var err error
globalMetaCache, err = NewMetaCache(client)
if err != nil {
......@@ -49,7 +45,7 @@ func InitMetaCache(client MasterClientInterface) error {
return nil
}
func NewMetaCache(client MasterClientInterface) (*MetaCache, error) {
func NewMetaCache(client types.MasterService) (*MetaCache, error) {
return &MetaCache{
client: client,
collInfo: map[string]*collectionInfo{},
......
......@@ -4,6 +4,8 @@ import (
"context"
"testing"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
......@@ -12,6 +14,7 @@ import (
)
type MockMasterClientInterface struct {
types.MasterService
}
func (m *MockMasterClientInterface) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
......
......@@ -2,25 +2,22 @@ package proxynode
import (
"context"
"fmt"
"errors"
"log"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
)
type UniqueID = typeutil.UniqueID
......@@ -37,11 +34,11 @@ type ProxyNode struct {
stateCode atomic.Value
masterClient MasterClient
indexServiceClient IndexServiceClient
dataServiceClient DataServiceClient
proxyServiceClient ProxyServiceClient
queryServiceClient QueryServiceClient
masterService types.MasterService
indexService types.IndexService
dataService types.DataService
proxyService types.ProxyService
queryService types.QueryService
sched *TaskScheduler
tick *timeTick
......@@ -72,39 +69,11 @@ func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, e
}
type Component interface {
GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
}
func (node *ProxyNode) waitForServiceReady(ctx context.Context, service Component, serviceName string) error {
checkFunc := func() error {
resp, err := service.GetComponentStates(ctx)
if err != nil {
return err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return errors.New(resp.Status.Reason)
}
if resp.State.StateCode != internalpb2.StateCode_HEALTHY {
return errors.New("")
}
return nil
}
// wait for 10 seconds
err := retry.Retry(200, time.Millisecond*200, checkFunc)
if err != nil {
errMsg := fmt.Sprintf("ProxyNode wait for %s ready failed", serviceName)
return errors.New(errMsg)
}
return nil
}
func (node *ProxyNode) Init() error {
// todo wait for proxyservice state changed to Healthy
ctx := context.Background()
err := node.waitForServiceReady(ctx, node.proxyServiceClient, "ProxyService")
err := funcutil.WaitForComponentHealthy(ctx, node.proxyService, "ProxyService", 100, time.Millisecond*200)
if err != nil {
return err
}
......@@ -117,7 +86,7 @@ func (node *ProxyNode) Init() error {
},
}
response, err := node.proxyServiceClient.RegisterNode(ctx, request)
response, err := node.proxyService.RegisterNode(ctx, request)
if err != nil {
return err
}
......@@ -131,31 +100,31 @@ func (node *ProxyNode) Init() error {
}
// wait for dataservice state changed to Healthy
if node.dataServiceClient != nil {
err = node.waitForServiceReady(ctx, node.dataServiceClient, "DataService")
if node.dataService != nil {
err := funcutil.WaitForComponentHealthy(ctx, node.dataService, "DataService", 100, time.Millisecond*200)
if err != nil {
return err
}
}
// wait for queryservice state changed to Healthy
if node.queryServiceClient != nil {
err = node.waitForServiceReady(ctx, node.queryServiceClient, "QueryService")
// wait for queryService state changed to Healthy
if node.queryService != nil {
err := funcutil.WaitForComponentHealthy(ctx, node.queryService, "QueryService", 100, time.Millisecond*200)
if err != nil {
return err
}
}
// wait for indexservice state changed to Healthy
if node.indexServiceClient != nil {
err = node.waitForServiceReady(ctx, node.indexServiceClient, "IndexService")
if node.indexService != nil {
err := funcutil.WaitForComponentHealthy(ctx, node.indexService, "IndexService", 100, time.Millisecond*200)
if err != nil {
return err
}
}
if node.queryServiceClient != nil {
resp, err := node.queryServiceClient.CreateQueryChannel(ctx)
if node.queryService != nil {
resp, err := node.queryService.CreateQueryChannel(ctx)
if err != nil {
return err
}
......@@ -168,7 +137,7 @@ func (node *ProxyNode) Init() error {
}
// todo
//Params.InsertChannelNames, err = node.dataServiceClient.GetInsertChannels()
//Params.InsertChannelNames, err = node.dataService.GetInsertChannels()
//if err != nil {
// return err
//}
......@@ -204,7 +173,7 @@ func (node *ProxyNode) Init() error {
node.tsoAllocator = tsoAllocator
node.tsoAllocator.PeerID = Params.ProxyID
segAssigner, err := NewSegIDAssigner(node.ctx, node.dataServiceClient, node.lastTick)
segAssigner, err := NewSegIDAssigner(node.ctx, node.dataService, node.lastTick)
if err != nil {
panic(err)
}
......@@ -232,7 +201,7 @@ func (node *ProxyNode) Init() error {
}
func (node *ProxyNode) Start() error {
err := InitMetaCache(node.masterClient)
err := InitMetaCache(node.masterService)
if err != nil {
return err
}
......@@ -308,22 +277,22 @@ func (node *ProxyNode) AddCloseCallback(callbacks ...func()) {
node.closeCallbacks = append(node.closeCallbacks, callbacks...)
}
func (node *ProxyNode) SetMasterClient(cli MasterClient) {
node.masterClient = cli
func (node *ProxyNode) SetMasterClient(cli types.MasterService) {
node.masterService = cli
}
func (node *ProxyNode) SetIndexServiceClient(cli IndexServiceClient) {
node.indexServiceClient = cli
func (node *ProxyNode) SetIndexServiceClient(cli types.IndexService) {
node.indexService = cli
}
func (node *ProxyNode) SetDataServiceClient(cli DataServiceClient) {
node.dataServiceClient = cli
func (node *ProxyNode) SetDataServiceClient(cli types.DataService) {
node.dataService = cli
}
func (node *ProxyNode) SetProxyServiceClient(cli ProxyServiceClient) {
node.proxyServiceClient = cli
func (node *ProxyNode) SetProxyServiceClient(cli types.ProxyService) {
node.proxyService = cli
}
func (node *ProxyNode) SetQueryServiceClient(cli QueryServiceClient) {
node.queryServiceClient = cli
func (node *ProxyNode) SetQueryServiceClient(cli types.QueryService) {
node.queryService = cli
}
......@@ -10,10 +10,11 @@ import (
"errors"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
const (
......@@ -127,21 +128,21 @@ type SegIDAssigner struct {
getTickFunc func() Timestamp
PeerID UniqueID
serviceClient DataServiceClient
countPerRPC uint32
dataService types.DataService
countPerRPC uint32
}
func NewSegIDAssigner(ctx context.Context, client DataServiceClient, getTickFunc func() Timestamp) (*SegIDAssigner, error) {
func NewSegIDAssigner(ctx context.Context, dataService types.DataService, getTickFunc func() Timestamp) (*SegIDAssigner, error) {
ctx1, cancel := context.WithCancel(ctx)
sa := &SegIDAssigner{
Allocator: Allocator{
Ctx: ctx1,
CancelFunc: cancel,
},
countPerRPC: SegCountPerRPC,
serviceClient: client,
assignInfos: make(map[UniqueID]*list.List),
getTickFunc: getTickFunc,
countPerRPC: SegCountPerRPC,
dataService: dataService,
assignInfos: make(map[UniqueID]*list.List),
getTickFunc: getTickFunc,
}
sa.TChan = &allocator.Ticker{
UpdateInterval: time.Second,
......@@ -154,8 +155,8 @@ func NewSegIDAssigner(ctx context.Context, client DataServiceClient, getTickFunc
return sa, nil
}
func (sa *SegIDAssigner) SetServiceClient(client DataServiceClient) {
sa.serviceClient = client
func (sa *SegIDAssigner) SetServiceClient(client types.DataService) {
sa.dataService = client
}
func (sa *SegIDAssigner) collectExpired() {
......@@ -288,7 +289,7 @@ func (sa *SegIDAssigner) syncSegments() bool {
}
sa.segReqs = []*datapb.SegIDRequest{}
resp, err := sa.serviceClient.AssignSegmentID(ctx, req)
resp, err := sa.dataService.AssignSegmentID(ctx, req)
if err != nil {
log.Println("GRPC AssignSegmentID Failed", resp, err)
......
......@@ -7,6 +7,8 @@ import (
"math"
"strconv"
"github.com/zilliztech/milvus-distributed/internal/types"
"errors"
"github.com/golang/protobuf/proto"
......@@ -68,10 +70,10 @@ type BaseInsertTask = msgstream.InsertMsg
type InsertTask struct {
BaseInsertTask
Condition
ctx context.Context
dataServiceClient DataServiceClient
result *milvuspb.InsertResponse
rowIDAllocator *allocator.IDAllocator
ctx context.Context
dataService types.DataService
result *milvuspb.InsertResponse
rowIDAllocator *allocator.IDAllocator
}
func (it *InsertTask) Ctx() context.Context {
......@@ -197,7 +199,7 @@ func (it *InsertTask) Execute(ctx context.Context) error {
stream, err := globalInsertChannelsMap.getInsertMsgStream(collID)
if err != nil {
resp, _ := it.dataServiceClient.GetInsertChannels(ctx, &datapb.InsertChannelRequest{
resp, _ := it.dataService.GetInsertChannels(ctx, &datapb.InsertChannelRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kInsert, // todo
MsgID: it.Base.MsgID, // todo
......@@ -243,8 +245,8 @@ type CreateCollectionTask struct {
Condition
*milvuspb.CreateCollectionRequest
ctx context.Context
masterClient MasterClient
dataServiceClient DataServiceClient
masterService types.MasterService
dataServiceClient types.DataService
result *commonpb.Status
schema *schemapb.CollectionSchema
}
......@@ -352,7 +354,7 @@ func (cct *CreateCollectionTask) PreExecute(ctx context.Context) error {
func (cct *CreateCollectionTask) Execute(ctx context.Context) error {
var err error
cct.result, err = cct.masterClient.CreateCollection(ctx, cct.CreateCollectionRequest)
cct.result, err = cct.masterService.CreateCollection(ctx, cct.CreateCollectionRequest)
if err != nil {
return err
}
......@@ -392,9 +394,9 @@ func (cct *CreateCollectionTask) PostExecute(ctx context.Context) error {
type DropCollectionTask struct {
Condition
*milvuspb.DropCollectionRequest
ctx context.Context
masterClient MasterClient
result *commonpb.Status
ctx context.Context
masterService types.MasterService
result *commonpb.Status
}
func (dct *DropCollectionTask) Ctx() context.Context {
......@@ -450,7 +452,7 @@ func (dct *DropCollectionTask) Execute(ctx context.Context) error {
return err
}
dct.result, err = dct.masterClient.DropCollection(ctx, dct.DropCollectionRequest)
dct.result, err = dct.masterService.DropCollection(ctx, dct.DropCollectionRequest)
if err != nil {
return err
}
......@@ -742,9 +744,9 @@ func (st *SearchTask) PostExecute(ctx context.Context) error {
type HasCollectionTask struct {
Condition
*milvuspb.HasCollectionRequest
ctx context.Context
masterClient MasterClient
result *milvuspb.BoolResponse
ctx context.Context
masterService types.MasterService
result *milvuspb.BoolResponse
}
func (hct *HasCollectionTask) Ctx() context.Context {
......@@ -796,7 +798,7 @@ func (hct *HasCollectionTask) PreExecute(ctx context.Context) error {
func (hct *HasCollectionTask) Execute(ctx context.Context) error {
var err error
hct.result, err = hct.masterClient.HasCollection(ctx, hct.HasCollectionRequest)
hct.result, err = hct.masterService.HasCollection(ctx, hct.HasCollectionRequest)
if hct.result == nil {
return errors.New("has collection resp is nil")
}
......@@ -813,9 +815,9 @@ func (hct *HasCollectionTask) PostExecute(ctx context.Context) error {
type DescribeCollectionTask struct {
Condition
*milvuspb.DescribeCollectionRequest
ctx context.Context
masterClient MasterClient
result *milvuspb.DescribeCollectionResponse
ctx context.Context
masterService types.MasterService
result *milvuspb.DescribeCollectionResponse
}
func (dct *DescribeCollectionTask) Ctx() context.Context {
......@@ -867,7 +869,7 @@ func (dct *DescribeCollectionTask) PreExecute(ctx context.Context) error {
func (dct *DescribeCollectionTask) Execute(ctx context.Context) error {
var err error
dct.result, err = dct.masterClient.DescribeCollection(ctx, dct.DescribeCollectionRequest)
dct.result, err = dct.masterService.DescribeCollection(ctx, dct.DescribeCollectionRequest)
if dct.result == nil {
return errors.New("has collection resp is nil")
}
......@@ -884,9 +886,9 @@ func (dct *DescribeCollectionTask) PostExecute(ctx context.Context) error {
type GetCollectionsStatisticsTask struct {
Condition
*milvuspb.CollectionStatsRequest
ctx context.Context
dataServiceClient DataServiceClient
result *milvuspb.CollectionStatsResponse
ctx context.Context
dataService types.DataService
result *milvuspb.CollectionStatsResponse
}
func (g *GetCollectionsStatisticsTask) Ctx() context.Context {
......@@ -947,7 +949,7 @@ func (g *GetCollectionsStatisticsTask) Execute(ctx context.Context) error {
CollectionID: collID,
}
result, _ := g.dataServiceClient.GetCollectionStatistics(ctx, req)
result, _ := g.dataService.GetCollectionStatistics(ctx, req)
if result == nil {
return errors.New("get collection statistics resp is nil")
}
......@@ -971,9 +973,9 @@ func (g *GetCollectionsStatisticsTask) PostExecute(ctx context.Context) error {
type ShowCollectionsTask struct {
Condition
*milvuspb.ShowCollectionRequest
ctx context.Context
masterClient MasterClient
result *milvuspb.ShowCollectionResponse
ctx context.Context
masterService types.MasterService
result *milvuspb.ShowCollectionResponse
}
func (sct *ShowCollectionsTask) Ctx() context.Context {
......@@ -1022,7 +1024,7 @@ func (sct *ShowCollectionsTask) PreExecute(ctx context.Context) error {
func (sct *ShowCollectionsTask) Execute(ctx context.Context) error {
var err error
sct.result, err = sct.masterClient.ShowCollections(ctx, sct.ShowCollectionRequest)
sct.result, err = sct.masterService.ShowCollections(ctx, sct.ShowCollectionRequest)
if sct.result == nil {
return errors.New("get collection statistics resp is nil")
}
......@@ -1039,9 +1041,9 @@ func (sct *ShowCollectionsTask) PostExecute(ctx context.Context) error {
type CreatePartitionTask struct {
Condition
*milvuspb.CreatePartitionRequest
ctx context.Context
masterClient MasterClient
result *commonpb.Status
ctx context.Context
masterService types.MasterService
result *commonpb.Status
}
func (cpt *CreatePartitionTask) Ctx() context.Context {
......@@ -1099,7 +1101,7 @@ func (cpt *CreatePartitionTask) PreExecute(ctx context.Context) error {
}
func (cpt *CreatePartitionTask) Execute(ctx context.Context) (err error) {
cpt.result, err = cpt.masterClient.CreatePartition(ctx, cpt.CreatePartitionRequest)
cpt.result, err = cpt.masterService.CreatePartition(ctx, cpt.CreatePartitionRequest)
if cpt.result == nil {
return errors.New("get collection statistics resp is nil")
}
......@@ -1116,9 +1118,9 @@ func (cpt *CreatePartitionTask) PostExecute(ctx context.Context) error {
type DropPartitionTask struct {
Condition
*milvuspb.DropPartitionRequest
ctx context.Context
masterClient MasterClient
result *commonpb.Status
ctx context.Context
masterService types.MasterService
result *commonpb.Status
}
func (dpt *DropPartitionTask) Ctx() context.Context {
......@@ -1176,7 +1178,7 @@ func (dpt *DropPartitionTask) PreExecute(ctx context.Context) error {
}
func (dpt *DropPartitionTask) Execute(ctx context.Context) (err error) {
dpt.result, err = dpt.masterClient.DropPartition(ctx, dpt.DropPartitionRequest)
dpt.result, err = dpt.masterService.DropPartition(ctx, dpt.DropPartitionRequest)
if dpt.result == nil {
return errors.New("get collection statistics resp is nil")
}
......@@ -1193,9 +1195,9 @@ func (dpt *DropPartitionTask) PostExecute(ctx context.Context) error {
type HasPartitionTask struct {
Condition
*milvuspb.HasPartitionRequest
ctx context.Context
masterClient MasterClient
result *milvuspb.BoolResponse
ctx context.Context
masterService types.MasterService
result *milvuspb.BoolResponse
}
func (hpt *HasPartitionTask) Ctx() context.Context {
......@@ -1252,7 +1254,7 @@ func (hpt *HasPartitionTask) PreExecute(ctx context.Context) error {
}
func (hpt *HasPartitionTask) Execute(ctx context.Context) (err error) {
hpt.result, err = hpt.masterClient.HasPartition(ctx, hpt.HasPartitionRequest)
hpt.result, err = hpt.masterService.HasPartition(ctx, hpt.HasPartitionRequest)
if hpt.result == nil {
return errors.New("get collection statistics resp is nil")
}
......@@ -1269,9 +1271,9 @@ func (hpt *HasPartitionTask) PostExecute(ctx context.Context) error {
type ShowPartitionsTask struct {
Condition
*milvuspb.ShowPartitionRequest
ctx context.Context
masterClient MasterClient
result *milvuspb.ShowPartitionResponse
ctx context.Context
masterService types.MasterService
result *milvuspb.ShowPartitionResponse
}
func (spt *ShowPartitionsTask) Ctx() context.Context {
......@@ -1323,7 +1325,7 @@ func (spt *ShowPartitionsTask) PreExecute(ctx context.Context) error {
func (spt *ShowPartitionsTask) Execute(ctx context.Context) error {
var err error
spt.result, err = spt.masterClient.ShowPartitions(ctx, spt.ShowPartitionRequest)
spt.result, err = spt.masterService.ShowPartitions(ctx, spt.ShowPartitionRequest)
if spt.result == nil {
return errors.New("get collection statistics resp is nil")
}
......@@ -1340,9 +1342,9 @@ func (spt *ShowPartitionsTask) PostExecute(ctx context.Context) error {
type CreateIndexTask struct {
Condition
*milvuspb.CreateIndexRequest
ctx context.Context
masterClient MasterClient
result *commonpb.Status
ctx context.Context
masterService types.MasterService
result *commonpb.Status
}
func (cit *CreateIndexTask) Ctx() context.Context {
......@@ -1401,7 +1403,7 @@ func (cit *CreateIndexTask) PreExecute(ctx context.Context) error {
func (cit *CreateIndexTask) Execute(ctx context.Context) error {
var err error
cit.result, err = cit.masterClient.CreateIndex(ctx, cit.CreateIndexRequest)
cit.result, err = cit.masterService.CreateIndex(ctx, cit.CreateIndexRequest)
if cit.result == nil {
return errors.New("get collection statistics resp is nil")
}
......@@ -1418,9 +1420,9 @@ func (cit *CreateIndexTask) PostExecute(ctx context.Context) error {
type DescribeIndexTask struct {
Condition
*milvuspb.DescribeIndexRequest
ctx context.Context
masterClient MasterClient
result *milvuspb.DescribeIndexResponse
ctx context.Context
masterService types.MasterService
result *milvuspb.DescribeIndexResponse
}
func (dit *DescribeIndexTask) Ctx() context.Context {
......@@ -1484,7 +1486,7 @@ func (dit *DescribeIndexTask) PreExecute(ctx context.Context) error {
func (dit *DescribeIndexTask) Execute(ctx context.Context) error {
var err error
dit.result, err = dit.masterClient.DescribeIndex(ctx, dit.DescribeIndexRequest)
dit.result, err = dit.masterService.DescribeIndex(ctx, dit.DescribeIndexRequest)
log.Println("YYYYY:", dit.result)
if dit.result == nil {
return errors.New("get collection statistics resp is nil")
......@@ -1503,8 +1505,8 @@ type DropIndexTask struct {
Condition
ctx context.Context
*milvuspb.DropIndexRequest
masterClient MasterClient
result *commonpb.Status
masterService types.MasterService
result *commonpb.Status
}
func (dit *DropIndexTask) Ctx() context.Context {
......@@ -1563,7 +1565,7 @@ func (dit *DropIndexTask) PreExecute(ctx context.Context) error {
func (dit *DropIndexTask) Execute(ctx context.Context) error {
var err error
dit.result, err = dit.masterClient.DropIndex(ctx, dit.DropIndexRequest)
dit.result, err = dit.masterService.DropIndex(ctx, dit.DropIndexRequest)
if dit.result == nil {
return errors.New("drop index resp is nil")
}
......@@ -1580,10 +1582,10 @@ func (dit *DropIndexTask) PostExecute(ctx context.Context) error {
type GetIndexStateTask struct {
Condition
*milvuspb.IndexStateRequest
ctx context.Context
indexServiceClient IndexServiceClient
masterClient MasterClient
result *milvuspb.IndexStateResponse
ctx context.Context
indexService types.IndexService
masterService types.MasterService
result *milvuspb.IndexStateResponse
}
func (gist *GetIndexStateTask) Ctx() context.Context {
......@@ -1658,7 +1660,7 @@ func (gist *GetIndexStateTask) Execute(ctx context.Context) error {
CollectionName: collectionName,
CollectionID: collectionID,
}
partitions, err := gist.masterClient.ShowPartitions(ctx, showPartitionRequest)
partitions, err := gist.masterService.ShowPartitions(ctx, showPartitionRequest)
if err != nil {
return err
}
......@@ -1680,7 +1682,7 @@ func (gist *GetIndexStateTask) Execute(ctx context.Context) error {
IndexName: gist.IndexName,
}
indexDescriptionResp, err2 := gist.masterClient.DescribeIndex(ctx, &describeIndexReq)
indexDescriptionResp, err2 := gist.masterService.DescribeIndex(ctx, &describeIndexReq)
if err2 != nil {
return err2
}
......@@ -1710,7 +1712,7 @@ func (gist *GetIndexStateTask) Execute(ctx context.Context) error {
CollectionID: collectionID,
PartitionID: partitionID,
}
segments, err := gist.masterClient.ShowSegments(ctx, showSegmentsRequest)
segments, err := gist.masterService.ShowSegments(ctx, showSegmentsRequest)
if err != nil {
return err
}
......@@ -1735,7 +1737,7 @@ func (gist *GetIndexStateTask) Execute(ctx context.Context) error {
CollectionID: collectionID,
SegmentID: segmentID,
}
segmentDesc, err := gist.masterClient.DescribeSegment(ctx, describeSegmentRequest)
segmentDesc, err := gist.masterService.DescribeSegment(ctx, describeSegmentRequest)
if err != nil {
return err
}
......@@ -1756,7 +1758,7 @@ func (gist *GetIndexStateTask) Execute(ctx context.Context) error {
return err
}
states, err := gist.indexServiceClient.GetIndexStates(ctx, getIndexStatesRequest)
states, err := gist.indexService.GetIndexStates(ctx, getIndexStatesRequest)
if err != nil {
return err
}
......@@ -1799,9 +1801,9 @@ func (gist *GetIndexStateTask) PostExecute(ctx context.Context) error {
type FlushTask struct {
Condition
*milvuspb.FlushRequest
ctx context.Context
dataServiceClient DataServiceClient
result *commonpb.Status
ctx context.Context
dataService types.DataService
result *commonpb.Status
}
func (ft *FlushTask) Ctx() context.Context {
......@@ -1864,7 +1866,7 @@ func (ft *FlushTask) Execute(ctx context.Context) error {
CollectionID: collID,
}
var status *commonpb.Status
status, _ = ft.dataServiceClient.Flush(ctx, flushReq)
status, _ = ft.dataService.Flush(ctx, flushReq)
if status == nil {
return errors.New("flush resp is nil")
}
......@@ -1885,9 +1887,9 @@ func (ft *FlushTask) PostExecute(ctx context.Context) error {
type LoadCollectionTask struct {
Condition
*milvuspb.LoadCollectionRequest
ctx context.Context
queryserviceClient QueryServiceClient
result *commonpb.Status
ctx context.Context
queryService types.QueryService
result *commonpb.Status
}
func (lct *LoadCollectionTask) Ctx() context.Context {
......@@ -1961,7 +1963,7 @@ func (lct *LoadCollectionTask) Execute(ctx context.Context) (err error) {
CollectionID: collID,
Schema: collSchema,
}
lct.result, err = lct.queryserviceClient.LoadCollection(ctx, request)
lct.result, err = lct.queryService.LoadCollection(ctx, request)
return err
}
......@@ -1972,9 +1974,9 @@ func (lct *LoadCollectionTask) PostExecute(ctx context.Context) error {
type ReleaseCollectionTask struct {
Condition
*milvuspb.ReleaseCollectionRequest
ctx context.Context
queryserviceClient QueryServiceClient
result *commonpb.Status
ctx context.Context
queryService types.QueryService
result *commonpb.Status
}
func (rct *ReleaseCollectionTask) Ctx() context.Context {
......@@ -2042,7 +2044,7 @@ func (rct *ReleaseCollectionTask) Execute(ctx context.Context) (err error) {
DbID: 0,
CollectionID: collID,
}
rct.result, err = rct.queryserviceClient.ReleaseCollection(ctx, request)
rct.result, err = rct.queryService.ReleaseCollection(ctx, request)
return err
}
......@@ -2053,9 +2055,9 @@ func (rct *ReleaseCollectionTask) PostExecute(ctx context.Context) error {
type LoadPartitionTask struct {
Condition
*milvuspb.LoadPartitonRequest
ctx context.Context
queryserviceClient QueryServiceClient
result *commonpb.Status
ctx context.Context
queryService types.QueryService
result *commonpb.Status
}
func (lpt *LoadPartitionTask) ID() UniqueID {
......@@ -2133,7 +2135,7 @@ func (lpt *LoadPartitionTask) Execute(ctx context.Context) error {
PartitionIDs: partitionIDs,
Schema: collSchema,
}
lpt.result, err = lpt.queryserviceClient.LoadPartitions(ctx, request)
lpt.result, err = lpt.queryService.LoadPartitions(ctx, request)
return err
}
......@@ -2144,9 +2146,9 @@ func (lpt *LoadPartitionTask) PostExecute(ctx context.Context) error {
type ReleasePartitionTask struct {
Condition
*milvuspb.ReleasePartitionRequest
ctx context.Context
queryserviceClient QueryServiceClient
result *commonpb.Status
ctx context.Context
queryService types.QueryService
result *commonpb.Status
}
func (rpt *ReleasePartitionTask) Ctx() context.Context {
......@@ -2223,7 +2225,7 @@ func (rpt *ReleasePartitionTask) Execute(ctx context.Context) (err error) {
CollectionID: collID,
PartitionIDs: partitionIDs,
}
rpt.result, err = rpt.queryserviceClient.ReleasePartitions(ctx, request)
rpt.result, err = rpt.queryService.ReleasePartitions(ctx, request)
return err
}
......
......@@ -156,7 +156,7 @@ func (s *ProxyService) Stop() error {
if err != nil {
panic(err)
}
log.Println("stop all node clients ...")
log.Println("stop all node ProxyNodes ...")
s.cancel()
......
package proxyservice
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type Service interface {
typeutil.Component
typeutil.TimeTickHandler
RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkResponse, error)
RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error)
// TODO: i'm sure it's not a best way to keep consistency, fix me
InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
}
......@@ -2,19 +2,15 @@ package proxyservice
import (
"context"
"errors"
"log"
"math/rand"
"strconv"
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
grpcproxynodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxynode/client"
"errors"
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/types"
)
type NodeInfo struct {
......@@ -22,21 +18,13 @@ type NodeInfo struct {
port int64
}
type NodeClient interface {
Init() error
Start() error
Stop() error
InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
}
type GlobalNodeInfoTable struct {
mtx sync.RWMutex
nodeIDs []UniqueID
infos map[UniqueID]*NodeInfo
createClientMtx sync.RWMutex
// lazy creating, so len(clients) <= len(infos)
clients map[UniqueID]NodeClient
// lazy creating, so len(ProxyNodes) <= len(infos)
ProxyNodes map[UniqueID]types.ProxyNode
}
func (table *GlobalNodeInfoTable) randomPick() UniqueID {
......@@ -82,22 +70,22 @@ func (table *GlobalNodeInfoTable) Register(id UniqueID, info *NodeInfo) error {
func (table *GlobalNodeInfoTable) createClients() error {
log.Println("infos: ", table.infos)
log.Println("clients: ", table.clients)
if len(table.clients) == len(table.infos) {
log.Println("ProxyNodes: ", table.ProxyNodes)
if len(table.ProxyNodes) == len(table.infos) {
return nil
}
for nodeID, info := range table.infos {
_, ok := table.clients[nodeID]
_, ok := table.ProxyNodes[nodeID]
if !ok {
log.Println(info)
table.clients[nodeID] = grpcproxynodeclient.NewClient(context.Background(), info.ip+":"+strconv.Itoa(int(info.port)))
table.ProxyNodes[nodeID] = grpcproxynodeclient.NewClient(context.Background(), info.ip+":"+strconv.Itoa(int(info.port)))
var err error
err = table.clients[nodeID].Init()
err = table.ProxyNodes[nodeID].Init()
if err != nil {
panic(err)
}
err = table.clients[nodeID].Start()
err = table.ProxyNodes[nodeID].Start()
if err != nil {
panic(err)
}
......@@ -116,18 +104,18 @@ func (table *GlobalNodeInfoTable) ReleaseAllClients() error {
}()
var err error
for id, client := range table.clients {
for id, client := range table.ProxyNodes {
err = client.Stop()
if err != nil {
panic(err)
}
delete(table.clients, id)
delete(table.ProxyNodes, id)
}
return nil
}
func (table *GlobalNodeInfoTable) ObtainAllClients() (map[UniqueID]NodeClient, error) {
func (table *GlobalNodeInfoTable) ObtainAllClients() (map[UniqueID]types.ProxyNode, error) {
table.mtx.RLock()
defer table.mtx.RUnlock()
......@@ -136,13 +124,13 @@ func (table *GlobalNodeInfoTable) ObtainAllClients() (map[UniqueID]NodeClient, e
err := table.createClients()
return table.clients, err
return table.ProxyNodes, err
}
func NewGlobalNodeInfoTable() *GlobalNodeInfoTable {
return &GlobalNodeInfoTable{
nodeIDs: make([]UniqueID, 0),
infos: make(map[UniqueID]*NodeInfo),
clients: make(map[UniqueID]NodeClient),
nodeIDs: make([]UniqueID, 0),
infos: make(map[UniqueID]*NodeInfo),
ProxyNodes: make(map[UniqueID]types.ProxyNode),
}
}
......@@ -10,6 +10,8 @@ import (
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/types"
"errors"
"go.uber.org/zap"
......@@ -31,8 +33,8 @@ type indexLoader struct {
fieldIndexes map[string][]*internalpb2.IndexStats
fieldStatsChan chan []*internalpb2.FieldStats
masterClient MasterServiceInterface
indexClient IndexServiceInterface
masterService types.MasterService
indexService types.IndexService
kv kv.Base // minio kv
}
......@@ -315,7 +317,7 @@ func (loader *indexLoader) getIndexInfo(collectionID UniqueID, segmentID UniqueI
CollectionID: collectionID,
SegmentID: segmentID,
}
response, err := loader.masterClient.DescribeSegment(ctx, req)
response, err := loader.masterService.DescribeSegment(ctx, req)
if err != nil {
return 0, 0, err
}
......@@ -327,14 +329,14 @@ func (loader *indexLoader) getIndexInfo(collectionID UniqueID, segmentID UniqueI
func (loader *indexLoader) getIndexPaths(indexBuildID UniqueID) ([]string, error) {
ctx := context.TODO()
if loader.indexClient == nil {
if loader.indexService == nil {
return nil, errors.New("null index service client")
}
indexFilePathRequest := &indexpb.IndexFilePathsRequest{
IndexBuildIDs: []UniqueID{indexBuildID},
}
pathResponse, err := loader.indexClient.GetIndexFilePaths(ctx, indexFilePathRequest)
pathResponse, err := loader.indexService.GetIndexFilePaths(ctx, indexFilePathRequest)
if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return nil, err
}
......@@ -389,7 +391,7 @@ func (loader *indexLoader) loadIndexDelayed(collectionID, segmentID UniqueID, in
return nil
}
func newIndexLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, replica ReplicaInterface) *indexLoader {
func newIndexLoader(ctx context.Context, masterService types.MasterService, indexService types.IndexService, replica ReplicaInterface) *indexLoader {
option := &minioKV.Option{
Address: Params.MinioEndPoint,
AccessKeyID: Params.MinioAccessKeyID,
......@@ -410,8 +412,8 @@ func newIndexLoader(ctx context.Context, masterClient MasterServiceInterface, in
fieldIndexes: make(map[string][]*internalpb2.IndexStats),
fieldStatsChan: make(chan []*internalpb2.FieldStats, 1),
masterClient: masterClient,
indexClient: indexClient,
masterService: masterService,
indexService: indexService,
kv: client,
}
......
......@@ -6,6 +6,8 @@ import (
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/types"
"errors"
"go.uber.org/zap"
......@@ -161,10 +163,10 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID Uni
return nil
}
func newLoadService(ctx context.Context, masterClient MasterServiceInterface, dataClient DataServiceInterface, indexClient IndexServiceInterface, replica ReplicaInterface, dmStream msgstream.MsgStream) *loadService {
func newLoadService(ctx context.Context, masterService types.MasterService, dataService types.DataService, indexService types.IndexService, replica ReplicaInterface, dmStream msgstream.MsgStream) *loadService {
ctx1, cancel := context.WithCancel(ctx)
segLoader := newSegmentLoader(ctx1, masterClient, indexClient, dataClient, replica, dmStream)
segLoader := newSegmentLoader(ctx1, masterService, indexService, dataService, replica, dmStream)
return &loadService{
ctx: ctx1,
......
......@@ -18,6 +18,8 @@ import (
"strings"
"sync/atomic"
"github.com/zilliztech/milvus-distributed/internal/types"
"errors"
"go.uber.org/zap"
......@@ -32,21 +34,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type Node interface {
typeutil.Component
AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error)
RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error)
WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error)
LoadSegments(ctx context.Context, in *queryPb.LoadSegmentRequest) (*commonpb.Status, error)
ReleaseCollection(ctx context.Context, in *queryPb.ReleaseCollectionRequest) (*commonpb.Status, error)
ReleasePartitions(ctx context.Context, in *queryPb.ReleasePartitionRequest) (*commonpb.Status, error)
ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error)
GetSegmentInfo(ctx context.Context, in *queryPb.SegmentInfoRequest) (*queryPb.SegmentInfoResponse, error)
}
type QueryService = typeutil.QueryServiceInterface
type QueryNode struct {
typeutil.Service
......@@ -66,10 +53,10 @@ type QueryNode struct {
statsService *statsService
// clients
masterClient MasterServiceInterface
queryClient QueryServiceInterface
indexClient IndexServiceInterface
dataClient DataServiceInterface
masterService types.MasterService
queryService types.QueryService
indexService types.IndexService
dataService types.DataService
msFactory msgstream.Factory
}
......@@ -127,7 +114,7 @@ func (node *QueryNode) Init() error {
},
}
resp, err := node.queryClient.RegisterNode(ctx, registerReq)
resp, err := node.queryService.RegisterNode(ctx, registerReq)
if err != nil {
panic(err)
}
......@@ -152,15 +139,15 @@ func (node *QueryNode) Init() error {
log.Debug("", zap.Int64("QueryNodeID", Params.QueryNodeID))
if node.masterClient == nil {
if node.masterService == nil {
log.Error("null master service detected")
}
if node.indexClient == nil {
if node.indexService == nil {
log.Error("null index service detected")
}
if node.dataClient == nil {
if node.dataService == nil {
log.Error("null data service detected")
}
......@@ -183,7 +170,7 @@ func (node *QueryNode) Start() error {
node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, node.msFactory)
//node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterClient, node.dataClient, node.indexClient, node.replica, node.dataSyncService.dmStream)
node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterService, node.dataService, node.indexService, node.replica, node.dataSyncService.dmStream)
node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan, node.msFactory)
// start services
......@@ -223,35 +210,35 @@ func (node *QueryNode) UpdateStateCode(code internalpb2.StateCode) {
node.stateCode.Store(code)
}
func (node *QueryNode) SetMasterService(master MasterServiceInterface) error {
func (node *QueryNode) SetMasterService(master types.MasterService) error {
if master == nil {
return errors.New("null master service interface")
}
node.masterClient = master
node.masterService = master
return nil
}
func (node *QueryNode) SetQueryService(query QueryServiceInterface) error {
func (node *QueryNode) SetQueryService(query types.QueryService) error {
if query == nil {
return errors.New("null query service interface")
}
node.queryClient = query
node.queryService = query
return nil
}
func (node *QueryNode) SetIndexService(index IndexServiceInterface) error {
func (node *QueryNode) SetIndexService(index types.IndexService) error {
if index == nil {
return errors.New("null index service interface")
}
node.indexClient = index
node.indexService = index
return nil
}
func (node *QueryNode) SetDataService(data DataServiceInterface) error {
func (node *QueryNode) SetDataService(data types.DataService) error {
if data == nil {
return errors.New("null data service interface")
}
node.dataClient = data
node.dataService = data
return nil
}
......
......@@ -8,6 +8,8 @@ import (
"testing"
"time"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
......@@ -23,7 +25,9 @@ const debug = false
const defaultPartitionID = UniqueID(2021)
type queryServiceMock struct{}
type queryServiceMock struct {
types.QueryService
}
func setup() {
os.Setenv("QUERY_NODE_ID", "1")
......
......@@ -4,6 +4,8 @@ import (
"context"
"strconv"
"github.com/zilliztech/milvus-distributed/internal/types"
"errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
......@@ -21,7 +23,7 @@ type segmentLoader struct {
dmStream msgstream.MsgStream
dataClient DataServiceInterface
dataService types.DataService
kv kv.Base // minio kv
iCodec *storage.InsertCodec
......@@ -31,7 +33,7 @@ type segmentLoader struct {
func (loader *segmentLoader) getInsertBinlogPaths(segmentID UniqueID) ([]*internalpb2.StringList, []int64, error) {
ctx := context.TODO()
if loader.dataClient == nil {
if loader.dataService == nil {
return nil, nil, errors.New("null data service client")
}
......@@ -39,7 +41,7 @@ func (loader *segmentLoader) getInsertBinlogPaths(segmentID UniqueID) ([]*intern
SegmentID: segmentID,
}
pathResponse, err := loader.dataClient.GetInsertBinlogPaths(ctx, insertBinlogPathRequest)
pathResponse, err := loader.dataService.GetInsertBinlogPaths(ctx, insertBinlogPathRequest)
if err != nil || pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return nil, nil, err
}
......@@ -53,14 +55,14 @@ func (loader *segmentLoader) getInsertBinlogPaths(segmentID UniqueID) ([]*intern
func (loader *segmentLoader) GetSegmentStates(segmentID UniqueID) (*datapb.SegmentStatesResponse, error) {
ctx := context.TODO()
if loader.dataClient == nil {
if loader.dataService == nil {
return nil, errors.New("null data service client")
}
segmentStatesRequest := &datapb.SegmentStatesRequest{
SegmentIDs: []int64{segmentID},
}
statesResponse, err := loader.dataClient.GetSegmentStates(ctx, segmentStatesRequest)
statesResponse, err := loader.dataService.GetSegmentStates(ctx, segmentStatesRequest)
if err != nil || statesResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return nil, err
}
......@@ -191,7 +193,7 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, targetField
return nil
}
func newSegmentLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, dataClient DataServiceInterface, replica ReplicaInterface, dmStream msgstream.MsgStream) *segmentLoader {
func newSegmentLoader(ctx context.Context, masterService types.MasterService, indexService types.IndexService, dataService types.DataService, replica ReplicaInterface, dmStream msgstream.MsgStream) *segmentLoader {
option := &minioKV.Option{
Address: Params.MinioEndPoint,
AccessKeyID: Params.MinioAccessKeyID,
......@@ -206,13 +208,13 @@ func newSegmentLoader(ctx context.Context, masterClient MasterServiceInterface,
panic(err)
}
iLoader := newIndexLoader(ctx, masterClient, indexClient, replica)
iLoader := newIndexLoader(ctx, masterService, indexService, replica)
return &segmentLoader{
replica: replica,
dmStream: dmStream,
dataClient: dataClient,
dataService: dataService,
kv: client,
iCodec: &storage.InsertCodec{},
......
package querynode
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
......@@ -22,20 +16,3 @@ type TimeRange struct {
timestampMin Timestamp
timestampMax Timestamp
}
type MasterServiceInterface interface {
DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error)
}
type QueryServiceInterface interface {
RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error)
}
type DataServiceInterface interface {
GetInsertBinlogPaths(ctx context.Context, req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error)
GetSegmentStates(ctx context.Context, req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error)
}
type IndexServiceInterface interface {
GetIndexFilePaths(ctx context.Context, req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error)
}
......@@ -176,6 +176,7 @@ type QueryService interface {
Component
TimeTickProvider
RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error)
ShowCollections(ctx context.Context, req *querypb.ShowCollectionRequest) (*querypb.ShowCollectionResponse, error)
LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error)
ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error)
......
......@@ -4,7 +4,6 @@ import (
"context"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
......@@ -24,24 +23,6 @@ type Component interface {
GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error)
}
// TODO
type IndexNodeInterface interface {
Service
Component
BuildIndex(ctx context.Context, req *indexpb.BuildIndexCmd) (*commonpb.Status, error)
DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error)
}
type IndexServiceInterface interface {
Service
Component
RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error)
BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error)
GetIndexStates(ctx context.Context, req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error)
GetIndexFilePaths(ctx context.Context, req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error)
NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIndexNotification) (*commonpb.Status, error)
}
type QueryServiceInterface interface {
Service
Component
......
......@@ -58,6 +58,7 @@ class TestGetCollectionStats:
def insert_count(self, request):
yield request.param
@pytest.mark.tags("0331")
def test_get_collection_stats_name_not_existed(self, connect, collection):
'''
target: get collection stats where collection name does not exist
......@@ -69,6 +70,7 @@ class TestGetCollectionStats:
connect.get_collection_stats(collection_name)
@pytest.mark.level(2)
@pytest.mark.tags("0331")
def test_get_collection_stats_name_invalid(self, connect, get_invalid_collection_name):
'''
target: get collection stats where collection name is invalid
......@@ -79,6 +81,7 @@ class TestGetCollectionStats:
with pytest.raises(Exception) as e:
connect.get_collection_stats(collection_name)
@pytest.mark.tags("0331")
def test_get_collection_stats_empty(self, connect, collection):
'''
target: get collection stats where no entity in collection
......@@ -89,6 +92,7 @@ class TestGetCollectionStats:
connect.flush([collection])
assert stats[row_count] == 0
@pytest.mark.tags("0331")
def test_get_collection_stats_without_connection(self, collection, dis_connect):
'''
target: test count_entities, without connection
......@@ -98,6 +102,7 @@ class TestGetCollectionStats:
with pytest.raises(Exception) as e:
dis_connect.get_collection_stats(collection)
@pytest.mark.tags("0331")
def test_get_collection_stats_batch(self, connect, collection):
'''
target: get row count with collection_stats
......@@ -110,6 +115,8 @@ class TestGetCollectionStats:
stats = connect.get_collection_stats(collection)
assert int(stats[row_count]) == default_nb
# @pytest.mark.tags("0331")
# TODO ci failed
def test_get_collection_stats_single(self, connect, collection):
'''
target: get row count with collection_stats
......@@ -184,6 +191,7 @@ class TestGetCollectionStats:
# pdb.set_trace()
assert compact_before == compact_after
@pytest.mark.tags("0331")
def test_get_collection_stats_partition(self, connect, collection):
'''
target: get partition info in a collection
......@@ -197,10 +205,11 @@ class TestGetCollectionStats:
stats = connect.get_collection_stats(collection)
assert stats[row_count] == default_nb
@pytest.mark.tags("0331")
def test_get_collection_stats_partitions(self, connect, collection):
'''
target: get partition info in a collection
method: create two partitions, add vectors in one of the partitions, call collection_stats and check
method: create two partitions, add vectors in one of the partitions, call collection_stats and check
expected: status ok, vectors added to one partition but not the other
'''
new_tag = "new_tag"
......@@ -219,7 +228,7 @@ class TestGetCollectionStats:
stats = connect.get_collection_stats(collection)
assert stats[row_count] == default_nb * 3
# @pytest.mark.tags("0331")
@pytest.mark.tags("0331")
def test_get_collection_stats_partitions_A(self, connect, collection, insert_count):
'''
target: test collection rows_count is correct or not
......@@ -236,7 +245,7 @@ class TestGetCollectionStats:
stats = connect.get_collection_stats(collection)
assert stats[row_count] == insert_count
# @pytest.mark.tags("0331")
@pytest.mark.tags("0331")
def test_get_collection_stats_partitions_B(self, connect, collection, insert_count):
'''
target: test collection rows_count is correct or not
......@@ -253,7 +262,7 @@ class TestGetCollectionStats:
stats = connect.get_collection_stats(collection)
assert stats[row_count] == insert_count
# @pytest.mark.tags("0331")
@pytest.mark.tags("0331")
def test_get_collection_stats_partitions_C(self, connect, collection, insert_count):
'''
target: test collection rows_count is correct or not
......@@ -271,7 +280,7 @@ class TestGetCollectionStats:
stats = connect.get_collection_stats(collection)
assert stats[row_count] == insert_count*2
# @pytest.mark.tags("0331")
@pytest.mark.tags("0331")
def test_get_collection_stats_partitions_D(self, connect, collection, insert_count):
'''
target: test collection rows_count is correct or not
......@@ -290,10 +299,11 @@ class TestGetCollectionStats:
assert stats[row_count] == insert_count*2
# TODO: assert metric type in stats response
@pytest.mark.tags("0331")
def test_get_collection_stats_after_index_created(self, connect, collection, get_simple_index):
'''
target: test collection info after index created
method: create collection, add vectors, create index and call collection_stats
method: create collection, add vectors, create index and call collection_stats
expected: status ok, index created and shown in segments
'''
connect.insert(collection, default_entities)
......@@ -303,10 +313,11 @@ class TestGetCollectionStats:
assert stats[row_count] == default_nb
# TODO: assert metric type in stats response
@pytest.mark.tags("0331")
def test_get_collection_stats_after_index_created_ip(self, connect, collection, get_simple_index):
'''
target: test collection info after index created
method: create collection, add vectors, create index and call collection_stats
method: create collection, add vectors, create index and call collection_stats
expected: status ok, index created and shown in segments
'''
get_simple_index["metric_type"] = "IP"
......@@ -319,10 +330,11 @@ class TestGetCollectionStats:
assert stats[row_count] == default_nb
# TODO: assert metric type in stats response
@pytest.mark.tags("0331")
def test_get_collection_stats_after_index_created_jac(self, connect, binary_collection, get_jaccard_index):
'''
target: test collection info after index created
method: create collection, add binary entities, create index and call collection_stats
method: create collection, add binary entities, create index and call collection_stats
expected: status ok, index created and shown in segments
'''
ids = connect.insert(binary_collection, default_binary_entities)
......@@ -331,10 +343,11 @@ class TestGetCollectionStats:
stats = connect.get_collection_stats(binary_collection)
assert stats[row_count] == default_nb
@pytest.mark.tags("0331")
def test_get_collection_stats_after_create_different_index(self, connect, collection):
'''
target: test collection info after index created repeatedly
method: create collection, add vectors, create index and call collection_stats multiple times
method: create collection, add vectors, create index and call collection_stats multiple times
expected: status ok, index info shown in segments
'''
ids = connect.insert(collection, default_entities)
......@@ -345,6 +358,7 @@ class TestGetCollectionStats:
stats = connect.get_collection_stats(collection)
assert stats[row_count] == default_nb
@pytest.mark.tags("0331")
def test_collection_count_multi_collections(self, connect):
'''
target: test collection rows_count is correct or not with multiple collections of L2
......@@ -366,6 +380,7 @@ class TestGetCollectionStats:
connect.drop_collection(collection_list[i])
@pytest.mark.level(2)
@pytest.mark.tags("0331")
def test_collection_count_multi_collections_indexed(self, connect):
'''
target: test collection rows_count is correct or not with multiple collections of L2
......
......@@ -871,7 +871,7 @@ def get_search_param(index_type, metric_type="L2"):
search_params = {"metric_type": metric_type}
if index_type in ivf() or index_type in binary_support():
search_params.update({"nprobe": 64})
elif index_type in ["HNSW", "RHNSW_SQ", "RHNSW_PQ"]:
elif index_type in ["HNSW", "RHNSW_FLAT","RHNSW_SQ", "RHNSW_PQ"]:
search_params.update({"ef": 64})
elif index_type == "NSG":
search_params.update({"search_length": 100})
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册