From fc74a0f57822ecbeef8dbf91e18e64f98f6702bf Mon Sep 17 00:00:00 2001 From: sunby Date: Fri, 23 Jul 2021 21:58:33 +0800 Subject: [PATCH] Auto create new segments when allocating more rows than the max number of rows per segment (#6665) If a user inserts too many rows in a single request, we currently return a failed response. Automatically creating new segments to hold that many rows is a better approach. issue: #6664 Signed-off-by: sunby --- internal/datacoord/grpc_services.go | 54 +++------ .../datacoord/segment_allocation_policy.go | 50 ++++++-- internal/datacoord/segment_info.go | 2 +- internal/datacoord/segment_manager.go | 109 +++++++----------- internal/datacoord/segment_manager_test.go | 65 ++++++----- internal/datacoord/server.go | 3 +- internal/datacoord/server_test.go | 86 +++++++------- 7 files changed, 187 insertions(+), 182 deletions(-) diff --git a/internal/datacoord/grpc_services.go b/internal/datacoord/grpc_services.go index 70c7b1377..3183da6e3 100644 --- a/internal/datacoord/grpc_services.go +++ b/internal/datacoord/grpc_services.go @@ -80,15 +80,6 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI assigns := make([]*datapb.SegmentIDAssignment, 0, len(req.SegmentIDRequests)) - var appendFailedAssignment = func(err string) { - assigns = append(assigns, &datapb.SegmentIDAssignment{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err, - }, - }) - } - for _, r := range req.SegmentIDRequests { log.Debug("Handle assign segment request", zap.Int64("collectionID", r.GetCollectionID()), @@ -98,46 +89,39 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI if coll := s.meta.GetCollection(r.CollectionID); coll == nil { if err := s.loadCollectionFromRootCoord(ctx, r.CollectionID); err != nil { - errMsg := fmt.Sprintf("Can not load collection %d", r.CollectionID) - appendFailedAssignment(errMsg) log.Error("load collection from rootcoord error", zap.Int64("collectionID", 
r.CollectionID), zap.Error(err)) continue } } - //if err := s.validateAllocRequest(r.CollectionID, r.PartitionID, r.ChannelName); err != nil { - //result.Status.Reason = err.Error() - //assigns = append(assigns, result) - //continue - //} + s.cluster.Watch(r.ChannelName, r.CollectionID) - segmentID, retCount, expireTs, err := s.segmentManager.AllocSegment(ctx, + allocations, err := s.segmentManager.AllocSegment(ctx, r.CollectionID, r.PartitionID, r.ChannelName, int64(r.Count)) if err != nil { - errMsg := fmt.Sprintf("Allocation of collection %d, partition %d, channel %s, count %d error: %s", - r.CollectionID, r.PartitionID, r.ChannelName, r.Count, err.Error()) - appendFailedAssignment(errMsg) + log.Warn("failed to alloc segment", zap.Any("request", r), zap.Error(err)) continue } - log.Debug("Assign segment success", zap.Int64("segmentID", segmentID), - zap.Uint64("expireTs", expireTs)) - - result := &datapb.SegmentIDAssignment{ - SegID: segmentID, - ChannelName: r.ChannelName, - Count: uint32(retCount), - CollectionID: r.CollectionID, - PartitionID: r.PartitionID, - ExpireTime: expireTs, - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, + log.Debug("Assign segment success", zap.Any("assignments", allocations)) + + for _, allocation := range allocations { + result := &datapb.SegmentIDAssignment{ + SegID: allocation.SegmentID, + ChannelName: r.ChannelName, + Count: uint32(allocation.NumOfRows), + CollectionID: r.CollectionID, + PartitionID: r.PartitionID, + ExpireTime: allocation.ExpireTime, + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + } + assigns = append(assigns, result) } - assigns = append(assigns, result) } return &datapb.AssignSegmentIDResponse{ Status: &commonpb.Status{ diff --git a/internal/datacoord/segment_allocation_policy.go b/internal/datacoord/segment_allocation_policy.go index 90cba8124..7d92c1662 100644 --- a/internal/datacoord/segment_allocation_policy.go +++ 
b/internal/datacoord/segment_allocation_policy.go @@ -30,15 +30,49 @@ func calBySchemaPolicy(schema *schemapb.CollectionSchema) (int, error) { return int(threshold / float64(sizePerRecord)), nil } -type allocatePolicy func(segment *SegmentInfo, count int64) bool +type AllocatePolicy func(segments []*SegmentInfo, count int64, + maxCountPerSegment int64) ([]*Allocation, []*Allocation) + +func AllocatePolicyV1(segments []*SegmentInfo, count int64, + maxCountPerSegment int64) ([]*Allocation, []*Allocation) { + newSegmentAllocations := make([]*Allocation, 0) + existedSegmentAllocations := make([]*Allocation, 0) + // create new segment if count >= max num + for count >= maxCountPerSegment { + allocation := &Allocation{ + NumOfRows: maxCountPerSegment, + } + newSegmentAllocations = append(newSegmentAllocations, allocation) + count -= maxCountPerSegment + } + + // allocate space for remaining count + if count == 0 { + return newSegmentAllocations, existedSegmentAllocations + } + for _, segment := range segments { + var allocSize int64 + for _, allocation := range segment.allocations { + allocSize += allocation.NumOfRows + } + free := segment.GetMaxRowNum() - segment.GetNumOfRows() - allocSize + if free < count { + continue + } + allocation := &Allocation{ + SegmentID: segment.GetID(), + NumOfRows: count, + } + existedSegmentAllocations = append(existedSegmentAllocations, allocation) + return newSegmentAllocations, existedSegmentAllocations + } -func allocatePolicyV1(segment *SegmentInfo, count int64) bool { - var allocSize int64 - for _, allocation := range segment.allocations { - allocSize += allocation.numOfRows + // allocate new segment for remaining count + allocation := &Allocation{ + NumOfRows: count, } - free := segment.GetMaxRowNum() - segment.GetNumOfRows() - allocSize - return free >= count + newSegmentAllocations = append(newSegmentAllocations, allocation) + return newSegmentAllocations, existedSegmentAllocations } type sealPolicy func(maxCount, writtenCount, 
allocatedCount int64) bool @@ -54,7 +88,7 @@ func getSegmentCapacityPolicy(sizeFactor float64) segmentSealPolicy { return func(segment *SegmentInfo, ts Timestamp) bool { var allocSize int64 for _, allocation := range segment.allocations { - allocSize += allocation.numOfRows + allocSize += allocation.NumOfRows } return float64(segment.currRows) >= sizeFactor*float64(segment.GetMaxRowNum()) } diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index 37f3db005..14135bbc0 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -168,7 +168,7 @@ func SetAllocations(allocations []*Allocation) SegmentInfoOption { func AddAllocation(allocation *Allocation) SegmentInfoOption { return func(segment *SegmentInfo) { segment.allocations = append(segment.allocations, allocation) - segment.LastExpireTime = allocation.expireTime + segment.LastExpireTime = allocation.ExpireTime } } diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index a6e9f58a5..49e63f5c6 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -29,14 +29,10 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" ) -var errRemainInSufficient = func(requestRows int64) error { - return fmt.Errorf("segment remaining is insufficient for %d", requestRows) -} - // Manager manage segment related operations. type Manager interface { // AllocSegment allocate rows and record the allocation. - AllocSegment(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64) (UniqueID, int64, Timestamp, error) + AllocSegment(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error) // DropSegment drop the segment from allocator. 
DropSegment(ctx context.Context, segmentID UniqueID) // SealAllSegments sealed all segmetns of collection with collectionID and return sealed segments @@ -49,8 +45,9 @@ type Manager interface { // allcation entry for segment Allocation record type Allocation struct { - numOfRows int64 - expireTime Timestamp + SegmentID UniqueID + NumOfRows int64 + ExpireTime Timestamp } // SegmentManager handles segment related logic @@ -61,7 +58,7 @@ type SegmentManager struct { helper allocHelper segments []UniqueID estimatePolicy calUpperLimitPolicy - allocPolicy allocatePolicy + allocPolicy AllocatePolicy segmentSealPolicies []segmentSealPolicy channelSealPolicies []channelSealPolicy flushPolicy flushPolicy @@ -103,7 +100,7 @@ func withCalUpperLimitPolicy(policy calUpperLimitPolicy) allocOption { } // get allocOption with allocPolicy -func withAllocPolicy(policy allocatePolicy) allocOption { +func withAllocPolicy(policy AllocatePolicy) allocOption { return allocFunc(func(manager *SegmentManager) { manager.allocPolicy = policy }) } @@ -132,8 +129,8 @@ func defaultCalUpperLimitPolicy() calUpperLimitPolicy { return calBySchemaPolicy } -func defaultAlocatePolicy() allocatePolicy { - return allocatePolicyV1 +func defaultAlocatePolicy() AllocatePolicy { + return AllocatePolicyV1 } func defaultSealPolicy() sealPolicy { @@ -188,14 +185,14 @@ func (s *SegmentManager) getAllocation(numOfRows int64) *Allocation { v := s.allocPool.Get() if v == nil { return &Allocation{ - numOfRows: numOfRows, + NumOfRows: numOfRows, } } a, ok := v.(*Allocation) if !ok { a = &Allocation{} } - a.numOfRows = numOfRows + a.NumOfRows = numOfRows return a } @@ -206,79 +203,61 @@ func (s *SegmentManager) putAllocation(a *Allocation) { // AllocSegment allocate segment per request collcation, partication, channel and rows func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID, - partitionID UniqueID, channelName string, requestRows int64) (segID UniqueID, retCount int64, expireTime 
Timestamp, err error) { + partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error) { sp, _ := trace.StartSpanFromContext(ctx) defer sp.Finish() s.mu.Lock() defer s.mu.Unlock() - var segment *SegmentInfo - var allocation *Allocation + // filter segments + segments := make([]*SegmentInfo, 0) for _, segmentID := range s.segments { - segment = s.meta.GetSegment(segmentID) + segment := s.meta.GetSegment(segmentID) if segment == nil { - log.Warn("Failed to get seginfo from meta", zap.Int64("id", segmentID), zap.Error(err)) + log.Warn("Failed to get seginfo from meta", zap.Int64("id", segmentID)) continue } if segment.State == commonpb.SegmentState_Sealed || segment.CollectionID != collectionID || segment.PartitionID != partitionID || segment.InsertChannel != channelName { continue } - allocation, err = s.alloc(segment, requestRows) - if err != nil { - return - } - if allocation != nil { - break - } - } - - if allocation == nil { - segment, err = s.openNewSegment(ctx, collectionID, partitionID, channelName) - if err != nil { - return - } - segment = s.meta.GetSegment(segment.GetID()) - if segment == nil { - log.Warn("Failed to get seg into from meta", zap.Int64("id", segment.GetID()), zap.Error(err)) - return - } - allocation, err = s.alloc(segment, requestRows) - if err != nil { - return - } - if allocation == nil { - err = errRemainInSufficient(requestRows) - return - } + segments = append(segments, segment) } - segID = segment.GetID() - retCount = allocation.numOfRows - expireTime = allocation.expireTime - return -} - -func (s *SegmentManager) alloc(segment *SegmentInfo, numOfRows int64) (*Allocation, error) { - var allocSize int64 - for _, allocItem := range segment.allocations { - allocSize += allocItem.numOfRows - } - - if !s.allocPolicy(segment, numOfRows) { - return nil, nil + // apply allocate policy + maxCountPerSegment, err := s.estimateMaxNumOfRows(collectionID) + if err != nil { + return nil, err } + newSegmentAllocations, 
existedSegmentAllocations := s.allocPolicy(segments, + requestRows, int64(maxCountPerSegment)) - alloc := s.getAllocation(numOfRows) + // create new segments and add allocations expireTs, err := s.genExpireTs() if err != nil { return nil, err } - alloc.expireTime = expireTs + for _, allocation := range newSegmentAllocations { + segment, err := s.openNewSegment(ctx, collectionID, partitionID, channelName) + if err != nil { + return nil, err + } + allocation.ExpireTime = expireTs + allocation.SegmentID = segment.GetID() + if err := s.meta.AddAllocation(segment.GetID(), allocation); err != nil { + return nil, err + } + } + + for _, allocation := range existedSegmentAllocations { + allocation.ExpireTime = expireTs + if err := s.meta.AddAllocation(allocation.SegmentID, allocation); err != nil { + return nil, err + } + } - //safe here since info is a clone, used to pass expireTs out - s.meta.AddAllocation(segment.GetID(), alloc) - return alloc, nil + allocations := append(newSegmentAllocations, existedSegmentAllocations...) + return allocations, nil } func (s *SegmentManager) genExpireTs() (Timestamp, error) { @@ -425,7 +404,7 @@ func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) error { continue } for i := 0; i < len(segment.allocations); i++ { - if segment.allocations[i].expireTime <= ts { + if segment.allocations[i].ExpireTime <= ts { a := segment.allocations[i] segment.allocations = append(segment.allocations[:i], segment.allocations[i+1:]...) 
s.putAllocation(a) diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index 662767deb..a186b1160 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -11,11 +11,11 @@ package datacoord import ( "context" - "math" "testing" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/schemapb" "github.com/stretchr/testify/assert" ) @@ -32,27 +32,15 @@ func TestAllocSegment(t *testing.T) { collID, err := mockAllocator.allocID() assert.Nil(t, err) meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema}) - cases := []struct { - collectionID UniqueID - partitionID UniqueID - channelName string - requestRows int64 - expectResult bool - }{ - {collID, 100, "c1", 100, true}, - {collID, 100, "c1", math.MaxInt64, false}, - } - for _, c := range cases { - id, count, expireTime, err := segmentManager.AllocSegment(ctx, c.collectionID, c.partitionID, c.channelName, c.requestRows) - if c.expectResult { - assert.Nil(t, err) - assert.EqualValues(t, c.requestRows, count) - assert.NotEqualValues(t, 0, id) - assert.NotEqualValues(t, 0, expireTime) - } else { - assert.NotNil(t, err) - } - } + + t.Run("normal allocation", func(t *testing.T) { + allocations, err := segmentManager.AllocSegment(ctx, collID, 100, "c1", 100) + assert.Nil(t, err) + assert.EqualValues(t, 1, len(allocations)) + assert.EqualValues(t, 100, allocations[0].NumOfRows) + assert.NotEqualValues(t, 0, allocations[0].SegmentID) + assert.NotEqualValues(t, 0, allocations[0].ExpireTime) + }) } func TestLoadSegmentsFromMeta(t *testing.T) { @@ -116,13 +104,14 @@ func TestSaveSegmentsToMeta(t *testing.T) { assert.Nil(t, err) meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema}) segmentManager := newSegmentManager(meta, mockAllocator) - segID, _, expireTs, err := 
segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000) + allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000) assert.Nil(t, err) + assert.EqualValues(t, 1, len(allocations)) _, err = segmentManager.SealAllSegments(context.Background(), collID) assert.Nil(t, err) - segment := meta.GetSegment(segID) + segment := meta.GetSegment(allocations[0].SegmentID) assert.NotNil(t, segment) - assert.EqualValues(t, segment.LastExpireTime, expireTs) + assert.EqualValues(t, segment.LastExpireTime, allocations[0].ExpireTime) assert.EqualValues(t, commonpb.SegmentState_Sealed, segment.State) } @@ -137,8 +126,10 @@ func TestDropSegment(t *testing.T) { assert.Nil(t, err) meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema}) segmentManager := newSegmentManager(meta, mockAllocator) - segID, _, _, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000) + allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000) assert.Nil(t, err) + assert.EqualValues(t, 1, len(allocations)) + segID := allocations[0].SegmentID segment := meta.GetSegment(segID) assert.NotNil(t, segment) @@ -146,3 +137,25 @@ func TestDropSegment(t *testing.T) { segment = meta.GetSegment(segID) assert.NotNil(t, segment) } + +func TestAllocRowsLargerThanOneSegment(t *testing.T) { + Params.Init() + mockAllocator := newMockAllocator() + meta, err := newMemoryMeta(mockAllocator) + assert.Nil(t, err) + + schema := newTestSchema() + collID, err := mockAllocator.allocID() + assert.Nil(t, err) + meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema}) + + var mockPolicy = func(schema *schemapb.CollectionSchema) (int, error) { + return 1, nil + } + segmentManager := newSegmentManager(meta, mockAllocator, withCalUpperLimitPolicy(mockPolicy)) + allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2) + assert.Nil(t, err) + assert.EqualValues(t, 2, len(allocations)) 
+ assert.EqualValues(t, 1, allocations[0].NumOfRows) + assert.EqualValues(t, 1, allocations[1].NumOfRows) +} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index c578070dd..2671045ad 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -407,7 +407,7 @@ func (s *Server) startActiveCheck(ctx context.Context) { if ok { continue } - s.Stop() + go func() { s.Stop() }() log.Debug("disconnect with etcd and shutdown data coordinator") return case <-ctx.Done(): @@ -487,7 +487,6 @@ func (s *Server) Stop() error { return nil } log.Debug("DataCoord server shutdown") - atomic.StoreInt64(&s.isServing, ServerStateStopped) s.cluster.Close() s.stopServerLoop() return nil diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index a1f8a0e79..749fe4e3e 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -11,7 +11,6 @@ package datacoord import ( "context" - "math" "path" "strconv" "testing" @@ -26,7 +25,6 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/sessionutil" - "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/clientv3" ) @@ -57,52 +55,46 @@ func TestAssignSegmentID(t *testing.T) { Schema: schema, Partitions: []int64{}, }) - recordSize, err := typeutil.EstimateSizePerRecord(schema) - assert.Nil(t, err) - maxCount := int(Params.SegmentMaxSize * 1024 * 1024 / float64(recordSize)) - cases := []struct { - Description string - CollectionID UniqueID - PartitionID UniqueID - ChannelName string - Count uint32 - Success bool - }{ - {"assign segment normally", collID, partID, channel0, 1000, true}, - {"assign segment with invalid collection", collIDInvalid, partID, channel0, 1000, false}, - {"assign with max count", collID, partID, channel0, uint32(maxCount), true}, - {"assign with max uint32 count", collID, 
partID, channel1, math.MaxUint32, false}, - } + t.Run("assign segment normally", func(t *testing.T) { + req := &datapb.SegmentIDRequest{ + Count: 1000, + ChannelName: channel0, + CollectionID: collID, + PartitionID: partID, + } - for _, test := range cases { - t.Run(test.Description, func(t *testing.T) { - req := &datapb.SegmentIDRequest{ - Count: test.Count, - ChannelName: test.ChannelName, - CollectionID: test.CollectionID, - PartitionID: test.PartitionID, - } + resp, err := svr.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{ + NodeID: 0, + PeerRole: "", + SegmentIDRequests: []*datapb.SegmentIDRequest{req}, + }) + assert.Nil(t, err) + assert.EqualValues(t, 1, len(resp.SegIDAssignments)) + assign := resp.SegIDAssignments[0] + assert.EqualValues(t, commonpb.ErrorCode_Success, assign.Status.ErrorCode) + assert.EqualValues(t, collID, assign.CollectionID) + assert.EqualValues(t, partID, assign.PartitionID) + assert.EqualValues(t, channel0, assign.ChannelName) + assert.EqualValues(t, 1000, assign.Count) + }) - resp, err := svr.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{ - NodeID: 0, - PeerRole: "", - SegmentIDRequests: []*datapb.SegmentIDRequest{req}, - }) - assert.Nil(t, err) - assert.EqualValues(t, 1, len(resp.SegIDAssignments)) - assign := resp.SegIDAssignments[0] - if test.Success { - assert.EqualValues(t, commonpb.ErrorCode_Success, assign.Status.ErrorCode) - assert.EqualValues(t, test.CollectionID, assign.CollectionID) - assert.EqualValues(t, test.PartitionID, assign.PartitionID) - assert.EqualValues(t, test.ChannelName, assign.ChannelName) - assert.EqualValues(t, test.Count, assign.Count) - } else { - assert.NotEqualValues(t, commonpb.ErrorCode_Success, assign.Status.ErrorCode) - } + t.Run("assign segment with invalid collection", func(t *testing.T) { + req := &datapb.SegmentIDRequest{ + Count: 1000, + ChannelName: channel0, + CollectionID: collIDInvalid, + PartitionID: partID, + } + + resp, err := 
svr.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{ + NodeID: 0, + PeerRole: "", + SegmentIDRequests: []*datapb.SegmentIDRequest{req}, }) - } + assert.Nil(t, err) + assert.EqualValues(t, 0, len(resp.SegIDAssignments)) + }) } func TestFlush(t *testing.T) { @@ -110,8 +102,12 @@ func TestFlush(t *testing.T) { defer closeTestServer(t, svr) schema := newTestSchema() svr.meta.AddCollection(&datapb.CollectionInfo{ID: 0, Schema: schema, Partitions: []int64{}}) - segID, _, expireTs, err := svr.segmentManager.AllocSegment(context.TODO(), 0, 1, "channel-1", 1) + allocations, err := svr.segmentManager.AllocSegment(context.TODO(), 0, 1, "channel-1", 1) assert.Nil(t, err) + assert.EqualValues(t, 1, len(allocations)) + expireTs := allocations[0].ExpireTime + segID := allocations[0].SegmentID + req := &datapb.FlushRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_Flush, -- GitLab