提交 74154a11 编写于 作者: Q quicksilver 提交者: yefu.chen

Fix deploy error during regression stage

Signed-off-by: Nquicksilver <zhifeng.zhang@zilliz.com>
上级 5c569e51
......@@ -6,6 +6,7 @@ timeout(time: 60, unit: 'MINUTES') {
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME}-${REGRESSION_SERVICE_NAME} up -d etcd'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME}-${REGRESSION_SERVICE_NAME} up -d minio'
dir ('build/docker/deploy') {
sh 'docker pull ${TARGET_REPO}/milvus-distributed:${TARGET_TAG}'
if ("${REGRESSION_SERVICE_NAME}" == "regression_distributed") {
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME}-${REGRESSION_SERVICE_NAME} up -d master'
sh 'docker-compose -p ${DOCKER_COMPOSE_PROJECT_NAME}-${REGRESSION_SERVICE_NAME} up -d indexservice'
......
......@@ -8,22 +8,22 @@ import (
)
type (
allocator interface {
allocatorInterface interface {
allocID() (UniqueID, error)
}
allocatorImpl struct {
allocator struct {
masterService MasterServiceInterface
}
)
func newAllocatorImpl(s MasterServiceInterface) *allocatorImpl {
return &allocatorImpl{
func newAllocator(s MasterServiceInterface) *allocator {
return &allocator{
masterService: s,
}
}
func (alloc *allocatorImpl) allocID() (UniqueID, error) {
func (alloc *allocator) allocID() (UniqueID, error) {
ctx := context.TODO()
resp, err := alloc.masterService.AllocID(ctx, &masterpb.IDRequest{
Base: &commonpb.MsgBase{
......
......@@ -29,32 +29,30 @@ type Replica interface {
getSegmentByID(segmentID UniqueID) (*Segment, error)
}
type (
Segment struct {
segmentID UniqueID
collectionID UniqueID
partitionID UniqueID
numRows int64
memorySize int64
isNew bool
createTime Timestamp // not using
endTime Timestamp // not using
startPosition *internalpb2.MsgPosition
endPosition *internalpb2.MsgPosition // not using
}
type Segment struct {
segmentID UniqueID
collectionID UniqueID
partitionID UniqueID
numRows int64
memorySize int64
isNew bool
createTime Timestamp // not using
endTime Timestamp // not using
startPosition *internalpb2.MsgPosition
endPosition *internalpb2.MsgPosition // not using
}
ReplicaImpl struct {
mu sync.RWMutex
segments []*Segment
collections map[UniqueID]*Collection
}
)
type CollectionSegmentReplica struct {
mu sync.RWMutex
segments []*Segment
collections map[UniqueID]*Collection
}
func newReplica() Replica {
segments := make([]*Segment, 0)
collections := make(map[UniqueID]*Collection)
var replica Replica = &ReplicaImpl{
var replica Replica = &CollectionSegmentReplica{
segments: segments,
collections: collections,
}
......@@ -62,7 +60,7 @@ func newReplica() Replica {
}
// --- segment ---
func (replica *ReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) {
func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) {
replica.mu.RLock()
defer replica.mu.RUnlock()
......@@ -74,7 +72,7 @@ func (replica *ReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error)
return nil, fmt.Errorf("Cannot find segment, id = %v", segmentID)
}
func (replica *ReplicaImpl) addSegment(
func (replica *CollectionSegmentReplica) addSegment(
segmentID UniqueID,
collID UniqueID,
partitionID UniqueID,
......@@ -101,7 +99,7 @@ func (replica *ReplicaImpl) addSegment(
return nil
}
func (replica *ReplicaImpl) removeSegment(segmentID UniqueID) error {
func (replica *CollectionSegmentReplica) removeSegment(segmentID UniqueID) error {
replica.mu.Lock()
defer replica.mu.Unlock()
......@@ -117,7 +115,7 @@ func (replica *ReplicaImpl) removeSegment(segmentID UniqueID) error {
return fmt.Errorf("Error, there's no segment %v", segmentID)
}
func (replica *ReplicaImpl) hasSegment(segmentID UniqueID) bool {
func (replica *CollectionSegmentReplica) hasSegment(segmentID UniqueID) bool {
replica.mu.RLock()
defer replica.mu.RUnlock()
......@@ -129,7 +127,7 @@ func (replica *ReplicaImpl) hasSegment(segmentID UniqueID) bool {
return false
}
func (replica *ReplicaImpl) updateStatistics(segmentID UniqueID, numRows int64) error {
func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, numRows int64) error {
replica.mu.Lock()
defer replica.mu.Unlock()
......@@ -144,7 +142,7 @@ func (replica *ReplicaImpl) updateStatistics(segmentID UniqueID, numRows int64)
return fmt.Errorf("Error, there's no segment %v", segmentID)
}
func (replica *ReplicaImpl) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb2.SegmentStatisticsUpdates, error) {
func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb2.SegmentStatisticsUpdates, error) {
replica.mu.Lock()
defer replica.mu.Unlock()
......@@ -169,14 +167,14 @@ func (replica *ReplicaImpl) getSegmentStatisticsUpdates(segmentID UniqueID) (*in
}
// --- collection ---
func (replica *ReplicaImpl) getCollectionNum() int {
func (replica *CollectionSegmentReplica) getCollectionNum() int {
replica.mu.RLock()
defer replica.mu.RUnlock()
return len(replica.collections)
}
func (replica *ReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
func (replica *CollectionSegmentReplica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
replica.mu.Lock()
defer replica.mu.Unlock()
......@@ -195,7 +193,7 @@ func (replica *ReplicaImpl) addCollection(collectionID UniqueID, schema *schemap
return nil
}
func (replica *ReplicaImpl) removeCollection(collectionID UniqueID) error {
func (replica *CollectionSegmentReplica) removeCollection(collectionID UniqueID) error {
replica.mu.Lock()
defer replica.mu.Unlock()
......@@ -204,7 +202,7 @@ func (replica *ReplicaImpl) removeCollection(collectionID UniqueID) error {
return nil
}
func (replica *ReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) {
func (replica *CollectionSegmentReplica) getCollectionByID(collectionID UniqueID) (*Collection, error) {
replica.mu.RLock()
defer replica.mu.RUnlock()
......@@ -216,7 +214,7 @@ func (replica *ReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collectio
return coll, nil
}
func (replica *ReplicaImpl) hasCollection(collectionID UniqueID) bool {
func (replica *CollectionSegmentReplica) hasCollection(collectionID UniqueID) bool {
replica.mu.RLock()
defer replica.mu.RUnlock()
......
......@@ -16,10 +16,10 @@ func TestReplica_Collection(t *testing.T) {
replica := newReplica()
assert.Zero(t, replica.getCollectionNum())
replica = new(ReplicaImpl)
replica = new(CollectionSegmentReplica)
assert.Zero(t, replica.getCollectionNum())
replica = &ReplicaImpl{
replica = &CollectionSegmentReplica{
collections: map[UniqueID]*Collection{
0: {id: 0},
1: {id: 1},
......
......@@ -157,7 +157,7 @@ func (node *DataNode) Init() error {
replica := newReplica()
var alloc allocator = newAllocatorImpl(node.masterService)
var alloc allocatorInterface = newAllocator(node.masterService)
chanSize := 100
node.flushChan = make(chan *flushMsg, chanSize)
......
......@@ -19,12 +19,12 @@ type dataSyncService struct {
fg *flowgraph.TimeTickedFlowGraph
flushChan chan *flushMsg
replica Replica
idAllocator allocator
idAllocator allocatorInterface
msFactory msgstream.Factory
}
func newDataSyncService(ctx context.Context, flushChan chan *flushMsg,
replica Replica, alloc allocator, factory msgstream.Factory) *dataSyncService {
replica Replica, alloc allocatorInterface, factory msgstream.Factory) *dataSyncService {
service := &dataSyncService{
ctx: ctx,
fg: nil,
......
......@@ -26,7 +26,7 @@ type ddNode struct {
ddBuffer *ddBuffer
inFlushCh chan *flushMsg
idAllocator allocator
idAllocator allocatorInterface
kv kv.Base
replica Replica
flushMeta *metaTable
......@@ -369,7 +369,7 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
}
func newDDNode(ctx context.Context, flushMeta *metaTable,
inFlushCh chan *flushMsg, replica Replica, alloc allocator) *ddNode {
inFlushCh chan *flushMsg, replica Replica, alloc allocatorInterface) *ddNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
......
......@@ -41,7 +41,7 @@ type (
minIOKV kv.Base
minioPrefix string
idAllocator allocator
idAllocator allocatorInterface
timeTickStream msgstream.MsgStream
segmentStatisticsStream msgstream.MsgStream
......@@ -622,7 +622,7 @@ func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) (
}
func newInsertBufferNode(ctx context.Context, flushMeta *metaTable,
replica Replica, alloc allocator, factory msgstream.Factory) *insertBufferNode {
replica Replica, alloc allocatorInterface, factory msgstream.Factory) *insertBufferNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
......
......@@ -7,22 +7,22 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
)
type allocator interface {
type allocatorInterface interface {
allocTimestamp() (Timestamp, error)
allocID() (UniqueID, error)
}
type allocatorImpl struct {
type allocator struct {
masterClient MasterClient
}
func newAllocatorImpl(masterClient MasterClient) *allocatorImpl {
return &allocatorImpl{
func newAllocator(masterClient MasterClient) *allocator {
return &allocator{
masterClient: masterClient,
}
}
func (allocator *allocatorImpl) allocTimestamp() (Timestamp, error) {
func (allocator *allocator) allocTimestamp() (Timestamp, error) {
ctx := context.TODO()
resp, err := allocator.masterClient.AllocTimestamp(ctx, &masterpb.TsoRequest{
Base: &commonpb.MsgBase{
......@@ -39,7 +39,7 @@ func (allocator *allocatorImpl) allocTimestamp() (Timestamp, error) {
return resp.Timestamp, nil
}
func (allocator *allocatorImpl) allocID() (UniqueID, error) {
func (allocator *allocator) allocID() (UniqueID, error) {
ctx := context.TODO()
resp, err := allocator.masterClient.AllocID(ctx, &masterpb.IDRequest{
Base: &commonpb.MsgBase{
......
......@@ -10,10 +10,10 @@ import (
type ddHandler struct {
meta *meta
segmentAllocator segmentAllocator
segmentAllocator segmentAllocatorInterface
}
func newDDHandler(meta *meta, allocator segmentAllocator) *ddHandler {
func newDDHandler(meta *meta, allocator segmentAllocatorInterface) *ddHandler {
return &ddHandler{
meta: meta,
segmentAllocator: allocator,
......
......@@ -15,7 +15,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
)
func newMemoryMeta(allocator allocator) (*meta, error) {
func newMemoryMeta(allocator allocatorInterface) (*meta, error) {
memoryKV := memkv.NewMemoryKV()
return newMeta(memoryKV)
}
......
......@@ -26,7 +26,7 @@ func (err errRemainInSufficient) Error() string {
}
// segmentAllocator is used to allocate rows for segments and record the allocations.
type segmentAllocator interface {
type segmentAllocatorInterface interface {
// OpenSegment add the segment to allocator and set it allocatable
OpenSegment(segmentInfo *datapb.SegmentInfo) error
// AllocSegment allocate rows and record the allocation.
......@@ -45,34 +45,32 @@ type segmentAllocator interface {
IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error)
}
type (
segmentStatus struct {
id UniqueID
collectionID UniqueID
partitionID UniqueID
total int
sealed bool
lastExpireTime Timestamp
allocations []*allocation
insertChannel string
}
allocation struct {
rowNums int
expireTime Timestamp
}
segmentAllocatorImpl struct {
mt *meta
segments map[UniqueID]*segmentStatus //segment id -> status
segmentExpireDuration int64
segmentThreshold float64
segmentThresholdFactor float64
mu sync.RWMutex
allocator allocator
}
)
type segmentStatus struct {
id UniqueID
collectionID UniqueID
partitionID UniqueID
total int
sealed bool
lastExpireTime Timestamp
allocations []*allocation
insertChannel string
}
type allocation struct {
rowNums int
expireTime Timestamp
}
type segmentAllocator struct {
mt *meta
segments map[UniqueID]*segmentStatus //segment id -> status
segmentExpireDuration int64
segmentThreshold float64
segmentThresholdFactor float64
mu sync.RWMutex
allocator allocatorInterface
}
func newSegmentAllocator(meta *meta, allocator allocator) *segmentAllocatorImpl {
segmentAllocator := &segmentAllocatorImpl{
func newSegmentAllocator(meta *meta, allocator allocatorInterface) *segmentAllocator {
segmentAllocator := &segmentAllocator{
mt: meta,
segments: make(map[UniqueID]*segmentStatus),
segmentExpireDuration: Params.SegIDAssignExpiration,
......@@ -83,7 +81,7 @@ func newSegmentAllocator(meta *meta, allocator allocator) *segmentAllocatorImpl
return segmentAllocator
}
func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentInfo) error {
func (allocator *segmentAllocator) OpenSegment(segmentInfo *datapb.SegmentInfo) error {
allocator.mu.Lock()
defer allocator.mu.Unlock()
if _, ok := allocator.segments[segmentInfo.SegmentID]; ok {
......@@ -105,7 +103,7 @@ func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentIn
return nil
}
func (allocator *segmentAllocatorImpl) AllocSegment(collectionID UniqueID,
func (allocator *segmentAllocator) AllocSegment(collectionID UniqueID,
partitionID UniqueID, channelName string, requestRows int) (segID UniqueID, retCount int, expireTime Timestamp, err error) {
allocator.mu.Lock()
defer allocator.mu.Unlock()
......@@ -133,7 +131,7 @@ func (allocator *segmentAllocatorImpl) AllocSegment(collectionID UniqueID,
return
}
func (allocator *segmentAllocatorImpl) alloc(segStatus *segmentStatus, numRows int) (bool, error) {
func (allocator *segmentAllocator) alloc(segStatus *segmentStatus, numRows int) (bool, error) {
totalOfAllocations := 0
for _, allocation := range segStatus.allocations {
totalOfAllocations += allocation.rowNums
......@@ -163,7 +161,7 @@ func (allocator *segmentAllocatorImpl) alloc(segStatus *segmentStatus, numRows i
return true, nil
}
func (allocator *segmentAllocatorImpl) estimateTotalRows(collectionID UniqueID) (int, error) {
func (allocator *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int, error) {
collMeta, err := allocator.mt.GetCollection(collectionID)
if err != nil {
return -1, err
......@@ -175,7 +173,7 @@ func (allocator *segmentAllocatorImpl) estimateTotalRows(collectionID UniqueID)
return int(allocator.segmentThreshold / float64(sizePerRecord)), nil
}
func (allocator *segmentAllocatorImpl) GetSealedSegments() ([]UniqueID, error) {
func (allocator *segmentAllocator) GetSealedSegments() ([]UniqueID, error) {
allocator.mu.Lock()
defer allocator.mu.Unlock()
keys := make([]UniqueID, 0)
......@@ -194,7 +192,7 @@ func (allocator *segmentAllocatorImpl) GetSealedSegments() ([]UniqueID, error) {
return keys, nil
}
func (allocator *segmentAllocatorImpl) checkSegmentSealed(segStatus *segmentStatus) (bool, error) {
func (allocator *segmentAllocator) checkSegmentSealed(segStatus *segmentStatus) (bool, error) {
segMeta, err := allocator.mt.GetSegment(segStatus.id)
if err != nil {
return false, err
......@@ -202,7 +200,7 @@ func (allocator *segmentAllocatorImpl) checkSegmentSealed(segStatus *segmentStat
return float64(segMeta.NumRows) >= allocator.segmentThresholdFactor*float64(segStatus.total), nil
}
func (allocator *segmentAllocatorImpl) SealSegment(segmentID UniqueID) error {
func (allocator *segmentAllocator) SealSegment(segmentID UniqueID) error {
allocator.mu.Lock()
defer allocator.mu.Unlock()
status, ok := allocator.segments[segmentID]
......@@ -213,13 +211,13 @@ func (allocator *segmentAllocatorImpl) SealSegment(segmentID UniqueID) error {
return nil
}
func (allocator *segmentAllocatorImpl) DropSegment(segmentID UniqueID) {
func (allocator *segmentAllocator) DropSegment(segmentID UniqueID) {
allocator.mu.Lock()
defer allocator.mu.Unlock()
delete(allocator.segments, segmentID)
}
func (allocator *segmentAllocatorImpl) ExpireAllocations(timeTick Timestamp) error {
func (allocator *segmentAllocator) ExpireAllocations(timeTick Timestamp) error {
allocator.mu.Lock()
defer allocator.mu.Unlock()
for _, segStatus := range allocator.segments {
......@@ -234,7 +232,7 @@ func (allocator *segmentAllocatorImpl) ExpireAllocations(timeTick Timestamp) err
return nil
}
func (allocator *segmentAllocatorImpl) IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error) {
func (allocator *segmentAllocator) IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error) {
allocator.mu.RLock()
defer allocator.mu.RUnlock()
status, ok := allocator.segments[segmentID]
......@@ -244,7 +242,7 @@ func (allocator *segmentAllocatorImpl) IsAllocationsExpired(segmentID UniqueID,
return status.lastExpireTime <= ts, nil
}
func (allocator *segmentAllocatorImpl) SealAllSegments(collectionID UniqueID) {
func (allocator *segmentAllocator) SealAllSegments(collectionID UniqueID) {
allocator.mu.Lock()
defer allocator.mu.Unlock()
for _, status := range allocator.segments {
......
......@@ -32,8 +32,8 @@ import (
const role = "dataservice"
type DataService interface {
typeutil.Service
typeutil.Component
RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error)
Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb.Status, error)
......@@ -77,10 +77,10 @@ type (
state atomic.Value
client *etcdkv.EtcdKV
meta *meta
segAllocator segmentAllocator
segAllocator segmentAllocatorInterface
statsHandler *statsHandler
ddHandler *ddHandler
allocator allocator
allocator allocatorInterface
cluster *dataNodeCluster
msgProducer *timesync.MsgProducer
registerFinishCh chan struct{}
......@@ -136,7 +136,7 @@ func (s *Server) Start() error {
return err
}
s.allocator = newAllocatorImpl(s.masterClient)
s.allocator = newAllocator(s.masterClient)
if err = s.initMeta(); err != nil {
return err
}
......
......@@ -13,18 +13,18 @@ import (
type (
proxyTimeTickWatcher struct {
allocator segmentAllocator
allocator segmentAllocatorInterface
msgQueue chan *msgstream.TimeTickMsg
}
dataNodeTimeTickWatcher struct {
meta *meta
cluster *dataNodeCluster
allocator segmentAllocator
allocator segmentAllocatorInterface
msgQueue chan *msgstream.TimeTickMsg
}
)
func newProxyTimeTickWatcher(allocator segmentAllocator) *proxyTimeTickWatcher {
func newProxyTimeTickWatcher(allocator segmentAllocatorInterface) *proxyTimeTickWatcher {
return &proxyTimeTickWatcher{
allocator: allocator,
msgQueue: make(chan *msgstream.TimeTickMsg, 1),
......@@ -49,7 +49,7 @@ func (watcher *proxyTimeTickWatcher) Watch(msg *msgstream.TimeTickMsg) {
watcher.msgQueue <- msg
}
func newDataNodeTimeTickWatcher(meta *meta, allocator segmentAllocator, cluster *dataNodeCluster) *dataNodeTimeTickWatcher {
func newDataNodeTimeTickWatcher(meta *meta, allocator segmentAllocatorInterface, cluster *dataNodeCluster) *dataNodeTimeTickWatcher {
return &dataNodeTimeTickWatcher{
meta: meta,
allocator: allocator,
......
......@@ -20,7 +20,7 @@ import (
)
type Server struct {
impl *indexnode.NodeImpl
impl *indexnode.IndexNode
grpcServer *grpc.Server
grpcErrChan chan error
......@@ -164,7 +164,7 @@ func (s *Server) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty
func NewServer(ctx context.Context) (*Server, error) {
ctx1, cancel := context.WithCancel(ctx)
node, err := indexnode.NewNodeImpl(ctx1)
node, err := indexnode.NewIndexNode(ctx1)
if err != nil {
defer cancel()
return nil, err
......
......@@ -26,7 +26,7 @@ type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
type Server struct {
impl *indexservice.ServiceImpl
impl *indexservice.IndexService
grpcServer *grpc.Server
grpcErrChan chan error
......@@ -161,7 +161,7 @@ func (s *Server) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty
func NewServer(ctx context.Context) (*Server, error) {
ctx1, cancel := context.WithCancel(ctx)
serverImp, err := indexservice.NewServiceImpl(ctx)
serverImp, err := indexservice.NewIndexService(ctx)
if err != nil {
defer cancel()
return nil, err
......
......@@ -37,7 +37,7 @@ const (
type Server struct {
ctx context.Context
wg sync.WaitGroup
impl *proxynode.NodeImpl
impl *proxynode.ProxyNode
grpcServer *grpc.Server
grpcErrChan chan error
......@@ -60,7 +60,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
grpcErrChan: make(chan error),
}
server.impl, err = proxynode.NewProxyNodeImpl(server.ctx, factory)
server.impl, err = proxynode.NewProxyNode(server.ctx, factory)
if err != nil {
return nil, err
}
......
......@@ -30,7 +30,7 @@ type Server struct {
grpcServer *grpc.Server
grpcErrChan chan error
impl *proxyservice.ServiceImpl
impl *proxyservice.ProxyService
tracer opentracing.Tracer
closer io.Closer
......@@ -60,7 +60,7 @@ func NewServer(ctx1 context.Context, factory msgstream.Factory) (*Server, error)
}
opentracing.SetGlobalTracer(server.tracer)
server.impl, err = proxyservice.NewServiceImpl(server.ctx, factory)
server.impl, err = proxyservice.NewProxyService(server.ctx, factory)
if err != nil {
return nil, err
}
......@@ -131,7 +131,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
}
func (s *Server) start() error {
log.Println("proxy ServiceImpl start ...")
log.Println("proxy ProxyService start ...")
if err := s.impl.Start(); err != nil {
return err
}
......
......@@ -29,7 +29,7 @@ const (
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
type NodeImpl struct {
type IndexNode struct {
stateCode internalpb2.StateCode
loopCtx context.Context
......@@ -48,9 +48,9 @@ type NodeImpl struct {
closer io.Closer
}
func NewNodeImpl(ctx context.Context) (*NodeImpl, error) {
func NewIndexNode(ctx context.Context) (*IndexNode, error) {
ctx1, cancel := context.WithCancel(ctx)
b := &NodeImpl{
b := &IndexNode{
loopCtx: ctx1,
loopCancel: cancel,
}
......@@ -62,7 +62,7 @@ func NewNodeImpl(ctx context.Context) (*NodeImpl, error) {
return b, nil
}
func (i *NodeImpl) Init() error {
func (i *IndexNode) Init() error {
ctx := context.Background()
err := funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexService", 10, time.Second)
......@@ -125,7 +125,7 @@ func (i *NodeImpl) Init() error {
return nil
}
func (i *NodeImpl) Start() error {
func (i *IndexNode) Start() error {
i.sched.Start()
// Start callbacks
......@@ -136,7 +136,7 @@ func (i *NodeImpl) Start() error {
}
// Close closes the server.
func (i *NodeImpl) Stop() error {
func (i *IndexNode) Stop() error {
if err := i.closer.Close(); err != nil {
return err
}
......@@ -151,15 +151,15 @@ func (i *NodeImpl) Stop() error {
return nil
}
func (i *NodeImpl) UpdateStateCode(code internalpb2.StateCode) {
func (i *IndexNode) UpdateStateCode(code internalpb2.StateCode) {
i.stateCode = code
}
func (i *NodeImpl) SetIndexServiceClient(serviceClient typeutil.IndexServiceInterface) {
func (i *IndexNode) SetIndexServiceClient(serviceClient typeutil.IndexServiceInterface) {
i.serviceClient = serviceClient
}
func (i *NodeImpl) BuildIndex(ctx context.Context, request *indexpb.BuildIndexCmd) (*commonpb.Status, error) {
func (i *IndexNode) BuildIndex(ctx context.Context, request *indexpb.BuildIndexCmd) (*commonpb.Status, error) {
t := &IndexBuildTask{
BaseTask: BaseTask{
ctx: ctx,
......@@ -185,7 +185,7 @@ func (i *NodeImpl) BuildIndex(ctx context.Context, request *indexpb.BuildIndexCm
return ret, nil
}
func (i *NodeImpl) DropIndex(ctx context.Context, request *indexpb.DropIndexRequest) (*commonpb.Status, error) {
func (i *IndexNode) DropIndex(ctx context.Context, request *indexpb.DropIndexRequest) (*commonpb.Status, error) {
i.sched.IndexBuildQueue.tryToRemoveUselessIndexBuildTask(request.IndexID)
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
......@@ -194,16 +194,16 @@ func (i *NodeImpl) DropIndex(ctx context.Context, request *indexpb.DropIndexRequ
}
// AddStartCallback adds a callback in the startServer phase.
func (i *NodeImpl) AddStartCallback(callbacks ...func()) {
func (i *IndexNode) AddStartCallback(callbacks ...func()) {
i.startCallbacks = append(i.startCallbacks, callbacks...)
}
// AddCloseCallback adds a callback in the Close phase.
func (i *NodeImpl) AddCloseCallback(callbacks ...func()) {
func (i *IndexNode) AddCloseCallback(callbacks ...func()) {
i.closeCallbacks = append(i.closeCallbacks, callbacks...)
}
func (i *NodeImpl) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
func (i *IndexNode) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
stateInfo := &internalpb2.ComponentInfo{
NodeID: Params.NodeID,
......@@ -221,7 +221,7 @@ func (i *NodeImpl) GetComponentStates(ctx context.Context) (*internalpb2.Compone
return ret, nil
}
func (i *NodeImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
func (i *IndexNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
......@@ -229,7 +229,7 @@ func (i *NodeImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResp
}, nil
}
func (i *NodeImpl) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
func (i *IndexNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
......
......@@ -30,7 +30,7 @@ const (
reqTimeoutInterval = time.Second * 10
)
type ServiceImpl struct {
type IndexService struct {
nodeClients *PriorityQueue
nodeStates map[UniqueID]*internalpb2.ComponentStates
stateCode internalpb2.StateCode
......@@ -59,9 +59,9 @@ type ServiceImpl struct {
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
func NewServiceImpl(ctx context.Context) (*ServiceImpl, error) {
func NewIndexService(ctx context.Context) (*IndexService, error) {
ctx1, cancel := context.WithCancel(ctx)
i := &ServiceImpl{
i := &IndexService{
loopCtx: ctx1,
loopCancel: cancel,
nodeClients: &PriorityQueue{},
......@@ -70,7 +70,7 @@ func NewServiceImpl(ctx context.Context) (*ServiceImpl, error) {
return i, nil
}
func (i *ServiceImpl) Init() error {
func (i *IndexService) Init() error {
etcdAddress := Params.EtcdAddress
log.Println("etcd address = ", etcdAddress)
connectEtcdFn := func() error {
......@@ -125,7 +125,7 @@ func (i *ServiceImpl) Init() error {
return nil
}
func (i *ServiceImpl) Start() error {
func (i *IndexService) Start() error {
i.loopWg.Add(1)
go i.tsLoop()
......@@ -134,12 +134,12 @@ func (i *ServiceImpl) Start() error {
for _, cb := range i.startCallbacks {
cb()
}
log.Print("ServiceImpl start")
log.Print("IndexService start")
return nil
}
func (i *ServiceImpl) Stop() error {
func (i *IndexService) Stop() error {
i.loopCancel()
i.sched.Close()
for _, cb := range i.closeCallbacks {
......@@ -148,15 +148,15 @@ func (i *ServiceImpl) Stop() error {
return nil
}
func (i *ServiceImpl) UpdateStateCode(code internalpb2.StateCode) {
func (i *IndexService) UpdateStateCode(code internalpb2.StateCode) {
i.stateCode = code
}
func (i *ServiceImpl) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
func (i *IndexService) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
stateInfo := &internalpb2.ComponentInfo{
NodeID: i.ID,
Role: "ServiceImpl",
Role: "IndexService",
StateCode: i.stateCode,
}
......@@ -170,7 +170,7 @@ func (i *ServiceImpl) GetComponentStates(ctx context.Context) (*internalpb2.Comp
return ret, nil
}
func (i *ServiceImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
func (i *IndexService) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
......@@ -180,7 +180,7 @@ func (i *ServiceImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringR
}, nil
}
func (i *ServiceImpl) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
func (i *IndexService) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
......@@ -190,7 +190,7 @@ func (i *ServiceImpl) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin
}, nil
}
func (i *ServiceImpl) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
func (i *IndexService) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
fmt.Println("builder building index ..., indexName = ", req.IndexName, "indexID = ", req.IndexID, "dataPath = ", req.DataPaths)
ret := &indexpb.BuildIndexResponse{
Status: &commonpb.Status{
......@@ -245,7 +245,7 @@ func (i *ServiceImpl) BuildIndex(ctx context.Context, req *indexpb.BuildIndexReq
return ret, nil
}
func (i *ServiceImpl) GetIndexStates(ctx context.Context, req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error) {
func (i *IndexService) GetIndexStates(ctx context.Context, req *indexpb.IndexStatesRequest) (*indexpb.IndexStatesResponse, error) {
var indexStates []*indexpb.IndexInfo
for _, indexID := range req.IndexBuildIDs {
indexState, err := i.metaTable.GetIndexState(indexID)
......@@ -263,7 +263,7 @@ func (i *ServiceImpl) GetIndexStates(ctx context.Context, req *indexpb.IndexStat
return ret, nil
}
func (i *ServiceImpl) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
func (i *IndexService) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
i.sched.IndexAddQueue.tryToRemoveUselessIndexAddTask(req.IndexID)
err := i.metaTable.MarkIndexAsDeleted(req.IndexID)
......@@ -292,7 +292,7 @@ func (i *ServiceImpl) DropIndex(ctx context.Context, req *indexpb.DropIndexReque
}, nil
}
func (i *ServiceImpl) GetIndexFilePaths(ctx context.Context, req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) {
func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.IndexFilePathsRequest) (*indexpb.IndexFilePathsResponse, error) {
var indexPaths []*indexpb.IndexFilePathInfo = nil
for _, indexID := range req.IndexBuildIDs {
......@@ -312,7 +312,7 @@ func (i *ServiceImpl) GetIndexFilePaths(ctx context.Context, req *indexpb.IndexF
return ret, nil
}
func (i *ServiceImpl) NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIndexNotification) (*commonpb.Status, error) {
func (i *IndexService) NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIndexNotification) (*commonpb.Status, error) {
ret := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
......@@ -327,7 +327,7 @@ func (i *ServiceImpl) NotifyBuildIndex(ctx context.Context, nty *indexpb.BuildIn
return ret, nil
}
func (i *ServiceImpl) tsLoop() {
func (i *IndexService) tsLoop() {
tsoTicker := time.NewTicker(tso.UpdateTimestampStep)
defer tsoTicker.Stop()
ctx, cancel := context.WithCancel(i.loopCtx)
......
......@@ -12,13 +12,13 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
func (i *ServiceImpl) removeNode(nodeID UniqueID) {
func (i *IndexService) removeNode(nodeID UniqueID) {
i.nodeLock.Lock()
defer i.nodeLock.Unlock()
i.nodeClients.Remove(nodeID)
}
func (i *ServiceImpl) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest) error {
func (i *IndexService) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest) error {
i.nodeLock.Lock()
defer i.nodeLock.Unlock()
......@@ -46,7 +46,7 @@ func (i *ServiceImpl) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest)
return nil
}
func (i *ServiceImpl) prepareNodeInitParams() []*commonpb.KeyValuePair {
func (i *IndexService) prepareNodeInitParams() []*commonpb.KeyValuePair {
var params []*commonpb.KeyValuePair
params = append(params, &commonpb.KeyValuePair{Key: "minio.address", Value: Params.MinIOAddress})
params = append(params, &commonpb.KeyValuePair{Key: "minio.accessKeyID", Value: Params.MinIOAccessKeyID})
......@@ -56,7 +56,7 @@ func (i *ServiceImpl) prepareNodeInitParams() []*commonpb.KeyValuePair {
return params
}
func (i *ServiceImpl) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) {
func (i *IndexService) RegisterNode(ctx context.Context, req *indexpb.RegisterNodeRequest) (*indexpb.RegisterNodeResponse, error) {
ret := &indexpb.RegisterNodeResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
......@@ -65,7 +65,7 @@ func (i *ServiceImpl) RegisterNode(ctx context.Context, req *indexpb.RegisterNod
nodeID, err := i.idAllocator.AllocOne()
if err != nil {
ret.Status.Reason = "ServiceImpl:RegisterNode Failed to acquire NodeID"
ret.Status.Reason = "IndexService:RegisterNode Failed to acquire NodeID"
return ret, nil
}
......
......@@ -60,10 +60,7 @@ type QueryServiceInterface interface {
type Interface interface {
//service
Init() error
Start() error
Stop() error
GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
typeutil.Component
//DDL request
CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error)
......
......@@ -164,7 +164,8 @@ func TestMetaTable(t *testing.T) {
IndexParams: params,
}
_, field, err := mt.GetNotIndexedSegments("collTest", "field110", idxInfo)
ids, _, err := mt.GetNotIndexedSegments("collTest", "field110", idxInfo)
assert.Nil(t, ids)
assert.NotNil(t, err)
seg, field, err := mt.GetNotIndexedSegments("testColl", "field110", idxInfo)
assert.Nil(t, err)
......
......@@ -21,11 +21,11 @@ const (
reqTimeoutInterval = time.Second * 10
)
func (node *NodeImpl) UpdateStateCode(code internalpb2.StateCode) {
func (node *ProxyNode) UpdateStateCode(code internalpb2.StateCode) {
node.stateCode.Store(code)
}
func (node *NodeImpl) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
func (node *ProxyNode) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
collectionName := request.CollectionName
globalMetaCache.RemoveCollection(ctx, collectionName) // no need to return error, though collection may be not cached
return &commonpb.Status{
......@@ -34,7 +34,7 @@ func (node *NodeImpl) InvalidateCollectionMetaCache(ctx context.Context, request
}, nil
}
func (node *NodeImpl) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
func (node *ProxyNode) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
log.Println("create collection: ", request)
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -66,7 +66,7 @@ func (node *NodeImpl) CreateCollection(ctx context.Context, request *milvuspb.Cr
return cct.result, nil
}
func (node *NodeImpl) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
func (node *ProxyNode) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
log.Println("drop collection: ", request)
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -97,7 +97,7 @@ func (node *NodeImpl) DropCollection(ctx context.Context, request *milvuspb.Drop
return dct.result, nil
}
func (node *NodeImpl) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
func (node *ProxyNode) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
log.Println("has collection: ", request)
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -132,7 +132,7 @@ func (node *NodeImpl) HasCollection(ctx context.Context, request *milvuspb.HasCo
return hct.result, nil
}
func (node *NodeImpl) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
func (node *ProxyNode) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
log.Println("load collection: ", request)
//ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
//defer cancel()
......@@ -163,7 +163,7 @@ func (node *NodeImpl) LoadCollection(ctx context.Context, request *milvuspb.Load
return lct.result, nil
}
func (node *NodeImpl) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
func (node *ProxyNode) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
log.Println("release collection: ", request)
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -194,7 +194,7 @@ func (node *NodeImpl) ReleaseCollection(ctx context.Context, request *milvuspb.R
return rct.result, nil
}
func (node *NodeImpl) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
func (node *ProxyNode) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
log.Println("describe collection: ", request)
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -229,7 +229,7 @@ func (node *NodeImpl) DescribeCollection(ctx context.Context, request *milvuspb.
return dct.result, nil
}
func (node *NodeImpl) GetCollectionStatistics(ctx context.Context, request *milvuspb.CollectionStatsRequest) (*milvuspb.CollectionStatsResponse, error) {
func (node *ProxyNode) GetCollectionStatistics(ctx context.Context, request *milvuspb.CollectionStatsRequest) (*milvuspb.CollectionStatsResponse, error) {
log.Println("get collection statistics")
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -263,7 +263,7 @@ func (node *NodeImpl) GetCollectionStatistics(ctx context.Context, request *milv
return g.result, nil
}
func (node *NodeImpl) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
func (node *ProxyNode) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
log.Println("show collections")
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -297,7 +297,7 @@ func (node *NodeImpl) ShowCollections(ctx context.Context, request *milvuspb.Sho
return sct.result, nil
}
func (node *NodeImpl) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
func (node *ProxyNode) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
log.Println("create partition", request)
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -326,7 +326,7 @@ func (node *NodeImpl) CreatePartition(ctx context.Context, request *milvuspb.Cre
return cpt.result, nil
}
func (node *NodeImpl) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
func (node *ProxyNode) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
log.Println("drop partition: ", request)
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -356,7 +356,7 @@ func (node *NodeImpl) DropPartition(ctx context.Context, request *milvuspb.DropP
return dpt.result, nil
}
func (node *NodeImpl) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
func (node *ProxyNode) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
log.Println("has partition: ", request)
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -392,7 +392,7 @@ func (node *NodeImpl) HasPartition(ctx context.Context, request *milvuspb.HasPar
return hpt.result, nil
}
func (node *NodeImpl) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitonRequest) (*commonpb.Status, error) {
func (node *ProxyNode) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitonRequest) (*commonpb.Status, error) {
log.Println("load partitions: ", request)
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -423,7 +423,7 @@ func (node *NodeImpl) LoadPartitions(ctx context.Context, request *milvuspb.Load
return lpt.result, nil
}
func (node *NodeImpl) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error) {
func (node *ProxyNode) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error) {
log.Println("load partitions: ", request)
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -454,11 +454,11 @@ func (node *NodeImpl) ReleasePartitions(ctx context.Context, request *milvuspb.R
return rpt.result, nil
}
func (node *NodeImpl) GetPartitionStatistics(ctx context.Context, request *milvuspb.PartitionStatsRequest) (*milvuspb.PartitionStatsResponse, error) {
func (node *ProxyNode) GetPartitionStatistics(ctx context.Context, request *milvuspb.PartitionStatsRequest) (*milvuspb.PartitionStatsResponse, error) {
panic("implement me")
}
func (node *NodeImpl) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
func (node *ProxyNode) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
log.Println("show partitions: ", request)
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -493,7 +493,7 @@ func (node *NodeImpl) ShowPartitions(ctx context.Context, request *milvuspb.Show
return spt.result, nil
}
func (node *NodeImpl) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
func (node *ProxyNode) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
log.Println("create index for: ", request)
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -523,7 +523,7 @@ func (node *NodeImpl) CreateIndex(ctx context.Context, request *milvuspb.CreateI
return cit.result, nil
}
func (node *NodeImpl) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
func (node *ProxyNode) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
log.Println("Describe index for: ", request)
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -557,7 +557,7 @@ func (node *NodeImpl) DescribeIndex(ctx context.Context, request *milvuspb.Descr
return dit.result, nil
}
func (node *NodeImpl) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
func (node *ProxyNode) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
log.Println("Drop index for: ", request)
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -584,7 +584,7 @@ func (node *NodeImpl) DropIndex(ctx context.Context, request *milvuspb.DropIndex
return dit.result, nil
}
func (node *NodeImpl) GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) {
func (node *ProxyNode) GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) {
// log.Println("Describe index progress for: ", request)
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -619,7 +619,7 @@ func (node *NodeImpl) GetIndexState(ctx context.Context, request *milvuspb.Index
return dipt.result, nil
}
func (node *NodeImpl) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error) {
func (node *ProxyNode) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error) {
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -671,7 +671,7 @@ func (node *NodeImpl) Insert(ctx context.Context, request *milvuspb.InsertReques
return it.result, nil
}
func (node *NodeImpl) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
func (node *ProxyNode) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -713,7 +713,7 @@ func (node *NodeImpl) Search(ctx context.Context, request *milvuspb.SearchReques
return qt.result, nil
}
func (node *NodeImpl) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error) {
func (node *ProxyNode) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error) {
log.Println("AA Flush collections: ", request.CollectionNames)
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
......@@ -743,11 +743,11 @@ func (node *NodeImpl) Flush(ctx context.Context, request *milvuspb.FlushRequest)
return ft.result, nil
}
func (node *NodeImpl) GetDdChannel(ctx context.Context, request *commonpb.Empty) (*milvuspb.StringResponse, error) {
func (node *ProxyNode) GetDdChannel(ctx context.Context, request *commonpb.Empty) (*milvuspb.StringResponse, error) {
panic("implement me")
}
func (node *NodeImpl) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) {
func (node *ProxyNode) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) {
resp := &milvuspb.PersistentSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
......@@ -794,7 +794,7 @@ func (node *NodeImpl) GetPersistentSegmentInfo(ctx context.Context, req *milvusp
return resp, nil
}
func (node *NodeImpl) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.QuerySegmentInfoRequest) (*milvuspb.QuerySegmentInfoResponse, error) {
func (node *ProxyNode) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.QuerySegmentInfoRequest) (*milvuspb.QuerySegmentInfoResponse, error) {
resp := &milvuspb.QuerySegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
......@@ -839,7 +839,7 @@ func (node *NodeImpl) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.Que
return resp, nil
}
func (node *NodeImpl) getSegmentsOfCollection(ctx context.Context, dbName string, collectionName string) ([]UniqueID, error) {
func (node *ProxyNode) getSegmentsOfCollection(ctx context.Context, dbName string, collectionName string) ([]UniqueID, error) {
describeCollectionResponse, err := node.masterClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDescribeCollection,
......@@ -898,7 +898,7 @@ func (node *NodeImpl) getSegmentsOfCollection(ctx context.Context, dbName string
return ret, nil
}
func (node *NodeImpl) RegisterLink(request *commonpb.Empty) (*milvuspb.RegisterLinkResponse, error) {
func (node *ProxyNode) RegisterLink(request *commonpb.Empty) (*milvuspb.RegisterLinkResponse, error) {
code := node.stateCode.Load().(internalpb2.StateCode)
if code != internalpb2.StateCode_HEALTHY {
return &milvuspb.RegisterLinkResponse{
......
......@@ -75,7 +75,7 @@ type InsertChannelsMap struct {
droppedBitMap []int // 0 -> normal, 1 -> dropped
usageHistogram []int // message stream can be closed only when the use count is zero
mtx sync.RWMutex
nodeInstance *NodeImpl
nodeInstance *ProxyNode
msFactory msgstream.Factory
}
......@@ -188,7 +188,7 @@ func (m *InsertChannelsMap) closeAllMsgStream() {
m.usageHistogram = make([]int, 0)
}
func newInsertChannelsMap(node *NodeImpl) *InsertChannelsMap {
func newInsertChannelsMap(node *ProxyNode) *InsertChannelsMap {
return &InsertChannelsMap{
collectionID2InsertChannels: make(map[UniqueID]int),
insertChannels: make([][]string, 0),
......@@ -202,6 +202,6 @@ func newInsertChannelsMap(node *NodeImpl) *InsertChannelsMap {
var globalInsertChannelsMap *InsertChannelsMap
func initGlobalInsertChannelsMap(node *NodeImpl) {
func initGlobalInsertChannelsMap(node *ProxyNode) {
globalInsertChannelsMap = newInsertChannelsMap(node)
}
......@@ -67,7 +67,7 @@ type ProxyServiceClient interface {
GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
}
type ProxyNode interface {
type Service interface {
typeutil.Service
InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
......
......@@ -26,7 +26,7 @@ import (
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
type NodeImpl struct {
type ProxyNode struct {
ctx context.Context
cancel func()
wg sync.WaitGroup
......@@ -59,10 +59,10 @@ type NodeImpl struct {
closeCallbacks []func()
}
func NewProxyNodeImpl(ctx context.Context, factory msgstream.Factory) (*NodeImpl, error) {
func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, error) {
rand.Seed(time.Now().UnixNano())
ctx1, cancel := context.WithCancel(ctx)
node := &NodeImpl{
node := &ProxyNode{
ctx: ctx1,
cancel: cancel,
msFactory: factory,
......@@ -76,7 +76,7 @@ type Component interface {
GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
}
func (node *NodeImpl) waitForServiceReady(ctx context.Context, service Component, serviceName string) error {
func (node *ProxyNode) waitForServiceReady(ctx context.Context, service Component, serviceName string) error {
checkFunc := func() error {
resp, err := service.GetComponentStates(ctx)
......@@ -100,7 +100,7 @@ func (node *NodeImpl) waitForServiceReady(ctx context.Context, service Component
return nil
}
func (node *NodeImpl) Init() error {
func (node *ProxyNode) Init() error {
// todo wait for proxyservice state changed to Healthy
ctx := context.Background()
......@@ -211,10 +211,10 @@ func (node *NodeImpl) Init() error {
node.manipulationMsgStream, _ = node.msFactory.NewMsgStream(node.ctx)
node.manipulationMsgStream.AsProducer(Params.InsertChannelNames)
repackFuncImpl := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
repackFunc := func(tsMsgs []msgstream.TsMsg, hashKeys [][]int32) (map[int32]*msgstream.MsgPack, error) {
return insertRepackFunc(tsMsgs, hashKeys, node.segAssigner, true)
}
node.manipulationMsgStream.SetRepackFunc(repackFuncImpl)
node.manipulationMsgStream.SetRepackFunc(repackFunc)
log.Println("create manipulation message stream ...")
node.sched, err = NewTaskScheduler(node.ctx, node.idAllocator, node.tsoAllocator, node.msFactory)
......@@ -227,7 +227,7 @@ func (node *NodeImpl) Init() error {
return nil
}
func (node *NodeImpl) Start() error {
func (node *ProxyNode) Start() error {
err := InitMetaCache(node.masterClient)
if err != nil {
return err
......@@ -269,7 +269,7 @@ func (node *NodeImpl) Start() error {
return nil
}
func (node *NodeImpl) Stop() error {
func (node *ProxyNode) Stop() error {
node.cancel()
globalInsertChannelsMap.closeAllMsgStream()
......@@ -291,35 +291,35 @@ func (node *NodeImpl) Stop() error {
}
// AddStartCallback adds a callback in the startServer phase.
func (node *NodeImpl) AddStartCallback(callbacks ...func()) {
func (node *ProxyNode) AddStartCallback(callbacks ...func()) {
node.startCallbacks = append(node.startCallbacks, callbacks...)
}
func (node *NodeImpl) lastTick() Timestamp {
func (node *ProxyNode) lastTick() Timestamp {
return node.tick.LastTick()
}
// AddCloseCallback adds a callback in the Close phase.
func (node *NodeImpl) AddCloseCallback(callbacks ...func()) {
func (node *ProxyNode) AddCloseCallback(callbacks ...func()) {
node.closeCallbacks = append(node.closeCallbacks, callbacks...)
}
func (node *NodeImpl) SetMasterClient(cli MasterClient) {
func (node *ProxyNode) SetMasterClient(cli MasterClient) {
node.masterClient = cli
}
func (node *NodeImpl) SetIndexServiceClient(cli IndexServiceClient) {
func (node *ProxyNode) SetIndexServiceClient(cli IndexServiceClient) {
node.indexServiceClient = cli
}
func (node *NodeImpl) SetDataServiceClient(cli DataServiceClient) {
func (node *ProxyNode) SetDataServiceClient(cli DataServiceClient) {
node.dataServiceClient = cli
}
func (node *NodeImpl) SetProxyServiceClient(cli ProxyServiceClient) {
func (node *ProxyNode) SetProxyServiceClient(cli ProxyServiceClient) {
node.proxyServiceClient = cli
}
func (node *NodeImpl) SetQueryServiceClient(cli QueryServiceClient) {
func (node *ProxyNode) SetQueryServiceClient(cli QueryServiceClient) {
node.queryServiceClient = cli
}
......@@ -579,9 +579,9 @@ func (st *SearchTask) Execute(ctx context.Context) error {
}
msgPack.Msgs[0] = tsMsg
err := st.queryMsgStream.Produce(ctx, msgPack)
log.Printf("[NodeImpl] length of searchMsg: %v", len(msgPack.Msgs))
log.Printf("[ProxyNode] length of searchMsg: %v", len(msgPack.Msgs))
if err != nil {
log.Printf("[NodeImpl] send search request failed: %v", err)
log.Printf("[ProxyNode] send search request failed: %v", err)
}
return err
}
......
......@@ -173,11 +173,11 @@ func (queue *BaseTaskQueue) Enqueue(t task) error {
}
ts, _ := queue.sched.tsoAllocator.AllocOne()
// log.Printf("[NodeImpl] allocate timestamp: %v", ts)
// log.Printf("[ProxyNode] allocate timestamp: %v", ts)
t.SetTs(ts)
reqID, _ := queue.sched.idAllocator.AllocOne()
// log.Printf("[NodeImpl] allocate reqID: %v", reqID)
// log.Printf("[ProxyNode] allocate reqID: %v", reqID)
t.SetID(reqID)
return queue.addUnissuedTask(t)
......
......@@ -30,7 +30,7 @@ const (
MilvusYamlContent = "milvus.yaml"
)
func (s *ServiceImpl) fillNodeInitParams() error {
func (s *ProxyService) fillNodeInitParams() error {
s.nodeStartParams = make([]*commonpb.KeyValuePair, 0)
getConfigContentByName := func(fileName string) []byte {
......@@ -92,7 +92,7 @@ func (s *ServiceImpl) fillNodeInitParams() error {
return nil
}
func (s *ServiceImpl) Init() error {
func (s *ProxyService) Init() error {
err := s.fillNodeInitParams()
if err != nil {
return err
......@@ -134,14 +134,14 @@ func (s *ServiceImpl) Init() error {
return nil
}
func (s *ServiceImpl) Start() error {
func (s *ProxyService) Start() error {
s.stateCode = internalpb2.StateCode_HEALTHY
s.sched.Start()
log.Println("start scheduler ...")
return s.tick.Start()
}
func (s *ServiceImpl) Stop() error {
func (s *ProxyService) Stop() error {
s.sched.Close()
log.Println("close scheduler ...")
s.tick.Close()
......@@ -158,7 +158,7 @@ func (s *ServiceImpl) Stop() error {
return nil
}
func (s *ServiceImpl) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
func (s *ProxyService) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) {
stateInfo := &internalpb2.ComponentInfo{
NodeID: UniqueID(0),
Role: "ProxyService",
......@@ -175,11 +175,11 @@ func (s *ServiceImpl) GetComponentStates(ctx context.Context) (*internalpb2.Comp
return ret, nil
}
func (s *ServiceImpl) UpdateStateCode(code internalpb2.StateCode) {
func (s *ProxyService) UpdateStateCode(code internalpb2.StateCode) {
s.stateCode = code
}
func (s *ServiceImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
func (s *ProxyService) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
......@@ -188,11 +188,11 @@ func (s *ServiceImpl) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringR
}, nil
}
func (s *ServiceImpl) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
func (s *ProxyService) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
panic("implement me")
}
func (s *ServiceImpl) RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkResponse, error) {
func (s *ProxyService) RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkResponse, error) {
log.Println("register link")
ctx, cancel := context.WithTimeout(ctx, timeoutInterval)
defer cancel()
......@@ -230,7 +230,7 @@ func (s *ServiceImpl) RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkR
return t.response, nil
}
func (s *ServiceImpl) RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) {
func (s *ProxyService) RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) {
log.Println("RegisterNode: ", request)
ctx, cancel := context.WithTimeout(ctx, timeoutInterval)
defer cancel()
......@@ -271,7 +271,7 @@ func (s *ServiceImpl) RegisterNode(ctx context.Context, request *proxypb.Registe
return t.response, nil
}
func (s *ServiceImpl) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
func (s *ProxyService) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
log.Println("InvalidateCollectionMetaCache")
ctx, cancel := context.WithTimeout(ctx, timeoutInterval)
defer cancel()
......
......@@ -9,12 +9,10 @@ import (
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type Component = typeutil.Component
type Service = typeutil.Service
type Service interface {
typeutil.Component
typeutil.TimeTickHandler
type ProxyService interface {
Component
Service
RegisterLink(ctx context.Context) (*milvuspb.RegisterLinkResponse, error)
RegisterNode(ctx context.Context, request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error)
// TODO: i'm sure it's not a best way to keep consistency, fix me
......
......@@ -15,13 +15,13 @@ type NodeIDAllocator interface {
AllocOne() UniqueID
}
type NaiveNodeIDAllocatorImpl struct {
impl *allocator.IDAllocator
now UniqueID
mtx sync.Mutex
type NaiveNodeIDAllocator struct {
allocator *allocator.IDAllocator
now UniqueID
mtx sync.Mutex
}
func (allocator *NaiveNodeIDAllocatorImpl) AllocOne() UniqueID {
func (allocator *NaiveNodeIDAllocator) AllocOne() UniqueID {
allocator.mtx.Lock()
defer func() {
// allocator.now++
......@@ -31,7 +31,7 @@ func (allocator *NaiveNodeIDAllocatorImpl) AllocOne() UniqueID {
}
func NewNodeIDAllocator() NodeIDAllocator {
return &NaiveNodeIDAllocatorImpl{
return &NaiveNodeIDAllocator{
now: 1,
}
}
......@@ -10,10 +10,10 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
type ServiceImpl struct {
type ProxyService struct {
allocator NodeIDAllocator
sched *TaskScheduler
tick TimeTick
tick *TimeTick
nodeInfos *GlobalNodeInfoTable
stateCode internalpb2.StateCode
......@@ -27,10 +27,10 @@ type ServiceImpl struct {
msFactory msgstream.Factory
}
func NewServiceImpl(ctx context.Context, factory msgstream.Factory) (*ServiceImpl, error) {
func NewProxyService(ctx context.Context, factory msgstream.Factory) (*ProxyService, error) {
rand.Seed(time.Now().UnixNano())
ctx1, cancel := context.WithCancel(ctx)
s := &ServiceImpl{
s := &ProxyService{
ctx: ctx1,
cancel: cancel,
msFactory: factory,
......
......@@ -10,22 +10,15 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
type (
TimeTick interface {
Start() error
Close()
}
TimeTickImpl struct {
ttBarrier TimeTickBarrier
channels []msgstream.MsgStream
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
)
type TimeTick struct {
ttBarrier TimeTickBarrier
channels []msgstream.MsgStream
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func (tt *TimeTickImpl) Start() error {
func (tt *TimeTick) Start() error {
log.Println("start time tick ...")
tt.wg.Add(1)
go func() {
......@@ -81,7 +74,7 @@ func (tt *TimeTickImpl) Start() error {
return nil
}
func (tt *TimeTickImpl) Close() {
func (tt *TimeTick) Close() {
for _, channel := range tt.channels {
channel.Close()
}
......@@ -90,7 +83,7 @@ func (tt *TimeTickImpl) Close() {
tt.wg.Wait()
}
func newTimeTick(ctx context.Context, ttBarrier TimeTickBarrier, channels ...msgstream.MsgStream) TimeTick {
func newTimeTick(ctx context.Context, ttBarrier TimeTickBarrier, channels ...msgstream.MsgStream) *TimeTick {
ctx1, cancel := context.WithCancel(ctx)
return &TimeTickImpl{ctx: ctx1, cancel: cancel, ttBarrier: ttBarrier, channels: channels}
return &TimeTick{ctx: ctx1, cancel: cancel, ttBarrier: ttBarrier, channels: channels}
}
......@@ -33,7 +33,7 @@ import (
* Every replica tracks a value called tSafe which is the maximum timestamp that the replica
* is up-to-date.
*/
type collectionReplica interface {
type ReplicaInterface interface {
// collection
getCollectionIDs() []UniqueID
addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
......@@ -69,12 +69,12 @@ type collectionReplica interface {
getSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID)
replaceGrowingSegmentBySealedSegment(segment *Segment) error
getTSafe() tSafe
getTSafe() tSafer
freeAll()
}
type collectionReplicaImpl struct {
tSafe tSafe
type collectionReplica struct {
tSafe tSafer
mu sync.RWMutex // guards all
collections map[UniqueID]*Collection
......@@ -83,7 +83,7 @@ type collectionReplicaImpl struct {
}
//----------------------------------------------------------------------------------------------------- collection
func (colReplica *collectionReplicaImpl) getCollectionIDs() []UniqueID {
func (colReplica *collectionReplica) getCollectionIDs() []UniqueID {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
collectionIDs := make([]UniqueID, 0)
......@@ -93,7 +93,7 @@ func (colReplica *collectionReplicaImpl) getCollectionIDs() []UniqueID {
return collectionIDs
}
func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
func (colReplica *collectionReplica) addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
......@@ -107,13 +107,13 @@ func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, sc
return nil
}
func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error {
func (colReplica *collectionReplica) removeCollection(collectionID UniqueID) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
return colReplica.removeCollectionPrivate(collectionID)
}
func (colReplica *collectionReplicaImpl) removeCollectionPrivate(collectionID UniqueID) error {
func (colReplica *collectionReplica) removeCollectionPrivate(collectionID UniqueID) error {
collection, err := colReplica.getCollectionByIDPrivate(collectionID)
if err != nil {
return err
......@@ -131,13 +131,13 @@ func (colReplica *collectionReplicaImpl) removeCollectionPrivate(collectionID Un
return nil
}
func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) {
func (colReplica *collectionReplica) getCollectionByID(collectionID UniqueID) (*Collection, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
return colReplica.getCollectionByIDPrivate(collectionID)
}
func (colReplica *collectionReplicaImpl) getCollectionByIDPrivate(collectionID UniqueID) (*Collection, error) {
func (colReplica *collectionReplica) getCollectionByIDPrivate(collectionID UniqueID) (*Collection, error) {
collection, ok := colReplica.collections[collectionID]
if !ok {
return nil, fmt.Errorf("cannot find collection, id = %d", collectionID)
......@@ -146,24 +146,24 @@ func (colReplica *collectionReplicaImpl) getCollectionByIDPrivate(collectionID U
return collection, nil
}
func (colReplica *collectionReplicaImpl) hasCollection(collectionID UniqueID) bool {
func (colReplica *collectionReplica) hasCollection(collectionID UniqueID) bool {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
return colReplica.hasCollectionPrivate(collectionID)
}
func (colReplica *collectionReplicaImpl) hasCollectionPrivate(collectionID UniqueID) bool {
func (colReplica *collectionReplica) hasCollectionPrivate(collectionID UniqueID) bool {
_, ok := colReplica.collections[collectionID]
return ok
}
func (colReplica *collectionReplicaImpl) getCollectionNum() int {
func (colReplica *collectionReplica) getCollectionNum() int {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
return len(colReplica.collections)
}
func (colReplica *collectionReplicaImpl) getPartitionIDs(collectionID UniqueID) ([]UniqueID, error) {
func (colReplica *collectionReplica) getPartitionIDs(collectionID UniqueID) ([]UniqueID, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
......@@ -175,7 +175,7 @@ func (colReplica *collectionReplicaImpl) getPartitionIDs(collectionID UniqueID)
return collection.partitionIDs, nil
}
func (colReplica *collectionReplicaImpl) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
func (colReplica *collectionReplica) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
......@@ -198,7 +198,7 @@ func (colReplica *collectionReplicaImpl) getVecFieldIDsByCollectionID(collection
return vecFields, nil
}
func (colReplica *collectionReplicaImpl) getFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
func (colReplica *collectionReplica) getFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
......@@ -218,7 +218,7 @@ func (colReplica *collectionReplicaImpl) getFieldIDsByCollectionID(collectionID
return targetFields, nil
}
func (colReplica *collectionReplicaImpl) getFieldsByCollectionIDPrivate(collectionID UniqueID) ([]*schemapb.FieldSchema, error) {
func (colReplica *collectionReplica) getFieldsByCollectionIDPrivate(collectionID UniqueID) ([]*schemapb.FieldSchema, error) {
collection, err := colReplica.getCollectionByIDPrivate(collectionID)
if err != nil {
return nil, err
......@@ -232,13 +232,13 @@ func (colReplica *collectionReplicaImpl) getFieldsByCollectionIDPrivate(collecti
}
//----------------------------------------------------------------------------------------------------- partition
func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionID UniqueID) error {
func (colReplica *collectionReplica) addPartition(collectionID UniqueID, partitionID UniqueID) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
return colReplica.addPartitionPrivate(collectionID, partitionID)
}
func (colReplica *collectionReplicaImpl) addPartitionPrivate(collectionID UniqueID, partitionID UniqueID) error {
func (colReplica *collectionReplica) addPartitionPrivate(collectionID UniqueID, partitionID UniqueID) error {
collection, err := colReplica.getCollectionByIDPrivate(collectionID)
if err != nil {
return err
......@@ -250,13 +250,13 @@ func (colReplica *collectionReplicaImpl) addPartitionPrivate(collectionID Unique
return nil
}
func (colReplica *collectionReplicaImpl) removePartition(partitionID UniqueID) error {
func (colReplica *collectionReplica) removePartition(partitionID UniqueID) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
return colReplica.removePartitionPrivate(partitionID)
}
func (colReplica *collectionReplicaImpl) removePartitionPrivate(partitionID UniqueID) error {
func (colReplica *collectionReplica) removePartitionPrivate(partitionID UniqueID) error {
partition, err := colReplica.getPartitionByIDPrivate(partitionID)
if err != nil {
return err
......@@ -279,13 +279,13 @@ func (colReplica *collectionReplicaImpl) removePartitionPrivate(partitionID Uniq
return nil
}
func (colReplica *collectionReplicaImpl) getPartitionByID(partitionID UniqueID) (*Partition, error) {
func (colReplica *collectionReplica) getPartitionByID(partitionID UniqueID) (*Partition, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
return colReplica.getPartitionByIDPrivate(partitionID)
}
func (colReplica *collectionReplicaImpl) getPartitionByIDPrivate(partitionID UniqueID) (*Partition, error) {
func (colReplica *collectionReplica) getPartitionByIDPrivate(partitionID UniqueID) (*Partition, error) {
partition, ok := colReplica.partitions[partitionID]
if !ok {
return nil, fmt.Errorf("cannot find partition, id = %d", partitionID)
......@@ -294,30 +294,30 @@ func (colReplica *collectionReplicaImpl) getPartitionByIDPrivate(partitionID Uni
return partition, nil
}
func (colReplica *collectionReplicaImpl) hasPartition(partitionID UniqueID) bool {
func (colReplica *collectionReplica) hasPartition(partitionID UniqueID) bool {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
return colReplica.hasPartitionPrivate(partitionID)
}
func (colReplica *collectionReplicaImpl) hasPartitionPrivate(partitionID UniqueID) bool {
func (colReplica *collectionReplica) hasPartitionPrivate(partitionID UniqueID) bool {
_, ok := colReplica.partitions[partitionID]
return ok
}
func (colReplica *collectionReplicaImpl) getPartitionNum() int {
func (colReplica *collectionReplica) getPartitionNum() int {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
return len(colReplica.partitions)
}
func (colReplica *collectionReplicaImpl) getSegmentIDs(partitionID UniqueID) ([]UniqueID, error) {
func (colReplica *collectionReplica) getSegmentIDs(partitionID UniqueID) ([]UniqueID, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
return colReplica.getSegmentIDsPrivate(partitionID)
}
func (colReplica *collectionReplicaImpl) getSegmentIDsPrivate(partitionID UniqueID) ([]UniqueID, error) {
func (colReplica *collectionReplica) getSegmentIDsPrivate(partitionID UniqueID) ([]UniqueID, error) {
partition, err2 := colReplica.getPartitionByIDPrivate(partitionID)
if err2 != nil {
return nil, err2
......@@ -325,7 +325,7 @@ func (colReplica *collectionReplicaImpl) getSegmentIDsPrivate(partitionID Unique
return partition.segmentIDs, nil
}
func (colReplica *collectionReplicaImpl) enablePartition(partitionID UniqueID) error {
func (colReplica *collectionReplica) enablePartition(partitionID UniqueID) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
......@@ -338,7 +338,7 @@ func (colReplica *collectionReplicaImpl) enablePartition(partitionID UniqueID) e
return nil
}
func (colReplica *collectionReplicaImpl) disablePartition(partitionID UniqueID) error {
func (colReplica *collectionReplica) disablePartition(partitionID UniqueID) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
......@@ -351,7 +351,7 @@ func (colReplica *collectionReplicaImpl) disablePartition(partitionID UniqueID)
return nil
}
func (colReplica *collectionReplicaImpl) getEnabledPartitionIDsPrivate() []UniqueID {
func (colReplica *collectionReplica) getEnabledPartitionIDsPrivate() []UniqueID {
partitionIDs := make([]UniqueID, 0)
for _, partition := range colReplica.partitions {
if partition.enable {
......@@ -362,13 +362,13 @@ func (colReplica *collectionReplicaImpl) getEnabledPartitionIDsPrivate() []Uniqu
}
//----------------------------------------------------------------------------------------------------- segment
func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
func (colReplica *collectionReplica) addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
return colReplica.addSegmentPrivate(segmentID, partitionID, collectionID, segType)
}
func (colReplica *collectionReplicaImpl) addSegmentPrivate(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
func (colReplica *collectionReplica) addSegmentPrivate(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error {
collection, err := colReplica.getCollectionByIDPrivate(collectionID)
if err != nil {
return err
......@@ -389,13 +389,13 @@ func (colReplica *collectionReplicaImpl) addSegmentPrivate(segmentID UniqueID, p
return nil
}
func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error {
func (colReplica *collectionReplica) removeSegment(segmentID UniqueID) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
return colReplica.removeSegmentPrivate(segmentID)
}
func (colReplica *collectionReplicaImpl) removeSegmentPrivate(segmentID UniqueID) error {
func (colReplica *collectionReplica) removeSegmentPrivate(segmentID UniqueID) error {
log.Debug("remove segment", zap.Int64("segmentID", segmentID))
segment, err := colReplica.getSegmentByIDPrivate(segmentID)
if err != nil {
......@@ -414,13 +414,13 @@ func (colReplica *collectionReplicaImpl) removeSegmentPrivate(segmentID UniqueID
return nil
}
func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) {
func (colReplica *collectionReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
return colReplica.getSegmentByIDPrivate(segmentID)
}
func (colReplica *collectionReplicaImpl) getSegmentByIDPrivate(segmentID UniqueID) (*Segment, error) {
func (colReplica *collectionReplica) getSegmentByIDPrivate(segmentID UniqueID) (*Segment, error) {
segment, ok := colReplica.segments[segmentID]
if !ok {
return nil, errors.New("cannot find segment, id = " + strconv.FormatInt(segmentID, 10))
......@@ -429,24 +429,24 @@ func (colReplica *collectionReplicaImpl) getSegmentByIDPrivate(segmentID UniqueI
return segment, nil
}
func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool {
func (colReplica *collectionReplica) hasSegment(segmentID UniqueID) bool {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
return colReplica.hasSegmentPrivate(segmentID)
}
func (colReplica *collectionReplicaImpl) hasSegmentPrivate(segmentID UniqueID) bool {
func (colReplica *collectionReplica) hasSegmentPrivate(segmentID UniqueID) bool {
_, ok := colReplica.segments[segmentID]
return ok
}
func (colReplica *collectionReplicaImpl) getSegmentNum() int {
func (colReplica *collectionReplica) getSegmentNum() int {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
return len(colReplica.segments)
}
func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb2.SegmentStats {
func (colReplica *collectionReplica) getSegmentStatistics() []*internalpb2.SegmentStats {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
......@@ -471,7 +471,7 @@ func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb2.S
return statisticData
}
func (colReplica *collectionReplicaImpl) getEnabledSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
func (colReplica *collectionReplica) getEnabledSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
......@@ -500,7 +500,7 @@ func (colReplica *collectionReplicaImpl) getEnabledSegmentsBySegmentType(segType
return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs
}
func (colReplica *collectionReplicaImpl) getSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
func (colReplica *collectionReplica) getSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
......@@ -519,7 +519,7 @@ func (colReplica *collectionReplicaImpl) getSegmentsBySegmentType(segType segmen
return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs
}
func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(segment *Segment) error {
func (colReplica *collectionReplica) replaceGrowingSegmentBySealedSegment(segment *Segment) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
if segment.segmentType != segTypeSealed && segment.segmentType != segTypeIndexing {
......@@ -539,11 +539,11 @@ func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(se
}
//-----------------------------------------------------------------------------------------------------
func (colReplica *collectionReplicaImpl) getTSafe() tSafe {
func (colReplica *collectionReplica) getTSafe() tSafer {
return colReplica.tSafe
}
func (colReplica *collectionReplicaImpl) freeAll() {
func (colReplica *collectionReplica) freeAll() {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
......@@ -556,14 +556,14 @@ func (colReplica *collectionReplicaImpl) freeAll() {
colReplica.segments = make(map[UniqueID]*Segment)
}
func newCollectionReplicaImpl() collectionReplica {
func newCollectionReplica() ReplicaInterface {
collections := make(map[int64]*Collection)
partitions := make(map[int64]*Partition)
segments := make(map[int64]*Segment)
tSafe := newTSafe()
var replica collectionReplica = &collectionReplicaImpl{
var replica ReplicaInterface = &collectionReplica{
collections: collections,
partitions: partitions,
segments: segments,
......
......@@ -18,10 +18,10 @@ type dataSyncService struct {
dmStream msgstream.MsgStream
msFactory msgstream.Factory
replica collectionReplica
replica ReplicaInterface
}
func newDataSyncService(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *dataSyncService {
func newDataSyncService(ctx context.Context, replica ReplicaInterface, factory msgstream.Factory) *dataSyncService {
service := &dataSyncService{
ctx: ctx,
fg: nil,
......
......@@ -14,7 +14,7 @@ import (
type ddNode struct {
baseNode
ddMsg *ddMsg
replica collectionReplica
replica ReplicaInterface
}
func (ddNode *ddNode) Name() string {
......@@ -160,7 +160,7 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
})
}
func newDDNode(replica collectionReplica) *ddNode {
func newDDNode(replica ReplicaInterface) *ddNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
......
......@@ -12,7 +12,7 @@ import (
type filterDmNode struct {
baseNode
replica collectionReplica
replica ReplicaInterface
}
func (fdmNode *filterDmNode) Name() string {
......@@ -100,7 +100,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg
return msg
}
func newFilteredDmNode(replica collectionReplica) *filterDmNode {
func newFilteredDmNode(replica ReplicaInterface) *filterDmNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
......
......@@ -10,7 +10,7 @@ import (
type gcNode struct {
baseNode
replica collectionReplica
replica ReplicaInterface
}
func (gcNode *gcNode) Name() string {
......@@ -54,7 +54,7 @@ func (gcNode *gcNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Con
return nil, ctx
}
func newGCNode(replica collectionReplica) *gcNode {
func newGCNode(replica ReplicaInterface) *gcNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
......
......@@ -12,7 +12,7 @@ import (
type insertNode struct {
baseNode
replica collectionReplica
replica ReplicaInterface
}
type InsertData struct {
......@@ -120,7 +120,7 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn
wg.Done()
}
func newInsertNode(replica collectionReplica) *insertNode {
func newInsertNode(replica ReplicaInterface) *insertNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
......
......@@ -13,7 +13,7 @@ import (
type serviceTimeNode struct {
baseNode
replica collectionReplica
replica ReplicaInterface
timeTickMsgStream msgstream.MsgStream
}
......@@ -71,7 +71,7 @@ func (stNode *serviceTimeNode) sendTimeTick(ts Timestamp) error {
return stNode.timeTickMsgStream.Produce(context.TODO(), &msgPack)
}
func newServiceTimeNode(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *serviceTimeNode {
func newServiceTimeNode(ctx context.Context, replica ReplicaInterface, factory msgstream.Factory) *serviceTimeNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
......
......@@ -26,7 +26,7 @@ import (
)
type indexLoader struct {
replica collectionReplica
replica ReplicaInterface
fieldIndexes map[string][]*internalpb2.IndexStats
fieldStatsChan chan []*internalpb2.FieldStats
......@@ -389,7 +389,7 @@ func (loader *indexLoader) loadIndexDelayed(collectionID, segmentID UniqueID, in
return nil
}
func newIndexLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, replica collectionReplica) *indexLoader {
func newIndexLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, replica ReplicaInterface) *indexLoader {
option := &minioKV.Option{
Address: Params.MinioEndPoint,
AccessKeyID: Params.MinioAccessKeyID,
......
......@@ -161,7 +161,7 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID Uni
return nil
}
func newLoadService(ctx context.Context, masterClient MasterServiceInterface, dataClient DataServiceInterface, indexClient IndexServiceInterface, replica collectionReplica, dmStream msgstream.MsgStream) *loadService {
func newLoadService(ctx context.Context, masterClient MasterServiceInterface, dataClient DataServiceInterface, indexClient IndexServiceInterface, replica ReplicaInterface, dmStream msgstream.MsgStream) *loadService {
ctx1, cancel := context.WithCancel(ctx)
segLoader := newSegmentLoader(ctx1, masterClient, indexClient, dataClient, replica, dmStream)
......
......@@ -27,10 +27,10 @@ const (
type metaService struct {
ctx context.Context
kvBase *etcdkv.EtcdKV
replica collectionReplica
replica ReplicaInterface
}
func newMetaService(ctx context.Context, replica collectionReplica) *metaService {
func newMetaService(ctx context.Context, replica ReplicaInterface) *metaService {
ETCDAddr := Params.ETCDAddress
MetaRootPath := Params.MetaRootPath
var cli *clientv3.Client
......
......@@ -55,7 +55,7 @@ type QueryNode struct {
QueryNodeID UniqueID
stateCode atomic.Value
replica collectionReplica
replica ReplicaInterface
// internal services
dataSyncService *dataSyncService
......@@ -88,7 +88,7 @@ func NewQueryNode(ctx context.Context, queryNodeID UniqueID, factory msgstream.F
msFactory: factory,
}
node.replica = newCollectionReplicaImpl()
node.replica = newCollectionReplica()
node.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
return node
}
......@@ -107,7 +107,7 @@ func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *Quer
msFactory: factory,
}
node.replica = newCollectionReplicaImpl()
node.replica = newCollectionReplica()
node.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
return node
......
......@@ -23,7 +23,7 @@ type searchService struct {
wait sync.WaitGroup
cancel context.CancelFunc
replica collectionReplica
replica ReplicaInterface
tSafeWatcher *tSafeWatcher
serviceableTimeMutex sync.Mutex // guards serviceableTime
......@@ -38,7 +38,7 @@ type searchService struct {
type ResultEntityIds []UniqueID
func newSearchService(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *searchService {
func newSearchService(ctx context.Context, replica ReplicaInterface, factory msgstream.Factory) *searchService {
receiveBufSize := Params.SearchReceiveBufSize
searchStream, _ := factory.NewMsgStream(ctx)
......
......@@ -17,7 +17,7 @@ import (
// segmentLoader is only responsible for loading the field data from binlog
type segmentLoader struct {
replica collectionReplica
replica ReplicaInterface
dmStream msgstream.MsgStream
......@@ -191,7 +191,7 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, targetField
return nil
}
func newSegmentLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, dataClient DataServiceInterface, replica collectionReplica, dmStream msgstream.MsgStream) *segmentLoader {
func newSegmentLoader(ctx context.Context, masterClient MasterServiceInterface, indexClient IndexServiceInterface, dataClient DataServiceInterface, replica ReplicaInterface, dmStream msgstream.MsgStream) *segmentLoader {
option := &minioKV.Option{
Address: Params.MinioEndPoint,
AccessKeyID: Params.MinioAccessKeyID,
......
......@@ -13,14 +13,14 @@ import (
type statsService struct {
ctx context.Context
replica collectionReplica
replica ReplicaInterface
fieldStatsChan chan []*internalpb2.FieldStats
statsStream msgstream.MsgStream
msFactory msgstream.Factory
}
func newStatsService(ctx context.Context, replica collectionReplica, fieldStatsChan chan []*internalpb2.FieldStats, factory msgstream.Factory) *statsService {
func newStatsService(ctx context.Context, replica ReplicaInterface, fieldStatsChan chan []*internalpb2.FieldStats, factory msgstream.Factory) *statsService {
return &statsService{
ctx: ctx,
......
......@@ -24,38 +24,38 @@ func (watcher *tSafeWatcher) hasUpdate() {
<-watcher.notifyChan
}
type tSafe interface {
type tSafer interface {
get() Timestamp
set(t Timestamp)
registerTSafeWatcher(t *tSafeWatcher)
}
type tSafeImpl struct {
type tSafe struct {
tSafeMu sync.Mutex // guards all fields
tSafe Timestamp
watcherList []*tSafeWatcher
}
func newTSafe() tSafe {
var t tSafe = &tSafeImpl{
func newTSafe() tSafer {
var t tSafer = &tSafe{
watcherList: make([]*tSafeWatcher, 0),
}
return t
}
func (ts *tSafeImpl) registerTSafeWatcher(t *tSafeWatcher) {
func (ts *tSafe) registerTSafeWatcher(t *tSafeWatcher) {
ts.tSafeMu.Lock()
defer ts.tSafeMu.Unlock()
ts.watcherList = append(ts.watcherList, t)
}
func (ts *tSafeImpl) get() Timestamp {
func (ts *tSafe) get() Timestamp {
ts.tSafeMu.Lock()
defer ts.tSafeMu.Unlock()
return ts.tSafe
}
func (ts *tSafeImpl) set(t Timestamp) {
func (ts *tSafe) set(t Timestamp) {
ts.tSafeMu.Lock()
defer ts.tSafeMu.Unlock()
......
......@@ -9,7 +9,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
type metaReplica interface {
type Replica interface {
getCollections(dbID UniqueID) ([]*collection, error)
getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error)
getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error)
......@@ -42,23 +42,23 @@ type collection struct {
schema *schemapb.CollectionSchema
}
type metaReplicaImpl struct {
type metaReplica struct {
dbID []UniqueID
db2collections map[UniqueID][]*collection
}
func newMetaReplica() metaReplica {
func newMetaReplica() Replica {
db2collections := make(map[UniqueID][]*collection)
db2collections[0] = make([]*collection, 0)
dbIDs := make([]UniqueID, 0)
dbIDs = append(dbIDs, UniqueID(0))
return &metaReplicaImpl{
return &metaReplica{
dbID: dbIDs,
db2collections: db2collections,
}
}
func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID, schema *schemapb.CollectionSchema) error {
func (mp *metaReplica) addCollection(dbID UniqueID, collectionID UniqueID, schema *schemapb.CollectionSchema) error {
//TODO:: assert dbID = 0 exist
if _, ok := mp.db2collections[dbID]; ok {
partitions := make(map[UniqueID]*partition)
......@@ -76,7 +76,7 @@ func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID, s
return errors.New("addCollection: can't find dbID when add collection")
}
func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error {
func (mp *metaReplica) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error {
if collections, ok := mp.db2collections[dbID]; ok {
for _, collection := range collections {
if collection.id == collectionID {
......@@ -95,7 +95,7 @@ func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, pa
return errors.New("addPartition: can't find collection when add partition")
}
func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error) {
func (mp *metaReplica) getCollections(dbID UniqueID) ([]*collection, error) {
if collections, ok := mp.db2collections[dbID]; ok {
return collections, nil
}
......@@ -103,7 +103,7 @@ func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error)
return nil, errors.New("getCollections: can't find collectionID")
}
func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error) {
func (mp *metaReplica) getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error) {
if collections, ok := mp.db2collections[dbID]; ok {
for _, collection := range collections {
if collectionID == collection.id {
......@@ -119,7 +119,7 @@ func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) (
return nil, errors.New("getPartitions: can't find partitionIDs")
}
func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error) {
func (mp *metaReplica) getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error) {
if collections, ok := mp.db2collections[dbID]; ok {
for _, collection := range collections {
if collectionID == collection.id {
......@@ -136,7 +136,7 @@ func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, par
return nil, errors.New("getSegments: can't find segmentID")
}
func (mp *metaReplicaImpl) getCollectionByID(dbID UniqueID, collectionID UniqueID) (*collection, error) {
func (mp *metaReplica) getCollectionByID(dbID UniqueID, collectionID UniqueID) (*collection, error) {
if collections, ok := mp.db2collections[dbID]; ok {
for _, collection := range collections {
if collectionID == collection.id {
......@@ -148,7 +148,7 @@ func (mp *metaReplicaImpl) getCollectionByID(dbID UniqueID, collectionID UniqueI
return nil, errors.New("getCollectionByID: can't find collectionID")
}
func (mp *metaReplicaImpl) getPartitionByID(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) {
func (mp *metaReplica) getPartitionByID(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) {
if collections, ok := mp.db2collections[dbID]; ok {
for _, collection := range collections {
if collectionID == collection.id {
......@@ -163,7 +163,7 @@ func (mp *metaReplicaImpl) getPartitionByID(dbID UniqueID, collectionID UniqueID
return nil, errors.New("getPartitionByID: can't find partitionID")
}
func (mp *metaReplicaImpl) updatePartitionState(dbID UniqueID,
func (mp *metaReplica) updatePartitionState(dbID UniqueID,
collectionID UniqueID,
partitionID UniqueID,
state querypb.PartitionState) error {
......@@ -178,7 +178,7 @@ func (mp *metaReplicaImpl) updatePartitionState(dbID UniqueID,
return errors.New("updatePartitionState: update partition state fail")
}
func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID,
func (mp *metaReplica) getPartitionStates(dbID UniqueID,
collectionID UniqueID,
partitionIDs []UniqueID) ([]*querypb.PartitionStates, error) {
partitionStates := make([]*querypb.PartitionStates, 0)
......@@ -202,7 +202,7 @@ func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID,
return partitionStates, nil
}
func (mp *metaReplicaImpl) releaseCollection(dbID UniqueID, collectionID UniqueID) error {
func (mp *metaReplica) releaseCollection(dbID UniqueID, collectionID UniqueID) error {
if collections, ok := mp.db2collections[dbID]; ok {
for i, collection := range collections {
if collectionID == collection.id {
......@@ -220,7 +220,7 @@ func (mp *metaReplicaImpl) releaseCollection(dbID UniqueID, collectionID UniqueI
return errors.New(errorStr)
}
func (mp *metaReplicaImpl) releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error {
func (mp *metaReplica) releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error {
if collections, ok := mp.db2collections[dbID]; ok {
for _, collection := range collections {
if collectionID == collection.id {
......@@ -236,7 +236,7 @@ func (mp *metaReplicaImpl) releasePartition(dbID UniqueID, collectionID UniqueID
return errors.New(errorStr)
}
func (mp *metaReplicaImpl) addDmChannels(dbID UniqueID, collectionID UniqueID, channels2NodeID map[string]int64) error {
func (mp *metaReplica) addDmChannels(dbID UniqueID, collectionID UniqueID, channels2NodeID map[string]int64) error {
if collections, ok := mp.db2collections[dbID]; ok {
for _, collection := range collections {
if collectionID == collection.id {
......@@ -250,7 +250,7 @@ func (mp *metaReplicaImpl) addDmChannels(dbID UniqueID, collectionID UniqueID, c
return errors.New("addDmChannels: can't find dbID or collectionID")
}
func (mp *metaReplicaImpl) getAssignedNodeIDByChannelName(dbID UniqueID, collectionID UniqueID, channel string) (int64, error) {
func (mp *metaReplica) getAssignedNodeIDByChannelName(dbID UniqueID, collectionID UniqueID, channel string) (int64, error) {
if collections, ok := mp.db2collections[dbID]; ok {
for _, collection := range collections {
if collectionID == collection.id {
......
......@@ -25,8 +25,24 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type Service interface {
typeutil.Component
RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error)
ShowCollections(ctx context.Context, req *querypb.ShowCollectionRequest) (*querypb.ShowCollectionResponse, error)
LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error)
ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error)
ShowPartitions(ctx context.Context, req *querypb.ShowPartitionRequest) (*querypb.ShowPartitionResponse, error)
LoadPartitions(ctx context.Context, req *querypb.LoadPartitionRequest) (*commonpb.Status, error)
ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionRequest) (*commonpb.Status, error)
CreateQueryChannel(ctx context.Context) (*querypb.CreateQueryChannelResponse, error)
GetPartitionStates(ctx context.Context, req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error)
GetSegmentInfo(ctx context.Context, req *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error)
}
type MasterServiceInterface interface {
ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error)
......@@ -60,7 +76,7 @@ type QueryService struct {
loopCancel context.CancelFunc
queryServiceID uint64
replica metaReplica
replica Replica
dataServiceClient DataServiceInterface
masterServiceClient MasterServiceInterface
......
......@@ -10,6 +10,9 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
)
type TimeTickHandler interface {
GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error)
}
type Service interface {
Init() error
Start() error
......@@ -18,7 +21,6 @@ type Service interface {
type Component interface {
GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error)
GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error)
GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error)
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册