diff --git a/.jenkins/modules/Regression/PythonRegression.groovy b/.jenkins/modules/Regression/PythonRegression.groovy index 2ccb9ed528569427938e6fa9aa1922fe4fa78f27..0d680d469ce9ef71c8f1fadf95f3fbf26c9d473a 100644 --- a/.jenkins/modules/Regression/PythonRegression.groovy +++ b/.jenkins/modules/Regression/PythonRegression.groovy @@ -6,6 +6,7 @@ timeout(time: 60, unit: 'MINUTES') { sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME}-${REGRESSION_SERVICE_NAME} up -d etcd' sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME}-${REGRESSION_SERVICE_NAME} up -d minio' dir ('build/docker/deploy') { + sh 'docker pull ${TARGET_REPO}/milvus-distributed:${TARGET_TAG}' if ("${REGRESSION_SERVICE_NAME}" == "regression_distributed") { sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME}-${REGRESSION_SERVICE_NAME} up -d master' sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME}-${REGRESSION_SERVICE_NAME} up -d indexservice' diff --git a/internal/datanode/allocator.go b/internal/datanode/allocator.go index bf4dfe3f5b1e346fccede866ca75ff79d1d2b9ea..80ebf08f2fa379763ed432bff6b5bc765527ef39 100644 --- a/internal/datanode/allocator.go +++ b/internal/datanode/allocator.go @@ -8,22 +8,22 @@ import ( ) type ( - allocator interface { + allocatorInterface interface { allocID() (UniqueID, error) } - allocatorImpl struct { + allocator struct { masterService MasterServiceInterface } ) -func newAllocatorImpl(s MasterServiceInterface) *allocatorImpl { - return &allocatorImpl{ +func newAllocator(s MasterServiceInterface) *allocator { + return &allocator{ masterService: s, } } -func (alloc *allocatorImpl) allocID() (UniqueID, error) { +func (alloc *allocator) allocID() (UniqueID, error) { ctx := context.TODO() resp, err := alloc.masterService.AllocID(ctx, &masterpb.IDRequest{ Base: &commonpb.MsgBase{ diff --git a/internal/datanode/collection_replica.go b/internal/datanode/collection_replica.go index a83370be924c44a36f6ebf3c270a409c6cc6daa8..1e34623d58bf83f7738737b5be4c22cf834ca3fa 100644 --- a/internal/datanode/collection_replica.go +++ b/internal/datanode/collection_replica.go @@ -29,32 +29,30 @@ type Replica interface { getSegmentByID(segmentID UniqueID) (*Segment, error) } -type ( - Segment struct { - segmentID UniqueID - collectionID UniqueID - partitionID UniqueID - numRows int64 - memorySize int64 - isNew bool - createTime Timestamp // not using - endTime Timestamp // not using - startPosition *internalpb2.MsgPosition - endPosition *internalpb2.MsgPosition // not using - } +type Segment struct { + segmentID UniqueID + collectionID UniqueID + partitionID UniqueID + numRows int64 + memorySize int64 + isNew bool + createTime Timestamp // not using + endTime Timestamp // not using + startPosition *internalpb2.MsgPosition + endPosition *internalpb2.MsgPosition // not using +} - ReplicaImpl struct { - mu sync.RWMutex - segments []*Segment - collections map[UniqueID]*Collection - } -) +type CollectionSegmentReplica struct { + mu sync.RWMutex + segments []*Segment + collections map[UniqueID]*Collection +} func newReplica() Replica { segments := make([]*Segment, 0) collections := make(map[UniqueID]*Collection) - var replica Replica = &ReplicaImpl{ + var replica Replica = &CollectionSegmentReplica{ segments: segments, collections: collections, } @@ -62,7 +60,7 @@ func newReplica() Replica { } // --- segment --- -func (replica *ReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) { +func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) { replica.mu.RLock() defer 
replica.mu.RUnlock() @@ -74,7 +72,7 @@ func (replica *ReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) return nil, fmt.Errorf("Cannot find segment, id = %v", segmentID) } -func (replica *ReplicaImpl) addSegment( +func (replica *CollectionSegmentReplica) addSegment( segmentID UniqueID, collID UniqueID, partitionID UniqueID, @@ -101,7 +99,7 @@ func (replica *ReplicaImpl) addSegment( return nil } -func (replica *ReplicaImpl) removeSegment(segmentID UniqueID) error { +func (replica *CollectionSegmentReplica) removeSegment(segmentID UniqueID) error { replica.mu.Lock() defer replica.mu.Unlock() @@ -117,7 +115,7 @@ func (replica *ReplicaImpl) removeSegment(segmentID UniqueID) error { return fmt.Errorf("Error, there's no segment %v", segmentID) } -func (replica *ReplicaImpl) hasSegment(segmentID UniqueID) bool { +func (replica *CollectionSegmentReplica) hasSegment(segmentID UniqueID) bool { replica.mu.RLock() defer replica.mu.RUnlock() @@ -129,7 +127,7 @@ func (replica *ReplicaImpl) hasSegment(segmentID UniqueID) bool { return false } -func (replica *ReplicaImpl) updateStatistics(segmentID UniqueID, numRows int64) error { +func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, numRows int64) error { replica.mu.Lock() defer replica.mu.Unlock() @@ -144,7 +142,7 @@ func (replica *ReplicaImpl) updateStatistics(segmentID UniqueID, numRows int64) return fmt.Errorf("Error, there's no segment %v", segmentID) } -func (replica *ReplicaImpl) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb2.SegmentStatisticsUpdates, error) { +func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb2.SegmentStatisticsUpdates, error) { replica.mu.Lock() defer replica.mu.Unlock() @@ -169,14 +167,14 @@ func (replica *ReplicaImpl) getSegmentStatisticsUpdates(segmentID UniqueID) (*in } // --- collection --- -func (replica *ReplicaImpl) getCollectionNum() int { +func (replica *CollectionSegmentReplica) getCollectionNum() int { replica.mu.RLock() defer replica.mu.RUnlock() return len(replica.collections) } -func (replica *ReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error { +func (replica *CollectionSegmentReplica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error { replica.mu.Lock() defer replica.mu.Unlock() @@ -195,7 +193,7 @@ func (replica *ReplicaImpl) addCollection(collectionID UniqueID, schema *schemap return nil } -func (replica *ReplicaImpl) removeCollection(collectionID UniqueID) error { +func (replica *CollectionSegmentReplica) removeCollection(collectionID UniqueID) error { replica.mu.Lock() defer replica.mu.Unlock() @@ -204,7 +202,7 @@ func (replica *ReplicaImpl) removeCollection(collectionID UniqueID) error { return nil } -func (replica *ReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) { +func (replica *CollectionSegmentReplica) getCollectionByID(collectionID UniqueID) (*Collection, error) { replica.mu.RLock() defer replica.mu.RUnlock() @@ -216,7 +214,7 @@ func (replica *ReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collectio return coll, nil } -func (replica *ReplicaImpl) hasCollection(collectionID UniqueID) bool { +func (replica *CollectionSegmentReplica) hasCollection(collectionID UniqueID) bool { replica.mu.RLock() defer replica.mu.RUnlock() diff --git a/internal/datanode/collection_replica_test.go b/internal/datanode/collection_replica_test.go index 
1c14c7b54857d421e62c275fcde2d741ce6b8069..0f52d3282f98d6585cfa9cc7c4f833e37e23b433 100644 --- a/internal/datanode/collection_replica_test.go +++ b/internal/datanode/collection_replica_test.go @@ -16,10 +16,10 @@ func TestReplica_Collection(t *testing.T) { replica := newReplica() assert.Zero(t, replica.getCollectionNum()) - replica = new(ReplicaImpl) + replica = new(CollectionSegmentReplica) assert.Zero(t, replica.getCollectionNum()) - replica = &ReplicaImpl{ + replica = &CollectionSegmentReplica{ collections: map[UniqueID]*Collection{ 0: {id: 0}, 1: {id: 1}, diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index b61e0c98e21dcf7d1ae009fb784f70baf8c3f906..99336c912c9fd478b0d5a74e510ad15c26d31683 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -157,7 +157,7 @@ func (node *DataNode) Init() error { replica := newReplica() - var alloc allocator = newAllocatorImpl(node.masterService) + var alloc allocatorInterface = newAllocator(node.masterService) chanSize := 100 node.flushChan = make(chan *flushMsg, chanSize) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 71276889aa6417e2f2709522a02b91609c0ff103..b35383cca30a085453951e4a72c6104b3a276866 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -19,12 +19,12 @@ type dataSyncService struct { fg *flowgraph.TimeTickedFlowGraph flushChan chan *flushMsg replica Replica - idAllocator allocator + idAllocator allocatorInterface msFactory msgstream.Factory } func newDataSyncService(ctx context.Context, flushChan chan *flushMsg, - replica Replica, alloc allocator, factory msgstream.Factory) *dataSyncService { + replica Replica, alloc allocatorInterface, factory msgstream.Factory) *dataSyncService { service := &dataSyncService{ ctx: ctx, fg: nil, diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 1295004f56c9122d38497eaed051590a203512f4..48b30481d59ac877153a2fc3b1e783fbaf6beecf 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -26,7 +26,7 @@ type ddNode struct { ddBuffer *ddBuffer inFlushCh chan *flushMsg - idAllocator allocator + idAllocator allocatorInterface kv kv.Base replica Replica flushMeta *metaTable @@ -369,7 +369,7 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) { } func newDDNode(ctx context.Context, flushMeta *metaTable, - inFlushCh chan *flushMsg, replica Replica, alloc allocator) *ddNode { + inFlushCh chan *flushMsg, replica Replica, alloc allocatorInterface) *ddNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index 9d1e66af3836ff1500ad37497d5276017c71a223..686151ba35df525c0b14c0495e61e3e273539c78 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -41,7 +41,7 @@ type ( minIOKV kv.Base minioPrefix string - idAllocator allocator + idAllocator allocatorInterface timeTickStream msgstream.MsgStream segmentStatisticsStream msgstream.MsgStream @@ -622,7 +622,7 @@ func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) ( } func newInsertBufferNode(ctx context.Context, flushMeta *metaTable, - replica Replica, alloc allocator, factory msgstream.Factory) *insertBufferNode { + replica Replica, alloc 
allocatorInterface, factory msgstream.Factory) *insertBufferNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism diff --git a/internal/dataservice/allocator.go b/internal/dataservice/allocator.go index f095fdabaa4c345f21a72a86ee74116f850a2f39..0a7cf1a7c99936b9d614b382cd22f3dcb0084ea0 100644 --- a/internal/dataservice/allocator.go +++ b/internal/dataservice/allocator.go @@ -7,22 +7,22 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" ) -type allocator interface { +type allocatorInterface interface { allocTimestamp() (Timestamp, error) allocID() (UniqueID, error) } -type allocatorImpl struct { +type allocator struct { masterClient MasterClient } -func newAllocatorImpl(masterClient MasterClient) *allocatorImpl { - return &allocatorImpl{ +func newAllocator(masterClient MasterClient) *allocator { + return &allocator{ masterClient: masterClient, } } -func (allocator *allocatorImpl) allocTimestamp() (Timestamp, error) { +func (allocator *allocator) allocTimestamp() (Timestamp, error) { ctx := context.TODO() resp, err := allocator.masterClient.AllocTimestamp(ctx, &masterpb.TsoRequest{ Base: &commonpb.MsgBase{ @@ -39,7 +39,7 @@ func (allocator *allocatorImpl) allocTimestamp() (Timestamp, error) { return resp.Timestamp, nil } -func (allocator *allocatorImpl) allocID() (UniqueID, error) { +func (allocator *allocator) allocID() (UniqueID, error) { ctx := context.TODO() resp, err := allocator.masterClient.AllocID(ctx, &masterpb.IDRequest{ Base: &commonpb.MsgBase{ diff --git a/internal/dataservice/dd_handler.go b/internal/dataservice/dd_handler.go index b15602e3eaca4ef97ff3f8b99b170939b5298026..9425224d891f43ebc623f4d427edfe53953fcc16 100644 --- a/internal/dataservice/dd_handler.go +++ b/internal/dataservice/dd_handler.go @@ -10,10 +10,10 @@ import ( type ddHandler struct { meta *meta - segmentAllocator segmentAllocator + segmentAllocator segmentAllocatorInterface } -func newDDHandler(meta *meta, allocator segmentAllocator) *ddHandler { +func newDDHandler(meta *meta, allocator segmentAllocatorInterface) *ddHandler { return &ddHandler{ meta: meta, segmentAllocator: allocator, diff --git a/internal/dataservice/mock.go b/internal/dataservice/mock.go index 34a6040640031c150d48a632d79e5aff8f136912..a701083caff2776ec62ef11f4a60f098266710b3 100644 --- a/internal/dataservice/mock.go +++ b/internal/dataservice/mock.go @@ -15,7 +15,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" ) -func newMemoryMeta(allocator allocator) (*meta, error) { +func newMemoryMeta(allocator allocatorInterface) (*meta, error) { memoryKV := memkv.NewMemoryKV() return newMeta(memoryKV) } diff --git a/internal/dataservice/segment_allocator.go b/internal/dataservice/segment_allocator.go index a85dd4111eba8314a23d4e28f7a36b986c6f4141..7edcc3eaa20f1f5665494d0425057560c16fc3a5 100644 --- a/internal/dataservice/segment_allocator.go +++ b/internal/dataservice/segment_allocator.go @@ -26,7 +26,7 @@ func (err errRemainInSufficient) Error() string { } // segmentAllocator is used to allocate rows for segments and record the allocations. -type segmentAllocator interface { +type segmentAllocatorInterface interface { // OpenSegment add the segment to allocator and set it allocatable OpenSegment(segmentInfo *datapb.SegmentInfo) error // AllocSegment allocate rows and record the allocation. 
@@ -45,34 +45,32 @@ type segmentAllocator interface { IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error) } -type ( - segmentStatus struct { - id UniqueID - collectionID UniqueID - partitionID UniqueID - total int - sealed bool - lastExpireTime Timestamp - allocations []*allocation - insertChannel string - } - allocation struct { - rowNums int - expireTime Timestamp - } - segmentAllocatorImpl struct { - mt *meta - segments map[UniqueID]*segmentStatus //segment id -> status - segmentExpireDuration int64 - segmentThreshold float64 - segmentThresholdFactor float64 - mu sync.RWMutex - allocator allocator - } -) +type segmentStatus struct { + id UniqueID + collectionID UniqueID + partitionID UniqueID + total int + sealed bool + lastExpireTime Timestamp + allocations []*allocation + insertChannel string +} +type allocation struct { + rowNums int + expireTime Timestamp +} +type segmentAllocator struct { + mt *meta + segments map[UniqueID]*segmentStatus //segment id -> status + segmentExpireDuration int64 + segmentThreshold float64 + segmentThresholdFactor float64 + mu sync.RWMutex + allocator allocatorInterface +} -func newSegmentAllocator(meta *meta, allocator allocator) *segmentAllocatorImpl { - segmentAllocator := &segmentAllocatorImpl{ +func newSegmentAllocator(meta *meta, allocator allocatorInterface) *segmentAllocator { + segmentAllocator := &segmentAllocator{ mt: meta, segments: make(map[UniqueID]*segmentStatus), segmentExpireDuration: Params.SegIDAssignExpiration, @@ -83,7 +81,7 @@ func newSegmentAllocator(meta *meta, allocator allocator) *segmentAllocatorImpl return segmentAllocator } -func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentInfo) error { +func (allocator *segmentAllocator) OpenSegment(segmentInfo *datapb.SegmentInfo) error { allocator.mu.Lock() defer allocator.mu.Unlock() if _, ok := allocator.segments[segmentInfo.SegmentID]; ok { @@ -105,7 +103,7 @@ func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentIn return nil } -func (allocator *segmentAllocatorImpl) AllocSegment(collectionID UniqueID, +func (allocator *segmentAllocator) AllocSegment(collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int) (segID UniqueID, retCount int, expireTime Timestamp, err error) { allocator.mu.Lock() defer allocator.mu.Unlock() @@ -133,7 +131,7 @@ func (allocator *segmentAllocatorImpl) AllocSegment(collectionID UniqueID, return } -func (allocator *segmentAllocatorImpl) alloc(segStatus *segmentStatus, numRows int) (bool, error) { +func (allocator *segmentAllocator) alloc(segStatus *segmentStatus, numRows int) (bool, error) { totalOfAllocations := 0 for _, allocation := range segStatus.allocations { totalOfAllocations += allocation.rowNums @@ -163,7 +161,7 @@ func (allocator *segmentAllocatorImpl) alloc(segStatus *segmentStatus, numRows i return true, nil } -func (allocator *segmentAllocatorImpl) estimateTotalRows(collectionID UniqueID) (int, error) { +func (allocator *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int, error) { collMeta, err := allocator.mt.GetCollection(collectionID) if err != nil { return -1, err @@ -175,7 +173,7 @@ func (allocator *segmentAllocatorImpl) estimateTotalRows(collectionID UniqueID) return int(allocator.segmentThreshold / float64(sizePerRecord)), nil } -func (allocator *segmentAllocatorImpl) GetSealedSegments() ([]UniqueID, error) { +func (allocator *segmentAllocator) GetSealedSegments() ([]UniqueID, error) { allocator.mu.Lock() defer allocator.mu.Unlock() keys 
:= make([]UniqueID, 0) @@ -194,7 +192,7 @@ func (allocator *segmentAllocatorImpl) GetSealedSegments() ([]UniqueID, error) { return keys, nil } -func (allocator *segmentAllocatorImpl) checkSegmentSealed(segStatus *segmentStatus) (bool, error) { +func (allocator *segmentAllocator) checkSegmentSealed(segStatus *segmentStatus) (bool, error) { segMeta, err := allocator.mt.GetSegment(segStatus.id) if err != nil { return false, err @@ -202,7 +200,7 @@ func (allocator *segmentAllocatorImpl) checkSegmentSealed(segStatus *segmentStat return float64(segMeta.NumRows) >= allocator.segmentThresholdFactor*float64(segStatus.total), nil } -func (allocator *segmentAllocatorImpl) SealSegment(segmentID UniqueID) error { +func (allocator *segmentAllocator) SealSegment(segmentID UniqueID) error { allocator.mu.Lock() defer allocator.mu.Unlock() status, ok := allocator.segments[segmentID] @@ -213,13 +211,13 @@ func (allocator *segmentAllocatorImpl) SealSegment(segmentID UniqueID) error { return nil } -func (allocator *segmentAllocatorImpl) DropSegment(segmentID UniqueID) { +func (allocator *segmentAllocator) DropSegment(segmentID UniqueID) { allocator.mu.Lock() defer allocator.mu.Unlock() delete(allocator.segments, segmentID) } -func (allocator *segmentAllocatorImpl) ExpireAllocations(timeTick Timestamp) error { +func (allocator *segmentAllocator) ExpireAllocations(timeTick Timestamp) error { allocator.mu.Lock() defer allocator.mu.Unlock() for _, segStatus := range allocator.segments { @@ -234,7 +232,7 @@ func (allocator *segmentAllocatorImpl) ExpireAllocations(timeTick Timestamp) err return nil } -func (allocator *segmentAllocatorImpl) IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error) { +func (allocator *segmentAllocator) IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error) { allocator.mu.RLock() defer allocator.mu.RUnlock() status, ok := allocator.segments[segmentID] @@ -244,7 +242,7 @@ func (allocator *segmentAllocatorImpl) IsAllocationsExpired(segmentID UniqueID, return status.lastExpireTime <= ts, nil } -func (allocator *segmentAllocatorImpl) SealAllSegments(collectionID UniqueID) { +func (allocator *segmentAllocator) SealAllSegments(collectionID UniqueID) { allocator.mu.Lock() defer allocator.mu.Unlock() for _, status := range allocator.segments { diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 810a32f5c386976a75f78eb00fcef2ba63275a3a..c4d389ef08d02b5066ba0b4a7a07b73e4cb35de3 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -32,8 +32,8 @@ import ( const role = "dataservice" type DataService interface { - typeutil.Service typeutil.Component + RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb.Status, error) @@ -77,10 +77,10 @@ type ( state atomic.Value client *etcdkv.EtcdKV meta *meta - segAllocator segmentAllocator + segAllocator segmentAllocatorInterface statsHandler *statsHandler ddHandler *ddHandler - allocator allocator + allocator allocatorInterface cluster *dataNodeCluster msgProducer *timesync.MsgProducer registerFinishCh chan struct{} @@ -136,7 +136,7 @@ func (s *Server) Start() error { return err } - s.allocator = newAllocatorImpl(s.masterClient) + s.allocator = newAllocator(s.masterClient) if err = s.initMeta(); err != nil { return err } diff --git a/internal/dataservice/watcher.go b/internal/dataservice/watcher.go index 
9a3a33450b00af27fcc7d48e19925662af329f83..00016d013cc5244e177045027fcf9773eba99d24 100644 --- a/internal/dataservice/watcher.go +++ b/internal/dataservice/watcher.go @@ -13,18 +13,18 @@ import ( type ( proxyTimeTickWatcher struct { - allocator segmentAllocator + allocator segmentAllocatorInterface msgQueue chan *msgstream.TimeTickMsg } dataNodeTimeTickWatcher struct { meta *meta cluster *dataNodeCluster - allocator segmentAllocator + allocator segmentAllocatorInterface msgQueue chan *msgstream.TimeTickMsg } ) -func newProxyTimeTickWatcher(allocator segmentAllocator) *proxyTimeTickWatcher { +func newProxyTimeTickWatcher(allocator segmentAllocatorInterface) *proxyTimeTickWatcher { return &proxyTimeTickWatcher{ allocator: allocator, msgQueue: make(chan *msgstream.TimeTickMsg, 1), @@ -49,7 +49,7 @@ func (watcher *proxyTimeTickWatcher) Watch(msg *msgstream.TimeTickMsg) { watcher.msgQueue <- msg } -func newDataNodeTimeTickWatcher(meta *meta, allocator segmentAllocator, cluster *dataNodeCluster) *dataNodeTimeTickWatcher { +func newDataNodeTimeTickWatcher(meta *meta, allocator segmentAllocatorInterface, cluster *dataNodeCluster) *dataNodeTimeTickWatcher { return &dataNodeTimeTickWatcher{ meta: meta, allocator: allocator, diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 8ca9e909fd3c006fef36c92a0993479f26b05767..a1af8937751b56595484124bbf8d36d55427d162 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -20,7 +20,7 @@ import ( ) type Server struct { - impl *indexnode.NodeImpl + impl *indexnode.IndexNode grpcServer *grpc.Server grpcErrChan chan error @@ -164,7 +164,7 @@ func (s *Server) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty func NewServer(ctx context.Context) (*Server, error) { ctx1, cancel := context.WithCancel(ctx) - node, err := indexnode.NewNodeImpl(ctx1) + node, err := indexnode.NewIndexNode(ctx1) if err != nil { defer cancel() return nil, err diff --git a/internal/distributed/indexservice/service.go b/internal/distributed/indexservice/service.go index 8432dcafb10bcb7783129ebd46a0beb9437f3833..9233632d743bb876f034ab8e69610bd0c560391f 100644 --- a/internal/distributed/indexservice/service.go +++ b/internal/distributed/indexservice/service.go @@ -26,7 +26,7 @@ type UniqueID = typeutil.UniqueID type Timestamp = typeutil.Timestamp type Server struct { - impl *indexservice.ServiceImpl + impl *indexservice.IndexService grpcServer *grpc.Server grpcErrChan chan error @@ -161,7 +161,7 @@ func (s *Server) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty func NewServer(ctx context.Context) (*Server, error) { ctx1, cancel := context.WithCancel(ctx) - serverImp, err := indexservice.NewServiceImpl(ctx) + serverImp, err := indexservice.NewIndexService(ctx) if err != nil { defer cancel() return nil, err diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index ca1718e8bc1dbb1b2448a60895a4138eaf3dc0b3..aa5d5f91ced62ea66f5de9f30f341eebce335011 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -37,7 +37,7 @@ const ( type Server struct { ctx context.Context wg sync.WaitGroup - impl *proxynode.NodeImpl + impl *proxynode.ProxyNode grpcServer *grpc.Server grpcErrChan chan error @@ -60,7 +60,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) grpcErrChan: make(chan error), } - server.impl, err = proxynode.NewProxyNodeImpl(server.ctx, 
factory) + server.impl, err = proxynode.NewProxyNode(server.ctx, factory) if err != nil { return nil, err } diff --git a/internal/distributed/proxyservice/service.go b/internal/distributed/proxyservice/service.go index fc20da4054f36022a5326521e2886599a414ea24..201f40070a824b777b4745d15fe39be6e28078c5 100644 --- a/internal/distributed/proxyservice/service.go +++ b/internal/distributed/proxyservice/service.go @@ -30,7 +30,7 @@ type Server struct { grpcServer *grpc.Server grpcErrChan chan error - impl *proxyservice.ServiceImpl + impl *proxyservice.ProxyService tracer opentracing.Tracer closer io.Closer @@ -60,7 +60,7 @@ func NewServer(ctx1 context.Context, factory msgstream.Factory) (*Server, error) } opentracing.SetGlobalTracer(server.tracer) - server.impl, err = proxyservice.NewServiceImpl(server.ctx, factory) + server.impl, err = proxyservice.NewProxyService(server.ctx, factory) if err != nil { return nil, err } @@ -131,7 +131,7 @@ func (s *Server) startGrpcLoop(grpcPort int) { } func (s *Server) start() error { - log.Println("proxy ServiceImpl start ...") + log.Println("proxy ProxyService start ...") if err := s.impl.Start(); err != nil { return err } diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 44e2aad02304c77f8b4f7c9e49c63843c6df15dd..d3f06038a3ab0ab5b8b7f431be26c9656d5f7d92 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -29,7 +29,7 @@ const ( type UniqueID = typeutil.UniqueID type Timestamp = typeutil.Timestamp -type NodeImpl struct { +type IndexNode struct { stateCode internalpb2.StateCode loopCtx context.Context @@ -48,9 +48,9 @@ type NodeImpl struct { closer io.Closer } -func NewNodeImpl(ctx context.Context) (*NodeImpl, error) { +func NewIndexNode(ctx context.Context) (*IndexNode, error) { ctx1, cancel := context.WithCancel(ctx) - b := &NodeImpl{ + b := &IndexNode{ loopCtx: ctx1, loopCancel: cancel, } @@ -62,7 +62,7 @@ func NewNodeImpl(ctx context.Context) (*NodeImpl, error) { return b, nil } -func (i *NodeImpl) Init() error { +func (i *IndexNode) Init() error { ctx := context.Background() err := funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexService", 10, time.Second) @@ -125,7 +125,7 @@ func (i *NodeImpl) Init() error { return nil } -func (i *NodeImpl) Start() error { +func (i *IndexNode) Start() error { i.sched.Start() // Start callbacks @@ -136,7 +136,7 @@ func (i *NodeImpl) Start() error { } // Close closes the server. 
-func (i *NodeImpl) Stop() error { +func (i *IndexNode) Stop() error { if err := i.closer.Close(); err != nil { return err } @@ -151,15 +151,15 @@ func (i *NodeImpl) Stop() error { return nil } -func (i *NodeImpl) UpdateStateCode(code internalpb2.StateCode) { +func (i *IndexNode) UpdateStateCode(code internalpb2.StateCode) { i.stateCode = code } -func (i *NodeImpl) SetIndexServiceClient(serviceClient typeutil.IndexServiceInterface) { +func (i *IndexNode) SetIndexServiceClient(serviceClient typeutil.IndexServiceInterface) { i.serviceClient = serviceClient } -func (i *NodeImpl) BuildIndex(ctx context.Context, request *indexpb.BuildIndexCmd) (*commonpb.Status, error) { +func (i *IndexNode) BuildIndex(ctx context.Context, request *indexpb.BuildIndexCmd) (*commonpb.Status, error) { t := &IndexBuildTask{ BaseTask: BaseTask{ ctx: ctx, @@ -185,7 +185,7 @@ func (i *NodeImpl) BuildIndex(ctx context.Context, request *indexpb.BuildIndexCm return ret, nil } -func (i *NodeImpl) DropIndex(ctx context.Context, request *indexpb.DropIndexRequest) (*commonpb.Status, error) { +func (i *IndexNode) DropIndex(ctx context.Context, request *indexpb.DropIndexRequest) (*commonpb.Status, error) { i.sched.IndexBuildQueue.tryToRemoveUselessIndexBuildTask(request.IndexID) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, @@ -194,16 +194,16 @@ func (i *NodeImpl) DropIndex(ctx context.Context, request *indexpb.DropIndexRequ } // AddStartCallback adds a callback in the startServer phase. -func (i *NodeImpl) AddStartCallback(callbacks ...func()) { +func (i *IndexNode) AddStartCallback(callbacks ...func()) { i.startCallbacks = append(i.startCallbacks, callbacks...) } // AddCloseCallback adds a callback in the Close phase. -func (i *NodeImpl) AddCloseCallback(callbacks ...func()) { +func (i *IndexNode) AddCloseCallback(callbacks ...func()) { i.closeCallbacks = append(i.closeCallbacks, callbacks...) 
} -func (i *NodeImpl) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) { +func (i *IndexNode) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) { stateInfo := &internalpb2.ComponentInfo{ NodeID: Params.NodeID, @@ -221,7 +221,7 @@ func (i *NodeImpl) GetComponentStates(ctx context.Context) (*internalpb2.Compone return ret, nil } -func (i *NodeImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { +func (i *IndexNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, @@ -229,7 +229,7 @@ func (i *NodeImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResp }, nil } -func (i *NodeImpl) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { +func (i *IndexNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go index f61cf4b34165852717f52435473b7c136152aa80..e6cd088f455b201507335f36828ea3a73c7be79b 100644 --- a/internal/indexservice/indexservice.go +++ b/internal/indexservice/indexservice.go @@ -30,7 +30,7 @@ const ( reqTimeoutInterval = time.Second * 10 ) -type ServiceImpl struct { +type IndexService struct { nodeClients *PriorityQueue nodeStates map[UniqueID]*internalpb2.ComponentStates stateCode internalpb2.StateCode @@ -59,9 +59,9 @@ type ServiceImpl struct { type UniqueID = typeutil.UniqueID type Timestamp = typeutil.Timestamp -func NewServiceImpl(ctx context.Context) (*ServiceImpl, error) { +func NewIndexService(ctx context.Context) (*IndexService, error) { ctx1, cancel := context.WithCancel(ctx) - i := &ServiceImpl{ + i := &IndexService{ loopCtx: ctx1, loopCancel: cancel, nodeClients: &PriorityQueue{}, @@ -70,7 +70,7 @@ func NewServiceImpl(ctx context.Context) (*ServiceImpl, error) { return i, nil } -func (i *ServiceImpl) Init() error { +func (i *IndexService) Init() error { etcdAddress := Params.EtcdAddress log.Println("etcd address = ", etcdAddress) connectEtcdFn := func() error { @@ -125,7 +125,7 @@ func (i *ServiceImpl) Init() error { return nil } -func (i *ServiceImpl) Start() error { +func (i *IndexService) Start() error { i.loopWg.Add(1) go i.tsLoop() @@ -134,12 +134,12 @@ func (i *ServiceImpl) Start() error { for _, cb := range i.startCallbacks { cb() } - log.Print("ServiceImpl start") + log.Print("IndexService start") return nil } -func (i *ServiceImpl) Stop() error { +func (i *IndexService) Stop() error { i.loopCancel() i.sched.Close() for _, cb := range i.closeCallbacks { @@ -148,15 +148,15 @@ func (i *ServiceImpl) Stop() error { return nil } -func (i *ServiceImpl) UpdateStateCode(code internalpb2.StateCode) { +func (i *IndexService) UpdateStateCode(code internalpb2.StateCode) { i.stateCode = code } -func (i *ServiceImpl) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) { +func (i *IndexService) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) { stateInfo := &internalpb2.ComponentInfo{ NodeID: i.ID, - Role: "ServiceImpl", + Role: "IndexService", StateCode: i.stateCode, } @@ -170,7 +170,7 @@ func (i *ServiceImpl) GetComponentStates(ctx context.Context) (*internalpb2.Comp return ret, nil } -func (i *ServiceImpl) GetTimeTickChannel(ctx context.Context) 
(*milvuspb.StringResponse, error) { +func (i *IndexService) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, @@ -180,7 +180,7 @@ func (i *ServiceImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringR }, nil } -func (i *ServiceImpl) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { +func (i *IndexService) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, @@ -190,7 +190,7 @@ func (i *ServiceImpl) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin }, nil } -func (i *ServiceImpl) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) { +func (i *IndexService) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) { fmt.Println("builder building index ..., indexName = ", req.IndexName, "indexID = ", req.IndexID, "dataPath = ", req.DataPaths) ret := &indexpb.BuildIndexResponse{ Status: &commonpb.Status{ @@ -245,7 +245,7 @@ func (i *ServiceImpl) BuildIndex(ctx context.Context, req *indexpb.BuildIndexReq return ret, nil } -func (i *ServiceImpl) GetIndexStates(ctx context.Context, req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error) { +func (i *IndexService) GetIndexStates(ctx context.Context, req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error) { var indexStates []*indexpb.IndexInfo for _, indexID := range req.IndexBuildIDs { indexState, err := i.metaTable.GetIndexState(indexID) @@ -263,7 +263,7 @@ func (i *ServiceImpl) GetIndexStates(ctx context.Context, req *indexpb.IndexStat return ret, nil } -func (i *ServiceImpl) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) { +func (i *IndexService) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) { i.sched.IndexAddQueue.tryToRemoveUselessIndexAddTask(req.IndexID) err := i.metaTable.MarkIndexAsDeleted(req.IndexID) @@ -292,7 +292,7 @@ func (i *ServiceImpl) DropIndex(ctx context.Context, req *indexpb.DropIndexReque }, nil } -func (i *ServiceImpl) GetIndexFilePaths(ctx context.Context, req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) { +func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) { var indexPaths []*indexpb.IndexFilePathInfo = nil for _, indexID := range req.IndexBuildIDs { @@ -312,7 +312,7 @@ func (i *ServiceImpl) GetIndexFilePaths(ctx context.Context, req *indexpb.IndexF return ret, nil } -func (i *ServiceImpl) NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIndexNotification) (*commonpb.Status, error) { +func (i *IndexService) NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIndexNotification) (*commonpb.Status, error) { ret := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, } @@ -327,7 +327,7 @@ func (i *ServiceImpl) NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIn return ret, nil } -func (i *ServiceImpl) tsLoop() { +func (i *IndexService) tsLoop() { tsoTicker := time.NewTicker(tso.UpdateTimestampStep) defer tsoTicker.Stop() ctx, cancel := context.WithCancel(i.loopCtx) diff --git a/internal/indexservice/node_mgr.go b/internal/indexservice/node_mgr.go index 
ea6dd78b451134d16fcbf08dc19f2bb446cacf71..bc036c0b4541ce15489daf9a8484683fb22ed684 100644 --- a/internal/indexservice/node_mgr.go +++ b/internal/indexservice/node_mgr.go @@ -12,13 +12,13 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) -func (i *ServiceImpl) removeNode(nodeID UniqueID) { +func (i *IndexService) removeNode(nodeID UniqueID) { i.nodeLock.Lock() defer i.nodeLock.Unlock() i.nodeClients.Remove(nodeID) } -func (i *ServiceImpl) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest) error { +func (i *IndexService) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest) error { i.nodeLock.Lock() defer i.nodeLock.Unlock() @@ -46,7 +46,7 @@ func (i *ServiceImpl) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest) return nil } -func (i *ServiceImpl) prepareNodeInitParams() []*commonpb.KeyValuePair { +func (i *IndexService) prepareNodeInitParams() []*commonpb.KeyValuePair { var params []*commonpb.KeyValuePair params = append(params, &commonpb.KeyValuePair{Key: "minio.address", Value: Params.MinIOAddress}) params = append(params, &commonpb.KeyValuePair{Key: "minio.accessKeyID", Value: Params.MinIOAccessKeyID}) @@ -56,7 +56,7 @@ func (i *ServiceImpl) prepareNodeInitParams() []*commonpb.KeyValuePair { return params } -func (i *ServiceImpl) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) { +func (i *IndexService) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) { ret := &indexpb.RegisterNodeResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, @@ -65,7 +65,7 @@ func (i *ServiceImpl) RegisterNode(ctx context.Context, req *indexpb.RegisterNod nodeID, err := i.idAllocator.AllocOne() if err != nil { - ret.Status.Reason = "ServiceImpl:RegisterNode Failed to acquire NodeID" + ret.Status.Reason = "IndexService:RegisterNode Failed to acquire NodeID" return ret, nil } diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 2ef51384e11d67567cbbfc15bd0bb96bc302bcc6..d8fe513271e57a4f36506b2b772e22d08cef2a52 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -60,10 +60,7 @@ type QueryServiceInterface interface { type Interface interface { //service - Init() error - Start() error - Stop() error - GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) + typeutil.Component //DDL request CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) diff --git a/internal/masterservice/meta_table_test.go b/internal/masterservice/meta_table_test.go index a45b7fbb115a4d71b44df9c02f40e5e274924f58..bcd59c9a525deb3b21e6e8eaafebcae8f6306967 100644 --- a/internal/masterservice/meta_table_test.go +++ b/internal/masterservice/meta_table_test.go @@ -164,7 +164,8 @@ func TestMetaTable(t *testing.T) { IndexParams: params, } - _, field, err := mt.GetNotIndexedSegments("collTest", "field110", idxInfo) + ids, _, err := mt.GetNotIndexedSegments("collTest", "field110", idxInfo) + assert.Nil(t, ids) assert.NotNil(t, err) seg, field, err := mt.GetNotIndexedSegments("testColl", "field110", idxInfo) assert.Nil(t, err) diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go index 1c88955e11683caa9a8368328289b07d944ece04..faaca797e3e3611544db37f4a0b1c42b5ee198a4 100644 --- a/internal/proxynode/impl.go +++ b/internal/proxynode/impl.go @@ -21,11 +21,11 @@ const ( 
reqTimeoutInterval = time.Second * 10 ) -func (node *NodeImpl) UpdateStateCode(code internalpb2.StateCode) { +func (node *ProxyNode) UpdateStateCode(code internalpb2.StateCode) { node.stateCode.Store(code) } -func (node *NodeImpl) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { +func (node *ProxyNode) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { collectionName := request.CollectionName globalMetaCache.RemoveCollection(ctx, collectionName) // no need to return error, though collection may be not cached return &commonpb.Status{ @@ -34,7 +34,7 @@ func (node *NodeImpl) InvalidateCollectionMetaCache(ctx context.Context, request }, nil } -func (node *NodeImpl) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { +func (node *ProxyNode) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { log.Println("create collection: ", request) ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -66,7 +66,7 @@ func (node *NodeImpl) CreateCollection(ctx context.Context, request *milvuspb.Cr return cct.result, nil } -func (node *NodeImpl) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { +func (node *ProxyNode) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { log.Println("drop collection: ", request) ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -97,7 +97,7 @@ func (node *NodeImpl) DropCollection(ctx context.Context, request *milvuspb.Drop return dct.result, nil } -func (node *NodeImpl) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { +func (node *ProxyNode) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { log.Println("has collection: ", request) ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -132,7 +132,7 @@ func (node *NodeImpl) HasCollection(ctx context.Context, request *milvuspb.HasCo return hct.result, nil } -func (node *NodeImpl) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) { +func (node *ProxyNode) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) { log.Println("load collection: ", request) //ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) //defer cancel() @@ -163,7 +163,7 @@ func (node *NodeImpl) LoadCollection(ctx context.Context, request *milvuspb.Load return lct.result, nil } -func (node *NodeImpl) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) { +func (node *ProxyNode) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) { log.Println("release collection: ", request) ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -194,7 +194,7 @@ func (node *NodeImpl) ReleaseCollection(ctx context.Context, request *milvuspb.R return rct.result, nil } -func (node *NodeImpl) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { +func (node *ProxyNode) DescribeCollection(ctx context.Context, 
request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { log.Println("describe collection: ", request) ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -229,7 +229,7 @@ func (node *NodeImpl) DescribeCollection(ctx context.Context, request *milvuspb. return dct.result, nil } -func (node *NodeImpl) GetCollectionStatistics(ctx context.Context, request *milvuspb.CollectionStatsRequest) (*milvuspb.CollectionStatsResponse, error) { +func (node *ProxyNode) GetCollectionStatistics(ctx context.Context, request *milvuspb.CollectionStatsRequest) (*milvuspb.CollectionStatsResponse, error) { log.Println("get collection statistics") ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -263,7 +263,7 @@ func (node *NodeImpl) GetCollectionStatistics(ctx context.Context, request *milv return g.result, nil } -func (node *NodeImpl) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { +func (node *ProxyNode) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) { log.Println("show collections") ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -297,7 +297,7 @@ func (node *NodeImpl) ShowCollections(ctx context.Context, request *milvuspb.Sho return sct.result, nil } -func (node *NodeImpl) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { +func (node *ProxyNode) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { log.Println("create partition", request) ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -326,7 +326,7 @@ func (node *NodeImpl) CreatePartition(ctx context.Context, request *milvuspb.Cre return cpt.result, nil } -func (node *NodeImpl) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { +func (node *ProxyNode) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { log.Println("drop partition: ", request) ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -356,7 +356,7 @@ func (node *NodeImpl) DropPartition(ctx context.Context, request *milvuspb.DropP return dpt.result, nil } -func (node *NodeImpl) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { +func (node *ProxyNode) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { log.Println("has partition: ", request) ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -392,7 +392,7 @@ func (node *NodeImpl) HasPartition(ctx context.Context, request *milvuspb.HasPar return hpt.result, nil } -func (node *NodeImpl) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitonRequest) (*commonpb.Status, error) { +func (node *ProxyNode) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitonRequest) (*commonpb.Status, error) { log.Println("load partitions: ", request) ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -423,7 +423,7 @@ func (node *NodeImpl) LoadPartitions(ctx context.Context, request *milvuspb.Load return lpt.result, nil } -func (node *NodeImpl) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error) { +func 
(node *ProxyNode) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error) { log.Println("load partitions: ", request) ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -454,11 +454,11 @@ func (node *NodeImpl) ReleasePartitions(ctx context.Context, request *milvuspb.R return rpt.result, nil } -func (node *NodeImpl) GetPartitionStatistics(ctx context.Context, request *milvuspb.PartitionStatsRequest) (*milvuspb.PartitionStatsResponse, error) { +func (node *ProxyNode) GetPartitionStatistics(ctx context.Context, request *milvuspb.PartitionStatsRequest) (*milvuspb.PartitionStatsResponse, error) { panic("implement me") } -func (node *NodeImpl) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { +func (node *ProxyNode) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { log.Println("show partitions: ", request) ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -493,7 +493,7 @@ func (node *NodeImpl) ShowPartitions(ctx context.Context, request *milvuspb.Show return spt.result, nil } -func (node *NodeImpl) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { +func (node *ProxyNode) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { log.Println("create index for: ", request) ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -523,7 +523,7 @@ func (node *NodeImpl) CreateIndex(ctx context.Context, request *milvuspb.CreateI return cit.result, nil } -func (node *NodeImpl) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { +func (node *ProxyNode) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { log.Println("Describe index for: ", request) ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -557,7 +557,7 @@ func (node *NodeImpl) DescribeIndex(ctx context.Context, request *milvuspb.Descr return dit.result, nil } -func (node *NodeImpl) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) { +func (node *ProxyNode) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) { log.Println("Drop index for: ", request) ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -584,7 +584,7 @@ func (node *NodeImpl) DropIndex(ctx context.Context, request *milvuspb.DropIndex return dit.result, nil } -func (node *NodeImpl) GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) { +func (node *ProxyNode) GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) { // log.Println("Describe index progress for: ", request) ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -619,7 +619,7 @@ func (node *NodeImpl) GetIndexState(ctx context.Context, request *milvuspb.Index return dipt.result, nil } -func (node *NodeImpl) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error) { +func (node *ProxyNode) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error) { ctx, cancel := context.WithTimeout(ctx, 
reqTimeoutInterval) defer cancel() @@ -671,7 +671,7 @@ func (node *NodeImpl) Insert(ctx context.Context, request *milvuspb.InsertReques return it.result, nil } -func (node *NodeImpl) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) { +func (node *ProxyNode) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) { ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -713,7 +713,7 @@ func (node *NodeImpl) Search(ctx context.Context, request *milvuspb.SearchReques return qt.result, nil } -func (node *NodeImpl) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error) { +func (node *ProxyNode) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error) { log.Println("AA Flush collections: ", request.CollectionNames) ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval) defer cancel() @@ -743,11 +743,11 @@ func (node *NodeImpl) Flush(ctx context.Context, request *milvuspb.FlushRequest) return ft.result, nil } -func (node *NodeImpl) GetDdChannel(ctx context.Context, request *commonpb.Empty) (*milvuspb.StringResponse, error) { +func (node *ProxyNode) GetDdChannel(ctx context.Context, request *commonpb.Empty) (*milvuspb.StringResponse, error) { panic("implement me") } -func (node *NodeImpl) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) { +func (node *ProxyNode) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) { resp := &milvuspb.PersistentSegmentInfoResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, @@ -794,7 +794,7 @@ func (node *NodeImpl) GetPersistentSegmentInfo(ctx context.Context, req *milvusp return resp, nil } -func (node *NodeImpl) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.QuerySegmentInfoRequest) (*milvuspb.QuerySegmentInfoResponse, error) { +func (node *ProxyNode) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.QuerySegmentInfoRequest) (*milvuspb.QuerySegmentInfoResponse, error) { resp := &milvuspb.QuerySegmentInfoResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, @@ -839,7 +839,7 @@ func (node *NodeImpl) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.Que return resp, nil } -func (node *NodeImpl) getSegmentsOfCollection(ctx context.Context, dbName string, collectionName string) ([]UniqueID, error) { +func (node *ProxyNode) getSegmentsOfCollection(ctx context.Context, dbName string, collectionName string) ([]UniqueID, error) { describeCollectionResponse, err := node.masterClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kDescribeCollection, @@ -898,7 +898,7 @@ func (node *NodeImpl) getSegmentsOfCollection(ctx context.Context, dbName string return ret, nil } -func (node *NodeImpl) RegisterLink(request *commonpb.Empty) (*milvuspb.RegisterLinkResponse, error) { +func (node *ProxyNode) RegisterLink(request *commonpb.Empty) (*milvuspb.RegisterLinkResponse, error) { code := node.stateCode.Load().(internalpb2.StateCode) if code != internalpb2.StateCode_HEALTHY { return &milvuspb.RegisterLinkResponse{ diff --git a/internal/proxynode/insert_channels.go b/internal/proxynode/insert_channels.go index ed3598154863404903f3ef1452198fbfd87baf04..64f2ae3c368411f9f7ebf844893d8501c464217a 100644 --- 
a/internal/proxynode/insert_channels.go +++ b/internal/proxynode/insert_channels.go @@ -75,7 +75,7 @@ type InsertChannelsMap struct { droppedBitMap []int // 0 -> normal, 1 -> dropped usageHistogram []int // message stream can be closed only when the use count is zero mtx sync.RWMutex - nodeInstance *NodeImpl + nodeInstance *ProxyNode msFactory msgstream.Factory } @@ -188,7 +188,7 @@ func (m *InsertChannelsMap) closeAllMsgStream() { m.usageHistogram = make([]int, 0) } -func newInsertChannelsMap(node *NodeImpl) *InsertChannelsMap { +func newInsertChannelsMap(node *ProxyNode) *InsertChannelsMap { return &InsertChannelsMap{ collectionID2InsertChannels: make(map[UniqueID]int), insertChannels: make([][]string, 0), @@ -202,6 +202,6 @@ func newInsertChannelsMap(node *NodeImpl) *InsertChannelsMap { var globalInsertChannelsMap *InsertChannelsMap -func initGlobalInsertChannelsMap(node *NodeImpl) { +func initGlobalInsertChannelsMap(node *ProxyNode) { globalInsertChannelsMap = newInsertChannelsMap(node) } diff --git a/internal/proxynode/interface.go b/internal/proxynode/interface.go index ba9f27d6e510ea77cd03c5e90b0a36423c374815..4322ce790aaefcf5aeb394f1de5d04129c25c896 100644 --- a/internal/proxynode/interface.go +++ b/internal/proxynode/interface.go @@ -67,7 +67,7 @@ type ProxyServiceClient interface { GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) } -type ProxyNode interface { +type Service interface { typeutil.Service InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index 0a394356318c0ea06606437ed2be28cfd78f5bec..b3f1c054c8f15c497bb75403da1e4190a13e84fb 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -26,7 +26,7 @@ import ( type UniqueID = typeutil.UniqueID type Timestamp = typeutil.Timestamp -type NodeImpl struct { +type ProxyNode struct { ctx context.Context cancel func() wg sync.WaitGroup @@ -59,10 +59,10 @@ type NodeImpl struct { closeCallbacks []func() } -func NewProxyNodeImpl(ctx context.Context, factory msgstream.Factory) (*NodeImpl, error) { +func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, error) { rand.Seed(time.Now().UnixNano()) ctx1, cancel := context.WithCancel(ctx) - node := &NodeImpl{ + node := &ProxyNode{ ctx: ctx1, cancel: cancel, msFactory: factory, @@ -76,7 +76,7 @@ type Component interface { GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) } -func (node *NodeImpl) waitForServiceReady(ctx context.Context, service Component, serviceName string) error { +func (node *ProxyNode) waitForServiceReady(ctx context.Context, service Component, serviceName string) error { checkFunc := func() error { resp, err := service.GetComponentStates(ctx) @@ -100,7 +100,7 @@ func (node *NodeImpl) waitForServiceReady(ctx context.Context, service Component return nil } -func (node *NodeImpl) Init() error { +func (node *ProxyNode) Init() error { // todo wait for proxyservice state changed to Healthy ctx := context.Background() @@ -211,10 +211,10 @@ func (node *NodeImpl) Init() error { node.manipulationMsgStream, _ = node.msFactory.NewMsgStream(node.ctx) node.manipulationMsgStream.AsProducer(Params.InsertChannelNames) - repackFuncImpl := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) { + repackFunc := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, 
diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go
index 0a394356318c0ea06606437ed2be28cfd78f5bec..b3f1c054c8f15c497bb75403da1e4190a13e84fb 100644
--- a/internal/proxynode/proxy_node.go
+++ b/internal/proxynode/proxy_node.go
@@ -26,7 +26,7 @@ import (
 type UniqueID = typeutil.UniqueID
 type Timestamp = typeutil.Timestamp
 
-type NodeImpl struct {
+type ProxyNode struct {
 	ctx    context.Context
 	cancel func()
 	wg     sync.WaitGroup
@@ -59,10 +59,10 @@ type NodeImpl struct {
 	closeCallbacks []func()
 }
 
-func NewProxyNodeImpl(ctx context.Context, factory msgstream.Factory) (*NodeImpl, error) {
+func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, error) {
 	rand.Seed(time.Now().UnixNano())
 	ctx1, cancel := context.WithCancel(ctx)
-	node := &NodeImpl{
+	node := &ProxyNode{
 		ctx:       ctx1,
 		cancel:    cancel,
 		msFactory: factory,
@@ -76,7 +76,7 @@ type Component interface {
 	GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
 }
 
-func (node *NodeImpl) waitForServiceReady(ctx context.Context, service Component, serviceName string) error {
+func (node *ProxyNode) waitForServiceReady(ctx context.Context, service Component, serviceName string) error {
 
 	checkFunc := func() error {
 		resp, err := service.GetComponentStates(ctx)
@@ -100,7 +100,7 @@ func (node *NodeImpl) waitForServiceReady(ctx context.Context, service Component
 	return nil
 }
 
-func (node *NodeImpl) Init() error {
+func (node *ProxyNode) Init() error {
 	// todo wait for proxyservice state changed to Healthy
 	ctx := context.Background()
 
@@ -211,10 +211,10 @@ func (node *NodeImpl) Init() error {
 	node.manipulationMsgStream, _ = node.msFactory.NewMsgStream(node.ctx)
 	node.manipulationMsgStream.AsProducer(Params.InsertChannelNames)
 
-	repackFuncImpl := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
+	repackFunc := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
 		return insertRepackFunc(tsMsgs, hashKeys, node.segAssigner, true)
 	}
-	node.manipulationMsgStream.SetRepackFunc(repackFuncImpl)
+	node.manipulationMsgStream.SetRepackFunc(repackFunc)
 	log.Println("create manipulation message stream ...")
 
 	node.sched, err = NewTaskScheduler(node.ctx, node.idAllocator, node.tsoAllocator, node.msFactory)
@@ -227,7 +227,7 @@ func (node *NodeImpl) Init() error {
 	return nil
 }
 
-func (node *NodeImpl) Start() error {
+func (node *ProxyNode) Start() error {
 	err := InitMetaCache(node.masterClient)
 	if err != nil {
 		return err
@@ -269,7 +269,7 @@ func (node *NodeImpl) Start() error {
 	return nil
 }
 
-func (node *NodeImpl) Stop() error {
+func (node *ProxyNode) Stop() error {
 	node.cancel()
 
 	globalInsertChannelsMap.closeAllMsgStream()
@@ -291,35 +291,35 @@ func (node *NodeImpl) Stop() error {
 }
 
 // AddStartCallback adds a callback in the startServer phase.
-func (node *NodeImpl) AddStartCallback(callbacks ...func()) {
+func (node *ProxyNode) AddStartCallback(callbacks ...func()) {
 	node.startCallbacks = append(node.startCallbacks, callbacks...)
 }
 
-func (node *NodeImpl) lastTick() Timestamp {
+func (node *ProxyNode) lastTick() Timestamp {
 	return node.tick.LastTick()
 }
 
 // AddCloseCallback adds a callback in the Close phase.
-func (node *NodeImpl) AddCloseCallback(callbacks ...func()) {
+func (node *ProxyNode) AddCloseCallback(callbacks ...func()) {
 	node.closeCallbacks = append(node.closeCallbacks, callbacks...)
 }
 
-func (node *NodeImpl) SetMasterClient(cli MasterClient) {
+func (node *ProxyNode) SetMasterClient(cli MasterClient) {
 	node.masterClient = cli
 }
 
-func (node *NodeImpl) SetIndexServiceClient(cli IndexServiceClient) {
+func (node *ProxyNode) SetIndexServiceClient(cli IndexServiceClient) {
 	node.indexServiceClient = cli
 }
 
-func (node *NodeImpl) SetDataServiceClient(cli DataServiceClient) {
+func (node *ProxyNode) SetDataServiceClient(cli DataServiceClient) {
 	node.dataServiceClient = cli
 }
 
-func (node *NodeImpl) SetProxyServiceClient(cli ProxyServiceClient) {
+func (node *ProxyNode) SetProxyServiceClient(cli ProxyServiceClient) {
 	node.proxyServiceClient = cli
 }
 
-func (node *NodeImpl) SetQueryServiceClient(cli QueryServiceClient) {
+func (node *ProxyNode) SetQueryServiceClient(cli QueryServiceClient) {
 	node.queryServiceClient = cli
 }
diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go
index de4a36503519e020430e58b929152714215c47cf..b6a798ffa3e3e8e60c58ae3b1bb087de0245adfa 100644
--- a/internal/proxynode/task.go
+++ b/internal/proxynode/task.go
@@ -579,9 +579,9 @@ func (st *SearchTask) Execute(ctx context.Context) error {
 	}
 	msgPack.Msgs[0] = tsMsg
 	err := st.queryMsgStream.Produce(ctx, msgPack)
-	log.Printf("[NodeImpl] length of searchMsg: %v", len(msgPack.Msgs))
+	log.Printf("[ProxyNode] length of searchMsg: %v", len(msgPack.Msgs))
 	if err != nil {
-		log.Printf("[NodeImpl] send search request failed: %v", err)
+		log.Printf("[ProxyNode] send search request failed: %v", err)
 	}
 	return err
 }
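The rename above also changes the package's public construction path: NewProxyNodeImpl becomes NewProxyNode and returns the concrete *ProxyNode, which is then wired up through the Set*Client hooks before Init/Start. A minimal caller sketch (not part of the patch), assuming the client values are built elsewhere and that *ProxyNode satisfies the package's Service interface via methods defined in other files:

package main

import (
	"context"

	"github.com/zilliztech/milvus-distributed/internal/msgstream"
	"github.com/zilliztech/milvus-distributed/internal/proxynode"
)

// startProxyNode is a hypothetical wiring helper, not part of the patch.
func startProxyNode(ctx context.Context, factory msgstream.Factory, master proxynode.MasterClient) error {
	// NewProxyNode replaces the old NewProxyNodeImpl and returns *ProxyNode.
	node, err := proxynode.NewProxyNode(ctx, factory)
	if err != nil {
		return err
	}
	// Dependencies are injected before Init/Start, mirroring the Set*Client methods above.
	node.SetMasterClient(master)
	if err := node.Init(); err != nil {
		return err
	}
	return node.Start()
}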
diff --git a/internal/proxynode/task_scheduler.go b/internal/proxynode/task_scheduler.go
index 21a70a189514f026315411008e3bc036b6454637..d29e272949585b941ac2dbc83dcd6c73784e3c31 100644
--- a/internal/proxynode/task_scheduler.go
+++ b/internal/proxynode/task_scheduler.go
@@ -173,11 +173,11 @@ func (queue *BaseTaskQueue) Enqueue(t task) error {
 	}
 
 	ts, _ := queue.sched.tsoAllocator.AllocOne()
-	// log.Printf("[NodeImpl] allocate timestamp: %v", ts)
+	// log.Printf("[ProxyNode] allocate timestamp: %v", ts)
 	t.SetTs(ts)
 
 	reqID, _ := queue.sched.idAllocator.AllocOne()
-	// log.Printf("[NodeImpl] allocate reqID: %v", reqID)
+	// log.Printf("[ProxyNode] allocate reqID: %v", reqID)
 	t.SetID(reqID)
 
 	return queue.addUnissuedTask(t)
diff --git a/internal/proxyservice/impl.go b/internal/proxyservice/impl.go
index bb8957e081df61359416003a5ef73ab06e92ee64..d7a4fb26b55adb5a24663ffb080f5cc8d81d33e1 100644
--- a/internal/proxyservice/impl.go
+++ b/internal/proxyservice/impl.go
@@ -30,7 +30,7 @@ const (
 	MilvusYamlContent = "milvus.yaml"
 )
 
-func (s *ServiceImpl) fillNodeInitParams() error {
+func (s *ProxyService) fillNodeInitParams() error {
 	s.nodeStartParams = make([]*commonpb.KeyValuePair, 0)
 
 	getConfigContentByName := func(fileName string) []byte {
@@ -92,7 +92,7 @@ func (s *ServiceImpl) fillNodeInitParams() error {
 	return nil
 }
 
-func (s *ServiceImpl) Init() error {
+func (s *ProxyService) Init() error {
 	err := s.fillNodeInitParams()
 	if err != nil {
 		return err
@@ -134,14 +134,14 @@ func (s *ServiceImpl) Init() error {
 	return nil
 }
 
-func (s *ServiceImpl) Start() error {
+func (s *ProxyService) Start() error {
 	s.stateCode = internalpb2.StateCode_HEALTHY
 	s.sched.Start()
 	log.Println("start scheduler ...")
 	return s.tick.Start()
 }
 
-func (s *ServiceImpl) Stop() error {
+func (s *ProxyService) Stop() error {
 	s.sched.Close()
 	log.Println("close scheduler ...")
 	s.tick.Close()
@@ -158,7 +158,7 @@ func (s *ServiceImpl) Stop() error {
 	return nil
 }
 
-func (s *ServiceImpl) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
+func (s *ProxyService) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
 	stateInfo := &internalpb2.ComponentInfo{
 		NodeID: UniqueID(0),
 		Role:   "ProxyService",
@@ -175,11 +175,11 @@ func (s *ServiceImpl) GetComponentStates(ctx context.Context) (*internalpb2.Comp
 	return ret, nil
 }
 
-func (s *ServiceImpl) UpdateStateCode(code internalpb2.StateCode) {
+func (s *ProxyService) UpdateStateCode(code internalpb2.StateCode) {
 	s.stateCode = code
 }
 
-func (s *ServiceImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
+func (s *ProxyService) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
 	return &milvuspb.StringResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_SUCCESS,
@@ -188,11 +188,11 @@ func (s *ServiceImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringR
 	}, nil
 }
 
-func (s *ServiceImpl) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
+func (s *ProxyService) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
 	panic("implement me")
 }
 
-func (s *ServiceImpl) RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkResponse, error) {
+func (s *ProxyService) RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkResponse, error) {
 	log.Println("register link")
 	ctx, cancel := context.WithTimeout(ctx, timeoutInterval)
 	defer cancel()
@@ -230,7 +230,7 @@ func (s *ServiceImpl) RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkR
 	return t.response, nil
 }
 
-func (s *ServiceImpl) RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) {
+func (s *ProxyService) RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) {
 	log.Println("RegisterNode: ", request)
 	ctx, cancel := context.WithTimeout(ctx, timeoutInterval)
 	defer cancel()
@@ -271,7 +271,7 @@ func (s *ServiceImpl) RegisterNode(ctx context.Context, request *proxypb.Registe
 	return t.response, nil
 }
 
-func (s *ServiceImpl) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
+func (s *ProxyService) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
 	log.Println("InvalidateCollectionMetaCache")
 	ctx, cancel := context.WithTimeout(ctx, timeoutInterval)
 	defer cancel()
diff --git a/internal/proxyservice/interface.go b/internal/proxyservice/interface.go
index 8894b1475fe375e973389b689c481753712a714e..b4f9d13b1aa3eae8a5336e133e150ccab1046bc5 100644
--- a/internal/proxyservice/interface.go
+++ b/internal/proxyservice/interface.go
@@ -9,12 +9,10 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
 )
 
-type Component = typeutil.Component
-type Service = typeutil.Service
+type Service interface {
+	typeutil.Component
+	typeutil.TimeTickHandler
 
-type ProxyService interface {
-	Component
-	Service
 	RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkResponse, error)
 	RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error)
 	// TODO: i'm sure it's not a best way to keep consistency, fix me
diff --git a/internal/proxyservice/nodeid_allocator.go b/internal/proxyservice/nodeid_allocator.go
index 29dab96f76e3ee5f70f645f45cecf36acfd73408..a759f70afe9242788dd39ea3c40ded96273df0c0 100644
--- a/internal/proxyservice/nodeid_allocator.go
+++ b/internal/proxyservice/nodeid_allocator.go
@@ -15,13 +15,13 @@ type NodeIDAllocator interface {
 	AllocOne() UniqueID
 }
 
-type NaiveNodeIDAllocatorImpl struct {
-	impl *allocator.IDAllocator
-	now  UniqueID
-	mtx  sync.Mutex
+type NaiveNodeIDAllocator struct {
+	allocator *allocator.IDAllocator
+	now       UniqueID
+	mtx       sync.Mutex
 }
 
-func (allocator *NaiveNodeIDAllocatorImpl) AllocOne() UniqueID {
+func (allocator *NaiveNodeIDAllocator) AllocOne() UniqueID {
 	allocator.mtx.Lock()
 	defer func() {
 		// allocator.now++
@@ -31,7 +31,7 @@ func (allocator *NaiveNodeIDAllocatorImpl) AllocOne() UniqueID {
 }
 
 func NewNodeIDAllocator() NodeIDAllocator {
-	return &NaiveNodeIDAllocatorImpl{
+	return &NaiveNodeIDAllocator{
 		now: 1,
 	}
 }
diff --git a/internal/proxyservice/proxyservice.go b/internal/proxyservice/proxyservice.go
index 4d9b03cc70774da1aaf9c0f2e734b03dcd23e387..7a3ee4f51dfb1716501784b2e64bf854aec72fe6 100644
--- a/internal/proxyservice/proxyservice.go
+++ b/internal/proxyservice/proxyservice.go
@@ -10,10 +10,10 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 )
 
-type ServiceImpl struct {
+type ProxyService struct {
 	allocator NodeIDAllocator
 	sched     *TaskScheduler
-	tick      TimeTick
+	tick      *TimeTick
 	nodeInfos *GlobalNodeInfoTable
 	stateCode internalpb2.StateCode
 
@@ -27,10 +27,10 @@ type ServiceImpl struct {
 	msFactory msgstream.Factory
 }
 
-func NewServiceImpl(ctx context.Context, factory msgstream.Factory) (*ServiceImpl, error) {
+func NewProxyService(ctx context.Context, factory msgstream.Factory) (*ProxyService, error) {
 	rand.Seed(time.Now().UnixNano())
 	ctx1, cancel := context.WithCancel(ctx)
-	s := &ServiceImpl{
+	s := &ProxyService{
 		ctx:       ctx1,
 		cancel:    cancel,
 		msFactory: factory,
diff --git a/internal/proxyservice/timetick.go b/internal/proxyservice/timetick.go
index 92cf0c46fcbca9ffbe492a5e7f237b1ff5c4bff1..6d1638423318a4c56cec8d0a6e4fe2fab8e87c71 100644
--- a/internal/proxyservice/timetick.go
+++ b/internal/proxyservice/timetick.go
@@ -10,22 +10,15 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
 )
 
-type (
-	TimeTick interface {
-		Start() error
-		Close()
-	}
-
-	TimeTickImpl struct {
-		ttBarrier TimeTickBarrier
-		channels  []msgstream.MsgStream
-		wg        sync.WaitGroup
-		ctx       context.Context
-		cancel    context.CancelFunc
-	}
-)
+type TimeTick struct {
+	ttBarrier TimeTickBarrier
+	channels  []msgstream.MsgStream
+	wg        sync.WaitGroup
+	ctx       context.Context
+	cancel    context.CancelFunc
+}
 
-func (tt *TimeTickImpl) Start() error {
+func (tt *TimeTick) Start() error {
 	log.Println("start time tick ...")
 	tt.wg.Add(1)
 	go func() {
@@ -81,7 +74,7 @@ func (tt *TimeTickImpl) Start() error {
 	return nil
 }
 
-func (tt *TimeTickImpl) Close() {
+func (tt *TimeTick) Close() {
 	for _, channel := range tt.channels {
 		channel.Close()
 	}
@@ -90,7 +83,7 @@ func (tt *TimeTickImpl) Close() {
 	tt.wg.Wait()
 }
 
-func newTimeTick(ctx context.Context, ttBarrier TimeTickBarrier, channels ...msgstream.MsgStream) TimeTick {
+func newTimeTick(ctx context.Context, ttBarrier TimeTickBarrier, channels ...msgstream.MsgStream) *TimeTick {
 	ctx1, cancel := context.WithCancel(ctx)
-	return &TimeTickImpl{ctx: ctx1, cancel: cancel, ttBarrier: ttBarrier, channels: channels}
+	return &TimeTick{ctx: ctx1, cancel: cancel, ttBarrier: ttBarrier, channels: channels}
 }
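With the interface removed, TimeTick is now a plain struct and newTimeTick returns the concrete *TimeTick, which is what ProxyService.tick stores. A sketch of the wiring inside the package (not part of the patch), assuming ttBarrier and the streams are produced during Init as before:

package proxyservice

import "github.com/zilliztech/milvus-distributed/internal/msgstream"

// wireTick is a hypothetical helper, not part of the patch: it shows the
// concrete *TimeTick flowing straight into the ProxyService field.
func wireTick(s *ProxyService, ttBarrier TimeTickBarrier, channels ...msgstream.MsgStream) error {
	s.tick = newTimeTick(s.ctx, ttBarrier, channels...) // now *TimeTick, no interface indirection
	return s.tick.Start()                               // Close() is invoked from ProxyService.Stop()
}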
diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go
index 5b64394d2488313aab3293200623143a883021ad..edba4bb444e15c68fe72708d1846844f21e62438 100644
--- a/internal/querynode/collection_replica.go
+++ b/internal/querynode/collection_replica.go
@@ -33,7 +33,7 @@ import (
  * Every replica tracks a value called tSafe which is the maximum timestamp that the replica
  * is up-to-date.
  */
-type collectionReplica interface {
+type ReplicaInterface interface {
 	// collection
 	getCollectionIDs() []UniqueID
 	addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
@@ -69,12 +69,12 @@ type collectionReplica interface {
 	getSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID)
 	replaceGrowingSegmentBySealedSegment(segment *Segment) error
 
-	getTSafe() tSafe
+	getTSafe() tSafer
 	freeAll()
 }
 
-type collectionReplicaImpl struct {
-	tSafe tSafe
+type collectionReplica struct {
+	tSafe tSafer
 
 	mu          sync.RWMutex // guards all
 	collections map[UniqueID]*Collection
@@ -83,7 +83,7 @@ type collectionReplicaImpl struct {
 }
 
 //----------------------------------------------------------------------------------------------------- collection
-func (colReplica *collectionReplicaImpl) getCollectionIDs() []UniqueID {
+func (colReplica *collectionReplica) getCollectionIDs() []UniqueID {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	collectionIDs := make([]UniqueID, 0)
@@ -93,7 +93,7 @@ func (colReplica *collectionReplicaImpl) getCollectionIDs() []UniqueID {
 	return collectionIDs
 }
 
-func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
+func (colReplica *collectionReplica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 
@@ -107,13 +107,13 @@ func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, sc
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error {
+func (colReplica *collectionReplica) removeCollection(collectionID UniqueID) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 	return colReplica.removeCollectionPrivate(collectionID)
 }
 
-func (colReplica *collectionReplicaImpl) removeCollectionPrivate(collectionID UniqueID) error {
+func (colReplica *collectionReplica) removeCollectionPrivate(collectionID UniqueID) error {
 	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
 	if err != nil {
 		return err
@@ -131,13 +131,13 @@ func (colReplica *collectionReplicaImpl) removeCollectionPrivate(collectionID Un
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) {
+func (colReplica *collectionReplica) getCollectionByID(collectionID UniqueID) (*Collection, error) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return colReplica.getCollectionByIDPrivate(collectionID)
 }
 
-func (colReplica *collectionReplicaImpl) getCollectionByIDPrivate(collectionID UniqueID) (*Collection, error) {
+func (colReplica *collectionReplica) getCollectionByIDPrivate(collectionID UniqueID) (*Collection, error) {
 	collection, ok := colReplica.collections[collectionID]
 	if !ok {
 		return nil, fmt.Errorf("cannot find collection, id = %d", collectionID)
@@ -146,24 +146,24 @@ func (colReplica *collectionReplicaImpl) getCollectionByIDPrivate(collectionID U
 	return collection, nil
 }
 
-func (colReplica *collectionReplicaImpl) hasCollection(collectionID UniqueID) bool {
+func (colReplica *collectionReplica) hasCollection(collectionID UniqueID) bool {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return colReplica.hasCollectionPrivate(collectionID)
 }
 
-func (colReplica *collectionReplicaImpl) hasCollectionPrivate(collectionID UniqueID) bool {
+func (colReplica *collectionReplica) hasCollectionPrivate(collectionID UniqueID) bool {
 	_, ok := colReplica.collections[collectionID]
 	return ok
 }
 
-func (colReplica *collectionReplicaImpl) getCollectionNum() int {
+func (colReplica *collectionReplica) getCollectionNum() int {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return len(colReplica.collections)
 }
 
-func (colReplica *collectionReplicaImpl) getPartitionIDs(collectionID UniqueID) ([]UniqueID, error) {
+func (colReplica *collectionReplica) getPartitionIDs(collectionID UniqueID) ([]UniqueID, error) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 
@@ -175,7 +175,7 @@ func (colReplica *collectionReplicaImpl) getPartitionIDs(collectionID UniqueID)
 	return collection.partitionIDs, nil
 }
 
-func (colReplica *collectionReplicaImpl) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
+func (colReplica *collectionReplica) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 
@@ -198,7 +198,7 @@ func (colReplica *collectionReplicaImpl) getVecFieldIDsByCollectionID(collection
 	return vecFields, nil
 }
 
-func (colReplica *collectionReplicaImpl) getFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
+func (colReplica *collectionReplica) getFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 
@@ -218,7 +218,7 @@ func (colReplica *collectionReplicaImpl) getFieldIDsByCollectionID(collectionID
 	return targetFields, nil
 }
 
-func (colReplica *collectionReplicaImpl) getFieldsByCollectionIDPrivate(collectionID UniqueID) ([]*schemapb.FieldSchema, error) {
+func (colReplica *collectionReplica) getFieldsByCollectionIDPrivate(collectionID UniqueID) ([]*schemapb.FieldSchema, error) {
 	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
 	if err != nil {
 		return nil, err
@@ -232,13 +232,13 @@ func (colReplica *collectionReplicaImpl) getFieldsByCollectionIDPrivate(collecti
 }
 //----------------------------------------------------------------------------------------------------- partition
-func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionID UniqueID) error {
+func (colReplica *collectionReplica) addPartition(collectionID UniqueID, partitionID UniqueID) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 	return colReplica.addPartitionPrivate(collectionID, partitionID)
 }
 
-func (colReplica *collectionReplicaImpl) addPartitionPrivate(collectionID UniqueID, partitionID UniqueID) error {
+func (colReplica *collectionReplica) addPartitionPrivate(collectionID UniqueID, partitionID UniqueID) error {
 	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
 	if err != nil {
 		return err
@@ -250,13 +250,13 @@ func (colReplica *collectionReplicaImpl) addPartitionPrivate(collectionID Unique
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) removePartition(partitionID UniqueID) error {
+func (colReplica *collectionReplica) removePartition(partitionID UniqueID) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 	return colReplica.removePartitionPrivate(partitionID)
 }
 
-func (colReplica *collectionReplicaImpl) removePartitionPrivate(partitionID UniqueID) error {
+func (colReplica *collectionReplica) removePartitionPrivate(partitionID UniqueID) error {
 	partition, err := colReplica.getPartitionByIDPrivate(partitionID)
 	if err != nil {
 		return err
@@ -279,13 +279,13 @@ func (colReplica *collectionReplicaImpl) removePartitionPrivate(partitionID Uniq
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) getPartitionByID(partitionID UniqueID) (*Partition, error) {
+func (colReplica *collectionReplica) getPartitionByID(partitionID UniqueID) (*Partition, error) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return colReplica.getPartitionByIDPrivate(partitionID)
 }
 
-func (colReplica *collectionReplicaImpl) getPartitionByIDPrivate(partitionID UniqueID) (*Partition, error) {
+func (colReplica *collectionReplica) getPartitionByIDPrivate(partitionID UniqueID) (*Partition, error) {
 	partition, ok := colReplica.partitions[partitionID]
 	if !ok {
 		return nil, fmt.Errorf("cannot find partition, id = %d", partitionID)
@@ -294,30 +294,30 @@ func (colReplica *collectionReplicaImpl) getPartitionByIDPrivate(partitionID Uni
 	return partition, nil
 }
 
-func (colReplica *collectionReplicaImpl) hasPartition(partitionID UniqueID) bool {
+func (colReplica *collectionReplica) hasPartition(partitionID UniqueID) bool {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return colReplica.hasPartitionPrivate(partitionID)
 }
 
-func (colReplica *collectionReplicaImpl) hasPartitionPrivate(partitionID UniqueID) bool {
+func (colReplica *collectionReplica) hasPartitionPrivate(partitionID UniqueID) bool {
 	_, ok := colReplica.partitions[partitionID]
 	return ok
 }
 
-func (colReplica *collectionReplicaImpl) getPartitionNum() int {
+func (colReplica *collectionReplica) getPartitionNum() int {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return len(colReplica.partitions)
 }
 
-func (colReplica *collectionReplicaImpl) getSegmentIDs(partitionID UniqueID) ([]UniqueID, error) {
+func (colReplica *collectionReplica) getSegmentIDs(partitionID UniqueID) ([]UniqueID, error) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return colReplica.getSegmentIDsPrivate(partitionID)
 }
 
-func (colReplica *collectionReplicaImpl) getSegmentIDsPrivate(partitionID UniqueID) ([]UniqueID, error) {
+func (colReplica *collectionReplica) getSegmentIDsPrivate(partitionID UniqueID) ([]UniqueID, error) {
 	partition, err2 := colReplica.getPartitionByIDPrivate(partitionID)
 	if err2 != nil {
 		return nil, err2
@@ -325,7 +325,7 @@ func (colReplica *collectionReplicaImpl) getSegmentIDsPrivate(partitionID Unique
 	return partition.segmentIDs, nil
 }
-func (colReplica *collectionReplicaImpl) enablePartition(partitionID UniqueID) error {
+func (colReplica *collectionReplica) enablePartition(partitionID UniqueID) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 
@@ -338,7 +338,7 @@ func (colReplica *collectionReplicaImpl) enablePartition(partitionID UniqueID) e
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) disablePartition(partitionID UniqueID) error {
+func (colReplica *collectionReplica) disablePartition(partitionID UniqueID) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 
@@ -351,7 +351,7 @@ func (colReplica *collectionReplicaImpl) disablePartition(partitionID UniqueID)
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) getEnabledPartitionIDsPrivate() []UniqueID {
+func (colReplica *collectionReplica) getEnabledPartitionIDsPrivate() []UniqueID {
 	partitionIDs := make([]UniqueID, 0)
 	for _, partition := range colReplica.partitions {
 		if partition.enable {
@@ -362,13 +362,13 @@ func (colReplica *collectionReplicaImpl) getEnabledPartitionIDsPrivate() []Uniqu
 }
 
 //----------------------------------------------------------------------------------------------------- segment
-func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
+func (colReplica *collectionReplica) addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 	return colReplica.addSegmentPrivate(segmentID, partitionID, collectionID, segType)
 }
 
-func (colReplica *collectionReplicaImpl) addSegmentPrivate(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
+func (colReplica *collectionReplica) addSegmentPrivate(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
 	collection, err := colReplica.getCollectionByIDPrivate(collectionID)
 	if err != nil {
 		return err
@@ -389,13 +389,13 @@ func (colReplica *collectionReplicaImpl) addSegmentPrivate(segmentID UniqueID, p
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error {
+func (colReplica *collectionReplica) removeSegment(segmentID UniqueID) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 	return colReplica.removeSegmentPrivate(segmentID)
 }
 
-func (colReplica *collectionReplicaImpl) removeSegmentPrivate(segmentID UniqueID) error {
+func (colReplica *collectionReplica) removeSegmentPrivate(segmentID UniqueID) error {
 	log.Debug("remove segment", zap.Int64("segmentID", segmentID))
 	segment, err := colReplica.getSegmentByIDPrivate(segmentID)
 	if err != nil {
@@ -414,13 +414,13 @@ func (colReplica *collectionReplicaImpl) removeSegmentPrivate(segmentID UniqueID
 	return nil
 }
 
-func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) {
+func (colReplica *collectionReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return colReplica.getSegmentByIDPrivate(segmentID)
 }
 
-func (colReplica *collectionReplicaImpl) getSegmentByIDPrivate(segmentID UniqueID) (*Segment, error) {
+func (colReplica *collectionReplica) getSegmentByIDPrivate(segmentID UniqueID) (*Segment, error) {
 	segment, ok := colReplica.segments[segmentID]
 	if !ok {
 		return nil, errors.New("cannot find segment, id = " + strconv.FormatInt(segmentID, 10))
@@ -429,24 +429,24 @@ func (colReplica *collectionReplicaImpl) getSegmentByIDPrivate(segmentID UniqueI
 	return segment, nil
 }
 
-func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool {
+func (colReplica *collectionReplica) hasSegment(segmentID UniqueID) bool {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return colReplica.hasSegmentPrivate(segmentID)
 }
 
-func (colReplica *collectionReplicaImpl) hasSegmentPrivate(segmentID UniqueID) bool {
+func (colReplica *collectionReplica) hasSegmentPrivate(segmentID UniqueID) bool {
 	_, ok := colReplica.segments[segmentID]
 	return ok
 }
 
-func (colReplica *collectionReplicaImpl) getSegmentNum() int {
+func (colReplica *collectionReplica) getSegmentNum() int {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 	return len(colReplica.segments)
 }
 
-func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb2.SegmentStats {
+func (colReplica *collectionReplica) getSegmentStatistics() []*internalpb2.SegmentStats {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 
@@ -471,7 +471,7 @@ func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb2.S
 	return statisticData
 }
 
-func (colReplica *collectionReplicaImpl) getEnabledSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
+func (colReplica *collectionReplica) getEnabledSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 
@@ -500,7 +500,7 @@ func (colReplica *collectionReplicaImpl) getEnabledSegmentsBySegmentType(segType
 	return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs
 }
 
-func (colReplica *collectionReplicaImpl) getSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
+func (colReplica *collectionReplica) getSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
 	colReplica.mu.RLock()
 	defer colReplica.mu.RUnlock()
 
@@ -519,7 +519,7 @@ func (colReplica *collectionReplicaImpl) getSegmentsBySegmentType(segType segmen
 	return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs
 }
 
-func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(segment *Segment) error {
+func (colReplica *collectionReplica) replaceGrowingSegmentBySealedSegment(segment *Segment) error {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 	if segment.segmentType != segTypeSealed && segment.segmentType != segTypeIndexing {
@@ -539,11 +539,11 @@ func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(se
 }
 //-----------------------------------------------------------------------------------------------------
-func (colReplica *collectionReplicaImpl) getTSafe() tSafe {
+func (colReplica *collectionReplica) getTSafe() tSafer {
 	return colReplica.tSafe
 }
 
-func (colReplica *collectionReplicaImpl) freeAll() {
+func (colReplica *collectionReplica) freeAll() {
 	colReplica.mu.Lock()
 	defer colReplica.mu.Unlock()
 
@@ -556,14 +556,14 @@ func (colReplica *collectionReplicaImpl) freeAll() {
 	colReplica.segments = make(map[UniqueID]*Segment)
 }
 
-func newCollectionReplicaImpl() collectionReplica {
+func newCollectionReplica() ReplicaInterface {
 	collections := make(map[int64]*Collection)
 	partitions := make(map[int64]*Partition)
 	segments := make(map[int64]*Segment)
 
 	tSafe := newTSafe()
 
-	var replica collectionReplica = &collectionReplicaImpl{
+	var replica ReplicaInterface = &collectionReplica{
 		collections: collections,
 		partitions:  partitions,
 		segments:    segments,
diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go
index 68cb575b5c26c74236186775493e560bf7e0631d..fa816d7f66f65ee2c937373b86fd1aa4b995b75c 100644
--- a/internal/querynode/data_sync_service.go
+++ b/internal/querynode/data_sync_service.go
@@ -18,10 +18,10 @@ type dataSyncService struct {
 	dmStream  msgstream.MsgStream
 	msFactory msgstream.Factory
 
-	replica collectionReplica
+	replica ReplicaInterface
 }
 
-func newDataSyncService(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *dataSyncService {
+func newDataSyncService(ctx context.Context, replica ReplicaInterface, factory msgstream.Factory) *dataSyncService {
 	service := &dataSyncService{
 		ctx: ctx,
 		fg:  nil,
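Renaming the interface to ReplicaInterface and the implementation to collectionReplica mirrors the datanode change earlier in this patch; the calling pattern is unchanged. A sketch (not part of the patch) using only methods declared in the interface above, with made-up IDs, and assuming segTypeGrowing is among the package's existing segment type constants:

package querynode

import "github.com/zilliztech/milvus-distributed/internal/proto/schemapb"

// exampleReplica is illustrative only, not part of the patch.
func exampleReplica(schema *schemapb.CollectionSchema) error {
	replica := newCollectionReplica() // returns ReplicaInterface, backed by *collectionReplica
	if err := replica.addCollection(1, schema); err != nil {
		return err
	}
	if err := replica.addPartition(1, 10); err != nil {
		return err
	}
	// signature per the diff: addSegment(segmentID, partitionID, collectionID, segType)
	return replica.addSegment(100, 10, 1, segTypeGrowing)
}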
diff --git a/internal/querynode/flow_graph_dd_node.go b/internal/querynode/flow_graph_dd_node.go
index 221c1a119c7cb1f4bb1f0db2d9c5e238633eb5bb..66b80cf11cf7950de0a00ff384ddbf69287e144e 100644
--- a/internal/querynode/flow_graph_dd_node.go
+++ b/internal/querynode/flow_graph_dd_node.go
@@ -14,7 +14,7 @@ import (
 type ddNode struct {
 	baseNode
 	ddMsg   *ddMsg
-	replica collectionReplica
+	replica ReplicaInterface
 }
 
 func (ddNode *ddNode) Name() string {
@@ -160,7 +160,7 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
 	})
 }
 
-func newDDNode(replica collectionReplica) *ddNode {
+func newDDNode(replica ReplicaInterface) *ddNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go
index c948dc5a9b664506800242ead9fdc1792eb902b3..50c7ac00ffdd7059fbf2f675011545fc4fd20f95 100644
--- a/internal/querynode/flow_graph_filter_dm_node.go
+++ b/internal/querynode/flow_graph_filter_dm_node.go
@@ -12,7 +12,7 @@ import (
 type filterDmNode struct {
 	baseNode
-	replica collectionReplica
+	replica ReplicaInterface
 }
 
 func (fdmNode *filterDmNode) Name() string {
@@ -100,7 +100,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
 	return msg
 }
 
-func newFilteredDmNode(replica collectionReplica) *filterDmNode {
+func newFilteredDmNode(replica ReplicaInterface) *filterDmNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
diff --git a/internal/querynode/flow_graph_gc_node.go b/internal/querynode/flow_graph_gc_node.go
index f4aba32508a714641eb9fb2662df866c86cad897..07ef91b4e7d935669154b99b068641f6e9ba53d3 100644
--- a/internal/querynode/flow_graph_gc_node.go
+++ b/internal/querynode/flow_graph_gc_node.go
@@ -10,7 +10,7 @@ import (
 type gcNode struct {
 	baseNode
-	replica collectionReplica
+	replica ReplicaInterface
 }
 
 func (gcNode *gcNode) Name() string {
@@ -54,7 +54,7 @@ func (gcNode *gcNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Con
 	return nil, ctx
 }
 
-func newGCNode(replica collectionReplica) *gcNode {
+func newGCNode(replica ReplicaInterface) *gcNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go
index 45ad4edce3ddcc7d7c01df2edd45c9440d3bc93a..9e4933b324e6f1b2e2d9cd1a604a87b6c07afc7c 100644
--- a/internal/querynode/flow_graph_insert_node.go
+++ b/internal/querynode/flow_graph_insert_node.go
@@ -12,7 +12,7 @@ import (
 type insertNode struct {
 	baseNode
-	replica collectionReplica
+	replica ReplicaInterface
 }
 
 type InsertData struct {
@@ -120,7 +120,7 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn
 	wg.Done()
 }
 
-func newInsertNode(replica collectionReplica) *insertNode {
+func newInsertNode(replica ReplicaInterface) *insertNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go
index 3146a0829d1542f70399a5c22d45f39b4f7adc54..b6e12942df8b28f003acccb7203bf9ad1cac64ba 100644
--- a/internal/querynode/flow_graph_service_time_node.go
+++ b/internal/querynode/flow_graph_service_time_node.go
@@ -13,7 +13,7 @@ import (
 type serviceTimeNode struct {
 	baseNode
-	replica collectionReplica
+	replica ReplicaInterface
 
 	timeTickMsgStream msgstream.MsgStream
 }
@@ -71,7 +71,7 @@ func (stNode *serviceTimeNode) sendTimeTick(ts Timestamp) error {
 	return stNode.timeTickMsgStream.Produce(context.TODO(), &msgPack)
 }
 
-func newServiceTimeNode(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *serviceTimeNode {
+func newServiceTimeNode(ctx context.Context, replica ReplicaInterface, factory msgstream.Factory) *serviceTimeNode {
 	maxQueueLength := Params.FlowGraphMaxQueueLength
 	maxParallelism := Params.FlowGraphMaxParallelism
diff --git a/internal/querynode/index_loader.go b/internal/querynode/index_loader.go
index 4d2848206a6502a7a7a05b05acce38d2ffdef989..7f9f44f17ea26eea3bd0edefda9141647a0c6ad3 100644
--- a/internal/querynode/index_loader.go
+++ b/internal/querynode/index_loader.go
@@ -26,7 +26,7 @@ import (
 )
 
 type indexLoader struct {
-	replica collectionReplica
+	replica ReplicaInterface
 
 	fieldIndexes   map[string][]*internalpb2.IndexStats
 	fieldStatsChan chan []*internalpb2.FieldStats
@@ -389,7 +389,7 @@ func (loader *indexLoader) loadIndexDelayed(collectionID, segmentID UniqueID, in
 	return nil
 }
 
-func newIndexLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, replica collectionReplica) *indexLoader {
+func newIndexLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, replica ReplicaInterface) *indexLoader {
 	option := &minioKV.Option{
 		Address:     Params.MinioEndPoint,
 		AccessKeyID: Params.MinioAccessKeyID,
diff --git a/internal/querynode/load_service.go b/internal/querynode/load_service.go
index 3d78d5707c893b792d79737b0422120b5914d2fd..bbc9b05f51e2bb34c074041a237230dbfa110794 100644
--- a/internal/querynode/load_service.go
+++ b/internal/querynode/load_service.go
@@ -161,7 +161,7 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID Uni
 	return nil
 }
 
-func newLoadService(ctx context.Context, masterClient MasterServiceInterface, dataClient DataServiceInterface, indexClient IndexServiceInterface, replica collectionReplica, dmStream msgstream.MsgStream) *loadService {
+func newLoadService(ctx context.Context, masterClient MasterServiceInterface, dataClient DataServiceInterface, indexClient IndexServiceInterface, replica ReplicaInterface, dmStream msgstream.MsgStream) *loadService {
 	ctx1, cancel := context.WithCancel(ctx)
 
 	segLoader := newSegmentLoader(ctx1, masterClient, indexClient, dataClient, replica, dmStream)
diff --git a/internal/querynode/meta_service.go b/internal/querynode/meta_service.go
index 97628ad935f8a7bb47a340525b311ecae34f4b0d..34daa6ba5769275e72353770365111949f6d79c6 100644
--- a/internal/querynode/meta_service.go
+++ b/internal/querynode/meta_service.go
@@ -27,10 +27,10 @@ const (
 type metaService struct {
 	ctx     context.Context
 	kvBase  *etcdkv.EtcdKV
-	replica collectionReplica
+	replica ReplicaInterface
 }
 
-func newMetaService(ctx context.Context, replica collectionReplica) *metaService {
+func newMetaService(ctx context.Context, replica ReplicaInterface) *metaService {
 	ETCDAddr := Params.ETCDAddress
 	MetaRootPath := Params.MetaRootPath
 	var cli *clientv3.Client
diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go
index 32e783b7855cf43d5638b6e74f4c6a3218f50a9f..52326d4429621dd86423b3ba548132b19f7cdf37 100644
--- a/internal/querynode/query_node.go
+++ b/internal/querynode/query_node.go
@@ -55,7 +55,7 @@ type QueryNode struct {
 	QueryNodeID UniqueID
 	stateCode   atomic.Value
 
-	replica collectionReplica
+	replica ReplicaInterface
 
 	// internal services
 	dataSyncService *dataSyncService
@@ -88,7 +88,7 @@ func NewQueryNode(ctx context.Context, queryNodeID UniqueID, factory msgstream.F
 		msFactory: factory,
 	}
 
-	node.replica = newCollectionReplicaImpl()
+	node.replica = newCollectionReplica()
 	node.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
 	return node
 }
@@ -107,7 +107,7 @@ func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *Quer
 		msFactory: factory,
 	}
 
-	node.replica = newCollectionReplicaImpl()
+	node.replica = newCollectionReplica()
 	node.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
 
 	return node
diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go
index 19d4e064309d54f17f9a9772f000d34d277ac1e9..018d351559569a1f5df81a95e2f83cc4eefc9293 100644
--- a/internal/querynode/search_service.go
+++ b/internal/querynode/search_service.go
@@ -23,7 +23,7 @@ type searchService struct {
 	wait   sync.WaitGroup
 	cancel context.CancelFunc
 
-	replica      collectionReplica
+	replica      ReplicaInterface
 	tSafeWatcher *tSafeWatcher
 
 	serviceableTimeMutex sync.Mutex // guards serviceableTime
@@ -38,7 +38,7 @@ type searchService struct {
 
 type ResultEntityIds []UniqueID
 
-func newSearchService(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *searchService {
+func newSearchService(ctx context.Context, replica ReplicaInterface, factory msgstream.Factory) *searchService {
 	receiveBufSize := Params.SearchReceiveBufSize
 
 	searchStream, _ := factory.NewMsgStream(ctx)
diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go
index 997a208f6410d29c53b74aed8c75212cb4e70f71..01c855045bfd728d19fdad9ba89779ecdef80dbd 100644
--- a/internal/querynode/segment_loader.go
+++ b/internal/querynode/segment_loader.go
@@ -17,7 +17,7 @@ import (
 
 // segmentLoader is only responsible for loading the field data from binlog
 type segmentLoader struct {
-	replica collectionReplica
+	replica ReplicaInterface
 
 	dmStream msgstream.MsgStream
 
@@ -191,7 +191,7 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, targetField
 	return nil
 }
 
-func newSegmentLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, dataClient DataServiceInterface, replica collectionReplica, dmStream msgstream.MsgStream) *segmentLoader {
+func newSegmentLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, dataClient DataServiceInterface, replica ReplicaInterface, dmStream msgstream.MsgStream) *segmentLoader {
 	option := &minioKV.Option{
 		Address:     Params.MinioEndPoint,
 		AccessKeyID: Params.MinioAccessKeyID,
diff --git a/internal/querynode/stats_service.go b/internal/querynode/stats_service.go
index ab34823c4a22259570676006aa8809ee338113e3..7d08d8528b8c915cd41b6343d71d1434d1a3dc2a 100644
--- a/internal/querynode/stats_service.go
+++ b/internal/querynode/stats_service.go
@@ -13,14 +13,14 @@ import (
 type statsService struct {
 	ctx context.Context
 
-	replica collectionReplica
+	replica ReplicaInterface
 
 	fieldStatsChan chan []*internalpb2.FieldStats
 	statsStream    msgstream.MsgStream
 
 	msFactory msgstream.Factory
 }
 
-func newStatsService(ctx context.Context, replica collectionReplica, fieldStatsChan chan []*internalpb2.FieldStats, factory msgstream.Factory) *statsService {
+func newStatsService(ctx context.Context, replica ReplicaInterface, fieldStatsChan chan []*internalpb2.FieldStats, factory msgstream.Factory) *statsService {
 	return &statsService{
 		ctx: ctx,
 
diff --git a/internal/querynode/tsafe.go b/internal/querynode/tsafe.go
index 60529a3c9868837c3e651dc057b8d407281edeb7..103e5200d36a522a451dfafc08b9c360b27cb332 100644
--- a/internal/querynode/tsafe.go
+++ b/internal/querynode/tsafe.go
@@ -24,38 +24,38 @@ func (watcher *tSafeWatcher) hasUpdate() {
 	<-watcher.notifyChan
 }
 
-type tSafe interface {
+type tSafer interface {
 	get() Timestamp
 	set(t Timestamp)
 	registerTSafeWatcher(t *tSafeWatcher)
 }
 
-type tSafeImpl struct {
+type tSafe struct {
 	tSafeMu     sync.Mutex // guards all fields
 	tSafe       Timestamp
 	watcherList []*tSafeWatcher
 }
 
-func newTSafe() tSafe {
-	var t tSafe = &tSafeImpl{
+func newTSafe() tSafer {
+	var t tSafer = &tSafe{
 		watcherList: make([]*tSafeWatcher, 0),
 	}
 	return t
 }
 
-func (ts *tSafeImpl) registerTSafeWatcher(t *tSafeWatcher) {
+func (ts *tSafe) registerTSafeWatcher(t *tSafeWatcher) {
 	ts.tSafeMu.Lock()
 	defer ts.tSafeMu.Unlock()
 	ts.watcherList = append(ts.watcherList, t)
 }
 
-func (ts *tSafeImpl) get() Timestamp {
+func (ts *tSafe) get() Timestamp {
 	ts.tSafeMu.Lock()
 	defer ts.tSafeMu.Unlock()
 	return ts.tSafe
 }
 
-func (ts *tSafeImpl) set(t Timestamp) {
+func (ts *tSafe) set(t Timestamp) {
 	ts.tSafeMu.Lock()
 	defer ts.tSafeMu.Unlock()
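The tSafe/tSafer swap frees the tSafe name for the struct while keeping consumers on an interface. A sketch of the producer/consumer handshake (not part of the patch), assuming newTSafeWatcher is the existing constructor in this file and that set notifies registered watchers:

package querynode

// exampleTSafe is illustrative only, not part of the patch.
func exampleTSafe() Timestamp {
	var ts tSafer = newTSafe() // concrete *tSafe behind the tSafer interface
	watcher := newTSafeWatcher()
	ts.registerTSafeWatcher(watcher)
	ts.set(Timestamp(42)) // assumed to notify registered watchers
	watcher.hasUpdate()   // blocks on notifyChan until the set above lands
	return ts.get()
}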
diff --git a/internal/queryservice/meta_replica.go b/internal/queryservice/meta_replica.go
index 15f06315319286fc315a302ec7105660e4cbec5f..749124631ad2182508ef6d99233ced0a8b4cd2d5 100644
--- a/internal/queryservice/meta_replica.go
+++ b/internal/queryservice/meta_replica.go
@@ -9,7 +9,7 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
 )
 
-type metaReplica interface {
+type Replica interface {
 	getCollections(dbID UniqueID) ([]*collection, error)
 	getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error)
 	getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error)
@@ -42,23 +42,23 @@ type collection struct {
 	schema *schemapb.CollectionSchema
 }
 
-type metaReplicaImpl struct {
+type metaReplica struct {
 	dbID           []UniqueID
 	db2collections map[UniqueID][]*collection
 }
 
-func newMetaReplica() metaReplica {
+func newMetaReplica() Replica {
 	db2collections := make(map[UniqueID][]*collection)
 	db2collections[0] = make([]*collection, 0)
 	dbIDs := make([]UniqueID, 0)
 	dbIDs = append(dbIDs, UniqueID(0))
-	return &metaReplicaImpl{
+	return &metaReplica{
 		dbID:           dbIDs,
 		db2collections: db2collections,
 	}
 }
 
-func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID, schema *schemapb.CollectionSchema) error {
+func (mp *metaReplica) addCollection(dbID UniqueID, collectionID UniqueID, schema *schemapb.CollectionSchema) error {
 	//TODO:: assert dbID = 0 exist
 	if _, ok := mp.db2collections[dbID]; ok {
 		partitions := make(map[UniqueID]*partition)
@@ -76,7 +76,7 @@ func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID, s
 	return errors.New("addCollection: can't find dbID when add collection")
 }
 
-func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error {
+func (mp *metaReplica) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for _, collection := range collections {
 			if collection.id == collectionID {
@@ -95,7 +95,7 @@ func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, pa
 	return errors.New("addPartition: can't find collection when add partition")
 }
 
-func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error) {
+func (mp *metaReplica) getCollections(dbID UniqueID) ([]*collection, error) {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		return collections, nil
 	}
@@ -103,7 +103,7 @@ func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error)
 	return nil, errors.New("getCollections: can't find collectionID")
 }
 
-func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error) {
+func (mp *metaReplica) getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error) {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for _, collection := range collections {
 			if collectionID == collection.id {
@@ -119,7 +119,7 @@ func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) (
 	return nil, errors.New("getPartitions: can't find partitionIDs")
 }
 
-func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error) {
+func (mp *metaReplica) getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error) {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for _, collection := range collections {
 			if collectionID == collection.id {
@@ -136,7 +136,7 @@ func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, par
 	return nil, errors.New("getSegments: can't find segmentID")
 }
 
-func (mp *metaReplicaImpl) getCollectionByID(dbID UniqueID, collectionID UniqueID) (*collection, error) {
+func (mp *metaReplica) getCollectionByID(dbID UniqueID, collectionID UniqueID) (*collection, error) {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for _, collection := range collections {
 			if collectionID == collection.id {
@@ -148,7 +148,7 @@ func (mp *metaReplicaImpl) getCollectionByID(dbID UniqueID, collectionID UniqueI
 	return nil, errors.New("getCollectionByID: can't find collectionID")
 }
 
-func (mp *metaReplicaImpl) getPartitionByID(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) {
+func (mp *metaReplica) getPartitionByID(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for _, collection := range collections {
 			if collectionID == collection.id {
@@ -163,7 +163,7 @@ func (mp *metaReplicaImpl) getPartitionByID(dbID UniqueID, collectionID UniqueID
 	return nil, errors.New("getPartitionByID: can't find partitionID")
 }
 
-func (mp *metaReplicaImpl) updatePartitionState(dbID UniqueID,
+func (mp *metaReplica) updatePartitionState(dbID UniqueID,
 	collectionID UniqueID,
 	partitionID UniqueID,
 	state querypb.PartitionState) error {
@@ -178,7 +178,7 @@ func (mp *metaReplicaImpl) updatePartitionState(dbID UniqueID,
 	return errors.New("updatePartitionState: update partition state fail")
 }
 
-func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID,
+func (mp *metaReplica) getPartitionStates(dbID UniqueID,
 	collectionID UniqueID,
 	partitionIDs []UniqueID) ([]*querypb.PartitionStates, error) {
 	partitionStates := make([]*querypb.PartitionStates, 0)
@@ -202,7 +202,7 @@ func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID,
 	return partitionStates, nil
 }
 
-func (mp *metaReplicaImpl) releaseCollection(dbID UniqueID, collectionID UniqueID) error {
+func (mp *metaReplica) releaseCollection(dbID UniqueID, collectionID UniqueID) error {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for i, collection := range collections {
 			if collectionID == collection.id {
@@ -220,7 +220,7 @@ func (mp *metaReplicaImpl) releaseCollection(dbID UniqueID, collectionID UniqueI
 	return errors.New(errorStr)
 }
 
-func (mp *metaReplicaImpl) releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error {
+func (mp *metaReplica) releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for _, collection := range collections {
 			if collectionID == collection.id {
@@ -236,7 +236,7 @@ func (mp *metaReplicaImpl) releasePartition(dbID UniqueID, collectionID UniqueID
 	return errors.New(errorStr)
 }
 
-func (mp *metaReplicaImpl) addDmChannels(dbID UniqueID, collectionID UniqueID, channels2NodeID map[string]int64) error {
+func (mp *metaReplica) addDmChannels(dbID UniqueID, collectionID UniqueID, channels2NodeID map[string]int64) error {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for _, collection := range collections {
 			if collectionID == collection.id {
@@ -250,7 +250,7 @@ func (mp *metaReplicaImpl) addDmChannels(dbID UniqueID, collectionID UniqueID, c
 	return errors.New("addDmChannels: can't find dbID or collectionID")
 }
 
-func (mp *metaReplicaImpl) getAssignedNodeIDByChannelName(dbID UniqueID, collectionID UniqueID, channel string) (int64, error) {
+func (mp *metaReplica) getAssignedNodeIDByChannelName(dbID UniqueID, collectionID UniqueID, channel string) (int64, error) {
 	if collections, ok := mp.db2collections[dbID]; ok {
 		for _, collection := range collections {
 			if collectionID == collection.id {
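As in querynode, the exported name now belongs to the interface (Replica) and the unexported one to the implementation (metaReplica). A usage sketch (not part of the patch), relying on the dbID 0 bucket that newMetaReplica pre-creates above; IDs are made up:

package queryservice

import "github.com/zilliztech/milvus-distributed/internal/proto/schemapb"

// exampleMetaReplica is illustrative only, not part of the patch.
func exampleMetaReplica(schema *schemapb.CollectionSchema) error {
	replica := newMetaReplica() // returns the Replica interface
	if err := replica.addCollection(0, 1, schema); err != nil { // dbID 0 exists by construction
		return err
	}
	return replica.addPartition(0, 1, 100)
}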
diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go
index 2179028cf1a18f6b56a3006d474890ba0fd7b2b0..dd6d25ebe60b200cb4ab45ee817e5e4a89b14357 100644
--- a/internal/queryservice/queryservice.go
+++ b/internal/queryservice/queryservice.go
@@ -25,8 +25,24 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
 	"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
 	"github.com/zilliztech/milvus-distributed/internal/util/retry"
+	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
 )
 
+type Service interface {
+	typeutil.Component
+
+	RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error)
+	ShowCollections(ctx context.Context, req *querypb.ShowCollectionRequest) (*querypb.ShowCollectionResponse, error)
+	LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error)
+	ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error)
+	ShowPartitions(ctx context.Context, req *querypb.ShowPartitionRequest) (*querypb.ShowPartitionResponse, error)
+	LoadPartitions(ctx context.Context, req *querypb.LoadPartitionRequest) (*commonpb.Status, error)
+	ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionRequest) (*commonpb.Status, error)
+	CreateQueryChannel(ctx context.Context) (*querypb.CreateQueryChannelResponse, error)
+	GetPartitionStates(ctx context.Context, req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error)
+	GetSegmentInfo(ctx context.Context, req *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error)
+}
+
 type MasterServiceInterface interface {
 	ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
 	ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error)
@@ -60,7 +76,7 @@ type QueryService struct {
 	loopCancel context.CancelFunc
 
 	queryServiceID uint64
-	replica        metaReplica
+	replica        Replica
 	dataServiceClient   DataServiceInterface
 	masterServiceClient MasterServiceInterface
diff --git a/internal/util/typeutil/interface.go b/internal/util/typeutil/interface.go
index 985b89717cea62a5d147227edb7b4d19ab10c7e2..af0fd89646ad91fd872a6a575623f86accb97e62 100644
--- a/internal/util/typeutil/interface.go
+++ b/internal/util/typeutil/interface.go
@@ -10,6 +10,9 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
 )
 
+type TimeTickHandler interface {
+	GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error)
+}
 type Service interface {
 	Init() error
 	Start() error
@@ -18,7 +21,6 @@ type Service interface {
 
 type Component interface {
 	GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
-	GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error)
 	GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error)
 }
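Pulling GetTimeTickChannel out of typeutil.Component into its own TimeTickHandler interface means only services that actually publish a time-tick channel declare it, which is how proxyservice.Service now composes the two. The pattern in isolation (illustrative, not part of the patch):

package typeutil

// timeTickingComponent is an illustrative composition, not part of the patch:
// a component that also opts in to exposing a time-tick channel.
type timeTickingComponent interface {
	Component
	TimeTickHandler
}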