diff --git a/internal/proxynode/insert_channels.go b/internal/proxynode/insert_channels.go index deda7ef03c957858ebf3a92ad92c1d894d22aa3c..90451614bdfd208157da1fe81409fc869d943b0c 100644 --- a/internal/proxynode/insert_channels.go +++ b/internal/proxynode/insert_channels.go @@ -134,12 +134,10 @@ func (m *InsertChannelsMap) closeInsertMsgStream(collID UniqueID) error { m.usageHistogram[loc]-- if m.usageHistogram[loc] <= 0 { m.insertMsgStreams[loc].Close() + m.droppedBitMap[loc] = 1 + delete(m.collectionID2InsertChannels, collID) + log.Print("close insert message stream ...") } - log.Print("close insert message stream ...") - - m.droppedBitMap[loc] = 1 - delete(m.collectionID2InsertChannels, collID) - return nil } diff --git a/internal/querynode/collection.go b/internal/querynode/collection.go index bd53ffbd01631f0dd618bd26a2e76e1d682d9510..47ce52039d9f28bfe450b9d44a85c4a9bf9068ea 100644 --- a/internal/querynode/collection.go +++ b/internal/querynode/collection.go @@ -19,22 +19,32 @@ import ( type Collection struct { collectionPtr C.CCollection id UniqueID + partitionIDs []UniqueID schema *schemapb.CollectionSchema - partitions []*Partition } func (c *Collection) ID() UniqueID { return c.id } -func (c *Collection) Partitions() *[]*Partition { - return &c.partitions -} - func (c *Collection) Schema() *schemapb.CollectionSchema { return c.schema } +func (c *Collection) addPartitionID(partitionID UniqueID) { + c.partitionIDs = append(c.partitionIDs, partitionID) +} + +func (c *Collection) removePartitionID(partitionID UniqueID) { + tmpIDs := make([]UniqueID, 0) + for _, id := range c.partitionIDs { + if id == partitionID { + tmpIDs = append(tmpIDs, id) + } + } + c.partitionIDs = tmpIDs +} + func newCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) *Collection { /* CCollection diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index 
// collectionReplica is the query node's registry of collections, partitions
// and segments. Partitions and segments are addressed by their own IDs; the
// owning collection only has to be named when an entry is created.
type collectionReplica interface {
	// collection

	// addCollection registers a new collection built from schema. It fails
	// when a collection with the same ID is already registered.
	addCollection(collectionID UniqueID, schema *schemapb.CollectionSchema) error
	// removeCollection removes the collection and, on a best-effort basis,
	// the partitions recorded under it. It fails when the ID is unknown.
	removeCollection(collectionID UniqueID) error
	// getCollectionByID returns the registered collection, or an error when
	// the ID is unknown.
	getCollectionByID(collectionID UniqueID) (*Collection, error)
	// hasCollection reports whether the collection ID is registered.
	hasCollection(collectionID UniqueID) bool
	// getCollectionNum returns the number of registered collections.
	getCollectionNum() int
	// getPartitionIDs returns the partition IDs recorded on the collection.
	getPartitionIDs(collectionID UniqueID) ([]UniqueID, error)
	// getVecFieldsByCollectionID returns the field IDs of the collection's
	// vector fields (binary and float vector data types).
	getVecFieldsByCollectionID(collectionID UniqueID) ([]int64, error)

	// partition

	// addPartition creates a partition under an existing collection and
	// records its ID on that collection.
	addPartition(collectionID UniqueID, partitionID UniqueID) error
	// removePartition removes the partition and, on a best-effort basis, the
	// segments recorded under it.
	removePartition(partitionID UniqueID) error
	// getPartitionByID returns the registered partition, or an error when the
	// ID is unknown.
	getPartitionByID(partitionID UniqueID) (*Partition, error)
	// hasPartition reports whether the partition ID is registered.
	hasPartition(partitionID UniqueID) bool
	// getPartitionNum returns the number of registered partitions across all
	// collections.
	getPartitionNum() int
	// getSegmentIDs returns the segment IDs recorded on the partition.
	getSegmentIDs(partitionID UniqueID) ([]UniqueID, error)

	// NOTE(review): the three DM methods presumably switch and read the
	// partition's enableDM flag; their bodies are not fully visible here.
	enablePartitionDM(partitionID UniqueID) error
	disablePartitionDM(partitionID UniqueID) error
	getEnablePartitionDM(partitionID UniqueID) (bool, error)

	// segment

	// addSegment creates a segment of the given type and records its ID on
	// the partition. Both the collection and the partition must exist.
	addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error
	// removeSegment detaches the segment from its partition, unregisters it
	// and releases it.
	removeSegment(segmentID UniqueID) error
	// getSegmentByID returns the registered segment, or an error when the ID
	// is unknown.
	getSegmentByID(segmentID UniqueID) (*Segment, error)
	// hasSegment reports whether the segment ID is registered.
	hasSegment(segmentID UniqueID) bool
	// getSegmentNum returns the number of registered segments.
	getSegmentNum() int

	// getSegmentStatistics returns one stats entry per registered segment and
	// resets each segment's recently-modified flag as a side effect.
	getSegmentStatistics() []*internalpb2.SegmentStats
	// NOTE(review): the meaning of the two returned ID slices is not visible
	// in this chunk; confirm against the implementation.
	getSealedSegments() ([]UniqueID, []UniqueID)
	// replaceGrowingSegmentBySealedSegment accepts only sealed or indexing
	// segments and rejects any other segment type with an error.
	replaceGrowingSegmentBySealedSegment(segment *Segment) error

	// getTSafe returns the replica's tSafe.
	getTSafe() tSafe
	// freeAll removes everything held by the replica and resets its maps.
	freeAll()
}
append(colReplica.collections, newCollection) + colReplica.collections[collectionID] = newCollection return nil } @@ -102,72 +93,82 @@ func (colReplica *collectionReplicaImpl) addCollection(collectionID UniqueID, sc func (colReplica *collectionReplicaImpl) removeCollection(collectionID UniqueID) error { colReplica.mu.Lock() defer colReplica.mu.Unlock() + return colReplica.removeCollectionPrivate(collectionID) +} +func (colReplica *collectionReplicaImpl) removeCollectionPrivate(collectionID UniqueID) error { collection, err := colReplica.getCollectionByIDPrivate(collectionID) if err != nil { return err } deleteCollection(collection) + delete(colReplica.collections, collectionID) - tmpCollections := make([]*Collection, 0) - for _, col := range colReplica.collections { - if col.ID() == collectionID { - for _, p := range *col.Partitions() { - for _, s := range *p.Segments() { - deleteSegment(colReplica.segments[s.ID()]) - delete(colReplica.segments, s.ID()) - } - } - } else { - tmpCollections = append(tmpCollections, col) - } + // delete partitions + for _, partitionID := range collection.partitionIDs { + // ignore error, try to delete + _ = colReplica.removePartitionPrivate(partitionID) } - colReplica.collections = tmpCollections return nil } func (colReplica *collectionReplicaImpl) getCollectionByID(collectionID UniqueID) (*Collection, error) { colReplica.mu.RLock() defer colReplica.mu.RUnlock() - return colReplica.getCollectionByIDPrivate(collectionID) } func (colReplica *collectionReplicaImpl) getCollectionByIDPrivate(collectionID UniqueID) (*Collection, error) { - for _, collection := range colReplica.collections { - if collection.ID() == collectionID { - return collection, nil - } + collection, ok := colReplica.collections[collectionID] + if !ok { + return nil, errors.New("cannot find collection, id = " + strconv.FormatInt(collectionID, 10)) } - return nil, errors.New("cannot find collection, id = " + strconv.FormatInt(collectionID, 10)) + return collection, nil 
} func (colReplica *collectionReplicaImpl) hasCollection(collectionID UniqueID) bool { colReplica.mu.RLock() defer colReplica.mu.RUnlock() + return colReplica.hasCollectionPrivate(collectionID) +} - for _, col := range colReplica.collections { - if col.ID() == collectionID { - return true - } +func (colReplica *collectionReplicaImpl) hasCollectionPrivate(collectionID UniqueID) bool { + _, ok := colReplica.collections[collectionID] + return ok +} + +func (colReplica *collectionReplicaImpl) getCollectionNum() int { + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() + return len(colReplica.collections) +} + +func (colReplica *collectionReplicaImpl) getPartitionIDs(collectionID UniqueID) ([]UniqueID, error) { + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() + + collection, err := colReplica.getCollectionByIDPrivate(collectionID) + if err != nil { + return nil, err } - return false + + return collection.partitionIDs, nil } func (colReplica *collectionReplicaImpl) getVecFieldsByCollectionID(collectionID UniqueID) ([]int64, error) { colReplica.mu.RLock() defer colReplica.mu.RUnlock() - col, err := colReplica.getCollectionByIDPrivate(collectionID) + collection, err := colReplica.getCollectionByIDPrivate(collectionID) if err != nil { return nil, err } vecFields := make([]int64, 0) - for _, field := range col.Schema().Fields { + for _, field := range collection.Schema().Fields { if field.DataType == schemapb.DataType_VECTOR_BINARY || field.DataType == schemapb.DataType_VECTOR_FLOAT { vecFields = append(vecFields, field.FieldID) } @@ -181,165 +182,100 @@ func (colReplica *collectionReplicaImpl) getVecFieldsByCollectionID(collectionID } //----------------------------------------------------------------------------------------------------- partition -func (colReplica *collectionReplicaImpl) getPartitionNum(collectionID UniqueID) (int, error) { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() - - collection, err := 
colReplica.getCollectionByIDPrivate(collectionID) - if err != nil { - return -1, err - } - - return len(collection.partitions), nil -} - func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionID UniqueID) error { colReplica.mu.Lock() defer colReplica.mu.Unlock() + return colReplica.addPartitionPrivate(collectionID, partitionID) +} +func (colReplica *collectionReplicaImpl) addPartitionPrivate(collectionID UniqueID, partitionID UniqueID) error { collection, err := colReplica.getCollectionByIDPrivate(collectionID) if err != nil { return err } - var newPartition = newPartition(partitionID) - - *collection.Partitions() = append(*collection.Partitions(), newPartition) + collection.addPartitionID(partitionID) + var newPartition = newPartition(collectionID, partitionID) + colReplica.partitions[partitionID] = newPartition return nil } -func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID, partitionID UniqueID) error { +func (colReplica *collectionReplicaImpl) removePartition(partitionID UniqueID) error { colReplica.mu.Lock() defer colReplica.mu.Unlock() - - return colReplica.removePartitionPrivate(collectionID, partitionID) + return colReplica.removePartitionPrivate(partitionID) } -func (colReplica *collectionReplicaImpl) removePartitionPrivate(collectionID UniqueID, partitionID UniqueID) error { - collection, err := colReplica.getCollectionByIDPrivate(collectionID) +func (colReplica *collectionReplicaImpl) removePartitionPrivate(partitionID UniqueID) error { + partition, err := colReplica.getPartitionByIDPrivate(partitionID) if err != nil { return err } - var tmpPartitions = make([]*Partition, 0) - for _, p := range *collection.Partitions() { - if p.ID() == partitionID { - for _, s := range *p.Segments() { - deleteSegment(colReplica.segments[s.ID()]) - delete(colReplica.segments, s.ID()) - } - } else { - tmpPartitions = append(tmpPartitions, p) - } - } - - *collection.Partitions() = tmpPartitions - return nil -} - 
-// deprecated -func (colReplica *collectionReplicaImpl) addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionInfo) error { - if !colReplica.hasCollection(colMeta.ID) { - err := errors.New("Cannot find collection, id = " + strconv.FormatInt(colMeta.ID, 10)) - return err - } - pToAdd := make([]UniqueID, 0) - for _, partitionID := range colMeta.PartitionIDs { - if !colReplica.hasPartition(colMeta.ID, partitionID) { - pToAdd = append(pToAdd, partitionID) - } - } - - for _, id := range pToAdd { - err := colReplica.addPartition(colMeta.ID, id) - if err != nil { - log.Println(err) - } - fmt.Println("add partition: ", id) - } - - return nil -} - -func (colReplica *collectionReplicaImpl) removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionInfo) error { - colReplica.mu.Lock() - defer colReplica.mu.Unlock() - - col, err := colReplica.getCollectionByIDPrivate(colMeta.ID) + collection, err := colReplica.getCollectionByIDPrivate(partition.collectionID) if err != nil { return err } - pToDel := make([]UniqueID, 0) - for _, partition := range col.partitions { - hasPartition := false - for _, id := range colMeta.PartitionIDs { - if partition.ID() == id { - hasPartition = true - } - } - if !hasPartition { - pToDel = append(pToDel, partition.ID()) - } - } + collection.removePartitionID(partitionID) + delete(colReplica.partitions, partitionID) - for _, id := range pToDel { - err := colReplica.removePartitionPrivate(col.ID(), id) - if err != nil { - log.Println(err) - } - fmt.Println("delete partition: ", id) + // delete segments + for _, segmentID := range partition.segmentIDs { + // try to delete, ignore error + _ = colReplica.removeSegmentPrivate(segmentID) } - return nil } -func (colReplica *collectionReplicaImpl) getPartitionByID(collectionID UniqueID, partitionID UniqueID) (*Partition, error) { +func (colReplica *collectionReplicaImpl) getPartitionByID(partitionID UniqueID) (*Partition, error) { colReplica.mu.RLock() defer colReplica.mu.RUnlock() - - return 
colReplica.getPartitionByIDPrivate(collectionID, partitionID) + return colReplica.getPartitionByIDPrivate(partitionID) } -func (colReplica *collectionReplicaImpl) getPartitionByIDPrivate(collectionID UniqueID, partitionID UniqueID) (*Partition, error) { - collection, err := colReplica.getCollectionByIDPrivate(collectionID) - if err != nil { - return nil, err +func (colReplica *collectionReplicaImpl) getPartitionByIDPrivate(partitionID UniqueID) (*Partition, error) { + partition, ok := colReplica.partitions[partitionID] + if !ok { + return nil, errors.New("cannot find partition, id = " + strconv.FormatInt(partitionID, 10)) } - for _, p := range *collection.Partitions() { - if p.ID() == partitionID { - return p, nil - } - } + return partition, nil +} + +func (colReplica *collectionReplicaImpl) hasPartition(partitionID UniqueID) bool { + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() + return colReplica.hasPartitionPrivate(partitionID) +} - return nil, errors.New("cannot find partition, id = " + strconv.FormatInt(partitionID, 10)) +func (colReplica *collectionReplicaImpl) hasPartitionPrivate(partitionID UniqueID) bool { + _, ok := colReplica.partitions[partitionID] + return ok } -func (colReplica *collectionReplicaImpl) hasPartition(collectionID UniqueID, partitionID UniqueID) bool { +func (colReplica *collectionReplicaImpl) getPartitionNum() int { colReplica.mu.RLock() defer colReplica.mu.RUnlock() + return len(colReplica.partitions) +} - collection, err := colReplica.getCollectionByIDPrivate(collectionID) - if err != nil { - log.Println(err) - return false - } +func (colReplica *collectionReplicaImpl) getSegmentIDs(partitionID UniqueID) ([]UniqueID, error) { + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() - for _, p := range *collection.Partitions() { - if p.ID() == partitionID { - return true - } + partition, err2 := colReplica.getPartitionByIDPrivate(partitionID) + if err2 != nil { + return nil, err2 } - - return false + return partition.segmentIDs, 
nil } -func (colReplica *collectionReplicaImpl) enablePartitionDM(collectionID UniqueID, partitionID UniqueID) error { +func (colReplica *collectionReplicaImpl) enablePartitionDM(partitionID UniqueID) error { colReplica.mu.Lock() defer colReplica.mu.Unlock() - partition, err := colReplica.getPartitionByIDPrivate(collectionID, partitionID) + partition, err := colReplica.getPartitionByIDPrivate(partitionID) if err != nil { return err } @@ -348,11 +284,11 @@ func (colReplica *collectionReplicaImpl) enablePartitionDM(collectionID UniqueID return nil } -func (colReplica *collectionReplicaImpl) disablePartitionDM(collectionID UniqueID, partitionID UniqueID) error { +func (colReplica *collectionReplicaImpl) disablePartitionDM(partitionID UniqueID) error { colReplica.mu.Lock() defer colReplica.mu.Unlock() - partition, err := colReplica.getPartitionByIDPrivate(collectionID, partitionID) + partition, err := colReplica.getPartitionByIDPrivate(partitionID) if err != nil { return err } @@ -361,11 +297,11 @@ func (colReplica *collectionReplicaImpl) disablePartitionDM(collectionID UniqueI return nil } -func (colReplica *collectionReplicaImpl) getEnablePartitionDM(collectionID UniqueID, partitionID UniqueID) (bool, error) { +func (colReplica *collectionReplicaImpl) getEnablePartitionDM(partitionID UniqueID) (bool, error) { colReplica.mu.Lock() defer colReplica.mu.Unlock() - partition, err := colReplica.getPartitionByIDPrivate(collectionID, partitionID) + partition, err := colReplica.getPartitionByIDPrivate(partitionID) if err != nil { return false, err } @@ -373,56 +309,26 @@ func (colReplica *collectionReplicaImpl) getEnablePartitionDM(collectionID Uniqu } //----------------------------------------------------------------------------------------------------- segment -func (colReplica *collectionReplicaImpl) getSegmentNum() int { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() - - return len(colReplica.segments) -} - -func (colReplica *collectionReplicaImpl) 
getSegmentStatistics() []*internalpb2.SegmentStats { - colReplica.mu.RLock() - defer colReplica.mu.RUnlock() - - var statisticData = make([]*internalpb2.SegmentStats, 0) - - for segmentID, segment := range colReplica.segments { - currentMemSize := segment.getMemSize() - segment.lastMemSize = currentMemSize - segmentNumOfRows := segment.getRowCount() - - stat := internalpb2.SegmentStats{ - SegmentID: segmentID, - MemorySize: currentMemSize, - NumRows: segmentNumOfRows, - RecentlyModified: segment.getRecentlyModified(), - } - - statisticData = append(statisticData, &stat) - segment.setRecentlyModified(false) - } - - return statisticData -} - func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error { colReplica.mu.Lock() defer colReplica.mu.Unlock() + return colReplica.addSegmentPrivate(segmentID, partitionID, collectionID, segType) +} +func (colReplica *collectionReplicaImpl) addSegmentPrivate(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, segType segmentType) error { collection, err := colReplica.getCollectionByIDPrivate(collectionID) if err != nil { return err } - partition, err2 := colReplica.getPartitionByIDPrivate(collectionID, partitionID) - if err2 != nil { - return err2 + partition, err := colReplica.getPartitionByIDPrivate(partitionID) + if err != nil { + return err } + partition.addSegmentID(segmentID) var newSegment = newSegment(collection, segmentID, partitionID, collectionID, segType) - colReplica.segments[segmentID] = newSegment - *partition.Segments() = append(*partition.Segments(), newSegment) return nil } @@ -430,61 +336,84 @@ func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, partitio func (colReplica *collectionReplicaImpl) removeSegment(segmentID UniqueID) error { colReplica.mu.Lock() defer colReplica.mu.Unlock() - return colReplica.removeSegmentPrivate(segmentID) } func (colReplica *collectionReplicaImpl) 
removeSegmentPrivate(segmentID UniqueID) error { - var targetPartition *Partition - var segmentIndex = -1 - - for _, col := range colReplica.collections { - for _, p := range *col.Partitions() { - for i, s := range *p.Segments() { - if s.ID() == segmentID { - targetPartition = p - segmentIndex = i - deleteSegment(colReplica.segments[s.ID()]) - } - } - } + segment, err := colReplica.getSegmentByIDPrivate(segmentID) + if err != nil { + return err } - delete(colReplica.segments, segmentID) - - if targetPartition != nil && segmentIndex > 0 { - targetPartition.segments = append(targetPartition.segments[:segmentIndex], targetPartition.segments[segmentIndex+1:]...) + partition, err := colReplica.getPartitionByIDPrivate(segment.partitionID) + if err != nil { + return err } + partition.removeSegmentID(segmentID) + delete(colReplica.segments, segmentID) + deleteSegment(segment) + return nil } func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Segment, error) { colReplica.mu.RLock() defer colReplica.mu.RUnlock() - return colReplica.getSegmentByIDPrivate(segmentID) } func (colReplica *collectionReplicaImpl) getSegmentByIDPrivate(segmentID UniqueID) (*Segment, error) { - targetSegment, ok := colReplica.segments[segmentID] - + segment, ok := colReplica.segments[segmentID] if !ok { - return nil, errors.New("cannot found segment with id = " + strconv.FormatInt(segmentID, 10)) + return nil, errors.New("cannot find segment, id = " + strconv.FormatInt(segmentID, 10)) } - return targetSegment, nil + return segment, nil } func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool { colReplica.mu.RLock() defer colReplica.mu.RUnlock() + return colReplica.hasSegmentPrivate(segmentID) +} +func (colReplica *collectionReplicaImpl) hasSegmentPrivate(segmentID UniqueID) bool { _, ok := colReplica.segments[segmentID] - return ok } +func (colReplica *collectionReplicaImpl) getSegmentNum() int { + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() + 
return len(colReplica.segments) +} + +func (colReplica *collectionReplicaImpl) getSegmentStatistics() []*internalpb2.SegmentStats { + colReplica.mu.RLock() + defer colReplica.mu.RUnlock() + + var statisticData = make([]*internalpb2.SegmentStats, 0) + + for segmentID, segment := range colReplica.segments { + currentMemSize := segment.getMemSize() + segment.lastMemSize = currentMemSize + segmentNumOfRows := segment.getRowCount() + + stat := internalpb2.SegmentStats{ + SegmentID: segmentID, + MemorySize: currentMemSize, + NumRows: segmentNumOfRows, + RecentlyModified: segment.getRecentlyModified(), + } + + statisticData = append(statisticData, &stat) + segment.setRecentlyModified(false) + } + + return statisticData +} + func (colReplica *collectionReplicaImpl) getSealedSegments() ([]UniqueID, []UniqueID) { colReplica.mu.RLock() defer colReplica.mu.RUnlock() @@ -504,37 +433,54 @@ func (colReplica *collectionReplicaImpl) getSealedSegments() ([]UniqueID, []Uniq func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(segment *Segment) error { colReplica.mu.Lock() defer colReplica.mu.Unlock() - targetSegment, ok := colReplica.segments[segment.ID()] - if ok { + if segment.segmentType != segTypeSealed && segment.segmentType != segTypeIndexing { + return errors.New("unexpected segment type") + } + targetSegment, err := colReplica.getSegmentByIDPrivate(segment.ID()) + if err != nil && targetSegment != nil { if targetSegment.segmentType != segTypeGrowing { + // target segment has been a sealed segment return nil } deleteSegment(targetSegment) - targetSegment = segment - } else { - // add segment - targetPartition, err := colReplica.getPartitionByIDPrivate(segment.collectionID, segment.partitionID) - if err != nil { - return err - } - targetPartition.segments = append(targetPartition.segments, segment) - colReplica.segments[segment.ID()] = segment } + + targetSegment = segment return nil } 
//----------------------------------------------------------------------------------------------------- +func (colReplica *collectionReplicaImpl) getTSafe() tSafe { + return colReplica.tSafe +} + func (colReplica *collectionReplicaImpl) freeAll() { colReplica.mu.Lock() defer colReplica.mu.Unlock() - for _, seg := range colReplica.segments { - deleteSegment(seg) - } - for _, col := range colReplica.collections { - deleteCollection(col) + for id := range colReplica.collections { + _ = colReplica.removeCollectionPrivate(id) } + colReplica.collections = make(map[UniqueID]*Collection) + colReplica.partitions = make(map[UniqueID]*Partition) colReplica.segments = make(map[UniqueID]*Segment) - colReplica.collections = make([]*Collection, 0) +} + +func newCollectionReplicaImpl() collectionReplica { + collections := make(map[int64]*Collection) + partitions := make(map[int64]*Partition) + segments := make(map[int64]*Segment) + + tSafe := newTSafe() + + var replica collectionReplica = &collectionReplicaImpl{ + collections: collections, + partitions: partitions, + segments: segments, + + tSafe: tSafe, + } + + return replica } diff --git a/internal/querynode/collection_replica_test.go b/internal/querynode/collection_replica_test.go index 521a1060d3c467a1f8c67f394ba55875d615438a..56e8c28c3ea461aa8575af9d7e459b95070be2bc 100644 --- a/internal/querynode/collection_replica_test.go +++ b/internal/querynode/collection_replica_test.go @@ -11,13 +11,15 @@ func TestCollectionReplica_getCollectionNum(t *testing.T) { node := newQueryNodeMock() initTestMeta(t, node, 0, 0) assert.Equal(t, node.replica.getCollectionNum(), 1) - node.Stop() + err := node.Stop() + assert.NoError(t, err) } func TestCollectionReplica_addCollection(t *testing.T) { node := newQueryNodeMock() initTestMeta(t, node, 0, 0) - node.Stop() + err := node.Stop() + assert.NoError(t, err) } func TestCollectionReplica_removeCollection(t *testing.T) { @@ -28,7 +30,8 @@ func TestCollectionReplica_removeCollection(t *testing.T) { 
err := node.replica.removeCollection(0) assert.NoError(t, err) assert.Equal(t, node.replica.getCollectionNum(), 0) - node.Stop() + err = node.Stop() + assert.NoError(t, err) } func TestCollectionReplica_getCollectionByID(t *testing.T) { @@ -39,7 +42,8 @@ func TestCollectionReplica_getCollectionByID(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, targetCollection) assert.Equal(t, targetCollection.ID(), collectionID) - node.Stop() + err = node.Stop() + assert.NoError(t, err) } func TestCollectionReplica_hasCollection(t *testing.T) { @@ -52,7 +56,8 @@ func TestCollectionReplica_hasCollection(t *testing.T) { hasCollection = node.replica.hasCollection(UniqueID(1)) assert.Equal(t, hasCollection, false) - node.Stop() + err := node.Stop() + assert.NoError(t, err) } //----------------------------------------------------------------------------------------------------- partition @@ -65,15 +70,15 @@ func TestCollectionReplica_getPartitionNum(t *testing.T) { for _, id := range partitionIDs { err := node.replica.addPartition(collectionID, id) assert.NoError(t, err) - partition, err := node.replica.getPartitionByID(collectionID, id) + partition, err := node.replica.getPartitionByID(id) assert.NoError(t, err) assert.Equal(t, partition.ID(), id) } - partitionNum, err := node.replica.getPartitionNum(collectionID) + partitionNum := node.replica.getPartitionNum() + assert.Equal(t, partitionNum, len(partitionIDs)+1) + err := node.Stop() assert.NoError(t, err) - assert.Equal(t, partitionNum, len(partitionIDs)+1) // _default - node.Stop() } func TestCollectionReplica_addPartition(t *testing.T) { @@ -85,11 +90,12 @@ func TestCollectionReplica_addPartition(t *testing.T) { for _, id := range partitionIDs { err := node.replica.addPartition(collectionID, id) assert.NoError(t, err) - partition, err := node.replica.getPartitionByID(collectionID, id) + partition, err := node.replica.getPartitionByID(id) assert.NoError(t, err) assert.Equal(t, partition.ID(), id) } - node.Stop() + err := 
node.Stop() + assert.NoError(t, err) } func TestCollectionReplica_removePartition(t *testing.T) { @@ -102,60 +108,14 @@ func TestCollectionReplica_removePartition(t *testing.T) { for _, id := range partitionIDs { err := node.replica.addPartition(collectionID, id) assert.NoError(t, err) - partition, err := node.replica.getPartitionByID(collectionID, id) + partition, err := node.replica.getPartitionByID(id) assert.NoError(t, err) assert.Equal(t, partition.ID(), id) - err = node.replica.removePartition(collectionID, id) + err = node.replica.removePartition(id) assert.NoError(t, err) } - node.Stop() -} - -func TestCollectionReplica_addPartitionsByCollectionMeta(t *testing.T) { - node := newQueryNodeMock() - collectionID := UniqueID(0) - initTestMeta(t, node, collectionID, 0) - - collectionMeta := genTestCollectionMeta(collectionID, false) - collectionMeta.PartitionIDs = []UniqueID{0, 1, 2} - - err := node.replica.addPartitionsByCollectionMeta(collectionMeta) + err := node.Stop() assert.NoError(t, err) - partitionNum, err := node.replica.getPartitionNum(UniqueID(0)) - assert.NoError(t, err) - assert.Equal(t, partitionNum, len(collectionMeta.PartitionIDs)+1) - hasPartition := node.replica.hasPartition(UniqueID(0), UniqueID(0)) - assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(UniqueID(0), UniqueID(1)) - assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(UniqueID(0), UniqueID(2)) - assert.Equal(t, hasPartition, true) - - node.Stop() -} - -func TestCollectionReplica_removePartitionsByCollectionMeta(t *testing.T) { - node := newQueryNodeMock() - collectionID := UniqueID(0) - initTestMeta(t, node, collectionID, 0) - - collectionMeta := genTestCollectionMeta(collectionID, false) - collectionMeta.PartitionIDs = []UniqueID{0} - - err := node.replica.addPartitionsByCollectionMeta(collectionMeta) - assert.NoError(t, err) - partitionNum, err := node.replica.getPartitionNum(UniqueID(0)) - assert.NoError(t, err) - 
assert.Equal(t, partitionNum, len(collectionMeta.PartitionIDs)+1) - - hasPartition := node.replica.hasPartition(UniqueID(0), UniqueID(0)) - assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(UniqueID(0), UniqueID(1)) - assert.Equal(t, hasPartition, false) - hasPartition = node.replica.hasPartition(UniqueID(0), UniqueID(2)) - assert.Equal(t, hasPartition, false) - - node.Stop() } func TestCollectionReplica_getPartitionByTag(t *testing.T) { @@ -168,12 +128,13 @@ func TestCollectionReplica_getPartitionByTag(t *testing.T) { for _, id := range collectionMeta.PartitionIDs { err := node.replica.addPartition(collectionID, id) assert.NoError(t, err) - partition, err := node.replica.getPartitionByID(collectionID, id) + partition, err := node.replica.getPartitionByID(id) assert.NoError(t, err) assert.Equal(t, partition.ID(), id) assert.NotNil(t, partition) } - node.Stop() + err := node.Stop() + assert.NoError(t, err) } func TestCollectionReplica_hasPartition(t *testing.T) { @@ -184,11 +145,12 @@ func TestCollectionReplica_hasPartition(t *testing.T) { collectionMeta := genTestCollectionMeta(collectionID, false) err := node.replica.addPartition(collectionID, collectionMeta.PartitionIDs[0]) assert.NoError(t, err) - hasPartition := node.replica.hasPartition(collectionID, defaultPartitionID) + hasPartition := node.replica.hasPartition(defaultPartitionID) assert.Equal(t, hasPartition, true) - hasPartition = node.replica.hasPartition(collectionID, defaultPartitionID+1) + hasPartition = node.replica.hasPartition(defaultPartitionID + 1) assert.Equal(t, hasPartition, false) - node.Stop() + err = node.Stop() + assert.NoError(t, err) } //----------------------------------------------------------------------------------------------------- segment @@ -206,7 +168,8 @@ func TestCollectionReplica_addSegment(t *testing.T) { assert.Equal(t, targetSeg.segmentID, UniqueID(i)) } - node.Stop() + err := node.Stop() + assert.NoError(t, err) } func 
TestCollectionReplica_removeSegment(t *testing.T) { @@ -226,7 +189,8 @@ func TestCollectionReplica_removeSegment(t *testing.T) { assert.NoError(t, err) } - node.Stop() + err := node.Stop() + assert.NoError(t, err) } func TestCollectionReplica_getSegmentByID(t *testing.T) { @@ -244,7 +208,8 @@ func TestCollectionReplica_getSegmentByID(t *testing.T) { assert.Equal(t, targetSeg.segmentID, UniqueID(i)) } - node.Stop() + err := node.Stop() + assert.NoError(t, err) } func TestCollectionReplica_hasSegment(t *testing.T) { @@ -266,7 +231,8 @@ func TestCollectionReplica_hasSegment(t *testing.T) { assert.Equal(t, hasSeg, false) } - node.Stop() + err := node.Stop() + assert.NoError(t, err) } func TestCollectionReplica_freeAll(t *testing.T) { @@ -274,6 +240,7 @@ func TestCollectionReplica_freeAll(t *testing.T) { collectionID := UniqueID(0) initTestMeta(t, node, collectionID, 0) - node.Stop() + err := node.Stop() + assert.NoError(t, err) } diff --git a/internal/querynode/collection_test.go b/internal/querynode/collection_test.go index ad0970c7715f564e48b9d6e8d81440b0ae532ca3..bc624b989ce3efb1af82f616b2d03c5dc8bc3a50 100644 --- a/internal/querynode/collection_test.go +++ b/internal/querynode/collection_test.go @@ -6,18 +6,6 @@ import ( "github.com/stretchr/testify/assert" ) -func TestCollection_Partitions(t *testing.T) { - node := newQueryNodeMock() - collectionID := UniqueID(0) - initTestMeta(t, node, collectionID, 0) - - collection, err := node.replica.getCollectionByID(collectionID) - assert.NoError(t, err) - - partitions := collection.Partitions() - assert.Equal(t, 1, len(*partitions)) -} - func TestCollection_newCollection(t *testing.T) { collectionID := UniqueID(0) collectionMeta := genTestCollectionMeta(collectionID, false) diff --git a/internal/querynode/flow_graph_gc_node.go b/internal/querynode/flow_graph_gc_node.go index dc468ed079bcf63e1e4d2d6a841e9b5bc6f03236..2be079e4479f1d6df59f419fa60de2966a8aaed0 100644 --- a/internal/querynode/flow_graph_gc_node.go +++ 
b/internal/querynode/flow_graph_gc_node.go @@ -41,7 +41,7 @@ func (gcNode *gcNode) Operate(in []*Msg) []*Msg { // //// drop partitions //for _, partition := range gcMsg.gcRecord.partitions { - // err := gcNode.replica.removePartition(partition.collectionID, partition.partitionID) + // err := gcNode.replica.removePartition(partition.partitionID) // if err != nil { // log.Println(err) // } diff --git a/internal/querynode/partition.go b/internal/querynode/partition.go index 199d6caa556cd76772de632ddfee19f770367a7a..ee6abe3ba2e9f580474d78db7ce5caa31d3bd1a8 100644 --- a/internal/querynode/partition.go +++ b/internal/querynode/partition.go @@ -13,23 +13,35 @@ package querynode import "C" type Partition struct { - id UniqueID - segments []*Segment - enableDM bool + collectionID UniqueID + partitionID UniqueID + segmentIDs []UniqueID + enableDM bool } func (p *Partition) ID() UniqueID { - return p.id + return p.partitionID } -func (p *Partition) Segments() *[]*Segment { - return &(*p).segments +func (p *Partition) addSegmentID(segmentID UniqueID) { + p.segmentIDs = append(p.segmentIDs, segmentID) } -func newPartition(partitionID UniqueID) *Partition { +func (p *Partition) removeSegmentID(segmentID UniqueID) { + tmpIDs := make([]UniqueID, 0) + for _, id := range p.segmentIDs { + if id != segmentID { + tmpIDs = append(tmpIDs, id) + } + } + p.segmentIDs = tmpIDs +} + +func newPartition(collectionID UniqueID, partitionID UniqueID) *Partition { var newPartition = &Partition{ - id: partitionID, - enableDM: false, + collectionID: collectionID, + partitionID: partitionID, + enableDM: false, } return newPartition diff --git a/internal/querynode/partition_test.go b/internal/querynode/partition_test.go index 328141e90b988b93bc438c16fb3b297bc92e01c2..9748afda23a9fc66121c5a37598b6da49854ac49 100644 --- a/internal/querynode/partition_test.go +++ b/internal/querynode/partition_test.go @@ -6,29 +6,8 @@ import ( "github.com/stretchr/testify/assert" ) -func TestPartition_Segments(t
*testing.T) { - node := newQueryNodeMock() - collectionID := UniqueID(0) - initTestMeta(t, node, collectionID, 0) - - collection, err := node.replica.getCollectionByID(collectionID) - assert.NoError(t, err) - - partitions := collection.Partitions() - targetPartition := (*partitions)[0] - - const segmentNum = 3 - for i := 0; i < segmentNum; i++ { - err := node.replica.addSegment(UniqueID(i), targetPartition.ID(), collection.ID(), segTypeGrowing) - assert.NoError(t, err) - } - - segments := targetPartition.Segments() - assert.Equal(t, segmentNum+1, len(*segments)) -} - func TestPartition_newPartition(t *testing.T) { partitionID := defaultPartitionID - partition := newPartition(partitionID) + partition := newPartition(UniqueID(0), partitionID) assert.Equal(t, partition.ID(), defaultPartitionID) } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 2be99ad9530b9d8d435434bc985cf606e95b2be1..d55b47ba74063ab29cab5b5ca18fe1a0fcc6c370 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -81,17 +81,7 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode { statsService: nil, } - segmentsMap := make(map[int64]*Segment) - collections := make([]*Collection, 0) - - tSafe := newTSafe() - - node.replica = &collectionReplicaImpl{ - collections: collections, - segments: segmentsMap, - - tSafe: tSafe, - } + node.replica = newCollectionReplicaImpl() node.stateCode.Store(internalpb2.StateCode_INITIALIZING) return node } @@ -108,17 +98,7 @@ func NewQueryNodeWithoutID(ctx context.Context) *QueryNode { statsService: nil, } - segmentsMap := make(map[int64]*Segment) - collections := make([]*Collection, 0) - - tSafe := newTSafe() - - node.replica = &collectionReplicaImpl{ - collections: collections, - segments: segmentsMap, - - tSafe: tSafe, - } + node.replica = newCollectionReplicaImpl() node.stateCode.Store(internalpb2.StateCode_INITIALIZING) return node } @@ -403,7 +383,7 @@ func (node *QueryNode) 
LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S segmentIDs := in.SegmentIDs fieldIDs := in.FieldIDs - err := node.replica.enablePartitionDM(collectionID, partitionID) + err := node.replica.enablePartitionDM(partitionID) if err != nil { status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, @@ -444,7 +424,7 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) { for _, id := range in.PartitionIDs { - err := node.replica.enablePartitionDM(in.CollectionID, id) + err := node.replica.enablePartitionDM(id) if err != nil { status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index 5b2a8210363ab843126503982c3c1853b71999e6..939ed0917b49b7982e21e24ce480d4abe743e788 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -18,7 +18,7 @@ import ( ) const ctxTimeInMillisecond = 5000 -const closeWithDeadline = true +const debug = false const defaultPartitionID = UniqueID(2021) @@ -121,7 +121,9 @@ func newQueryNodeMock() *QueryNode { var ctx context.Context - if closeWithDeadline { + if debug { + ctx = context.Background() + } else { var cancel context.CancelFunc d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) ctx, cancel = context.WithDeadline(context.Background(), d) @@ -129,8 +131,6 @@ func newQueryNodeMock() *QueryNode { <-ctx.Done() cancel() }() - } else { - ctx = context.Background() } svr := NewQueryNode(ctx, 0) diff --git a/internal/querynode/search_service.go b/internal/querynode/search_service.go index b663d9e9071c2ba268c415e911de7f86aeb1fe1e..f995935f7d90f639413b391651b12312dc330c46 100644 --- a/internal/querynode/search_service.go +++ b/internal/querynode/search_service.go @@ -245,11 +245,9 @@ func (ss *searchService) search(msg msgstream.TsMsg) 
error { matchedSegments := make([]*Segment, 0) //fmt.Println("search msg's partitionID = ", partitionIDsInQuery) - - var partitionIDsInCol []UniqueID - for _, partition := range collection.partitions { - partitionID := partition.ID() - partitionIDsInCol = append(partitionIDsInCol, partitionID) + partitionIDsInCol, err := ss.replica.getPartitionIDs(collectionID) + if err != nil { + return err } var searchPartitionIDs []UniqueID partitionIDsInQuery := searchMsg.PartitionIDs @@ -267,10 +265,16 @@ func (ss *searchService) search(msg msgstream.TsMsg) error { } for _, partitionID := range searchPartitionIDs { - partition, _ := ss.replica.getPartitionByID(collectionID, partitionID) - for _, segment := range partition.segments { + segmentIDs, err := ss.replica.getSegmentIDs(partitionID) + if err != nil { + return err + } + for _, segmentID := range segmentIDs { //fmt.Println("dsl = ", dsl) - + segment, err := ss.replica.getSegmentByID(segmentID) + if err != nil { + return err + } searchResult, err := segment.segmentSearch(plan, placeholderGroups, []Timestamp{searchTimestamp}) if err != nil {