diff --git a/internal/proxy/condition.go b/internal/proxy/condition.go index cf98e7d4383b6ea0b3c3f1e8dc5a29e744564104..b77b6c4b9732d05a52c6c10f2b91c421b2a88722 100644 --- a/internal/proxy/condition.go +++ b/internal/proxy/condition.go @@ -31,7 +31,7 @@ func (tc *TaskCondition) WaitToFinish() error { for { select { case <-tc.ctx.Done(): - return errors.New("timeout") + return errors.New("Proxy TaskCondition context Done") case err := <-tc.done: return err } diff --git a/internal/proxy/segment.go b/internal/proxy/segment.go index 08af11cc1bd703e7f302f3db5f2c5183f914334f..f41552ee26317fd6995eef87abbce7b018e9d381 100644 --- a/internal/proxy/segment.go +++ b/internal/proxy/segment.go @@ -24,17 +24,19 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/typeutil" ) const ( - SegCountPerRPC = 20000 - ActiveTimeDuration = 100 //second + SegCountPerRPC = 20000 ) type Allocator = allocator.Allocator +type DataCoord interface { + AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) +} + type segRequest struct { allocator.BaseRequest count uint32 @@ -128,10 +130,6 @@ func (info *assignInfo) Assign(ts Timestamp, count uint32) (map[UniqueID]uint32, return result, nil } -func (info *assignInfo) IsActive(now time.Time) bool { - return now.Sub(info.lastInsertTime) <= ActiveTimeDuration*time.Second -} - type SegIDAssigner struct { Allocator assignInfos map[UniqueID]*list.List // collectionID -> *list.List @@ -139,11 +137,11 @@ type SegIDAssigner struct { getTickFunc func() Timestamp PeerID UniqueID - dataCoord types.DataCoord + dataCoord DataCoord countPerRPC uint32 } -func NewSegIDAssigner(ctx context.Context, dataCoord types.DataCoord, getTickFunc func() Timestamp) (*SegIDAssigner, error) { +func NewSegIDAssigner(ctx context.Context, dataCoord DataCoord, getTickFunc func() Timestamp) (*SegIDAssigner, error) { ctx1, cancel := context.WithCancel(ctx) sa := &SegIDAssigner{ Allocator: Allocator{ @@ -167,10 +165,6 @@ func NewSegIDAssigner(ctx context.Context, dataCoord types.DataCoord, getTickFun return sa, nil } -func (sa *SegIDAssigner) SetServiceClient(client types.DataCoord) { - sa.dataCoord = client -} - func (sa *SegIDAssigner) collectExpired() { ts := sa.getTickFunc() for _, info := range sa.assignInfos { @@ -185,11 +179,12 @@ func (sa *SegIDAssigner) collectExpired() { } func (sa *SegIDAssigner) pickCanDoFunc() { + log.Debug("Proxy SegIDAssigner pickCanDoFunc", zap.Any("len(ToDoReqs)", len(sa.ToDoReqs))) if sa.ToDoReqs == nil { return } records := make(map[UniqueID]map[UniqueID]map[string]uint32) - newTodoReqs := sa.ToDoReqs[0:0] + var newTodoReqs []allocator.Request for _, req := range sa.ToDoReqs { segRequest := req.(*segRequest) collID := segRequest.collID @@ -209,6 +204,11 @@ func (sa *SegIDAssigner) pickCanDoFunc() { records[collID][partitionID][channelName] += segRequest.count assign, err := sa.getAssign(segRequest.collID, segRequest.partitionID, segRequest.channelName) + if err != nil { + log.Debug("Proxy SegIDAssigner, pickCanDoFunc getAssign err:", zap.Any("collID", segRequest.collID), + zap.Any("partitionID", segRequest.partitionID), zap.Any("channelName", segRequest.channelName), + zap.Error(err)) + } if err != nil || assign.Capacity(segRequest.timestamp) < records[collID][partitionID][channelName] { sa.segReqs = append(sa.segReqs, &datapb.SegmentIDRequest{ ChannelName: channelName, @@ -221,6 +221,9 @@ func (sa *SegIDAssigner) pickCanDoFunc() { sa.CanDoReqs = append(sa.CanDoReqs, req) } } + log.Debug("Proxy SegIDAssigner pickCanDoFunc", zap.Any("records", records), + zap.Any("len(newTodoReqs)", len(newTodoReqs)), + zap.Any("len(CanDoReqs)", len(sa.CanDoReqs))) sa.ToDoReqs = newTodoReqs } @@ -258,13 +261,18 @@ func (sa *SegIDAssigner) checkSegReqEqual(req1, req2 *datapb.SegmentIDRequest) b } func (sa *SegIDAssigner) reduceSegReqs() { - + log.Debug("Proxy SegIDAssigner reduceSegReqs", zap.Any("len(segReqs)", len(sa.segReqs))) if len(sa.segReqs) == 0 { return } - + beforeCnt := uint32(0) var newSegReqs []*datapb.SegmentIDRequest for _, req1 := range sa.segReqs { + if req1.Count == 0 { + log.Debug("Proxy SegIDAssigner reduceSegReqs hit perCount == 0") + req1.Count = sa.countPerRPC + } + beforeCnt += req1.Count var req2 *datapb.SegmentIDRequest for _, req3 := range newSegReqs { if sa.checkSegReqEqual(req1, req3) { @@ -278,13 +286,14 @@ func (sa *SegIDAssigner) reduceSegReqs() { req2.Count += req1.Count } } - + afterCnt := uint32(0) for _, req := range newSegReqs { - if req.Count == 0 { - req.Count = sa.countPerRPC - } + afterCnt += req.Count } sa.segReqs = newSegReqs + log.Debug("Proxy SegIDAssigner reduceSegReqs after reduce", zap.Any("len(segReqs)", len(sa.segReqs)), + zap.Any("BeforeCnt", beforeCnt), + zap.Any("AfterCnt", afterCnt)) } func (sa *SegIDAssigner) syncSegments() (bool, error) { @@ -298,18 +307,27 @@ func (sa *SegIDAssigner) syncSegments() (bool, error) { SegmentIDRequests: sa.segReqs, } - sa.segReqs = []*datapb.SegmentIDRequest{} + sa.segReqs = nil + resp, err := sa.dataCoord.AssignSegmentID(context.Background(), req) if err != nil { return false, fmt.Errorf("syncSegmentID Failed:%w", err) } + if resp.Status.ErrorCode != commonpb.ErrorCode_Success { + return false, fmt.Errorf("syncSegmentID Failed:%s", resp.Status.Reason) + } + + var errMsg string now := time.Now() - success := false + success := true for _, info := range resp.SegIDAssignments { if info.Status.GetErrorCode() != commonpb.ErrorCode_Success { log.Debug("proxy", zap.String("SyncSegment Error", info.Status.Reason)) + errMsg += info.Status.Reason + errMsg += "\n" + success = false continue } assign, err := sa.getAssign(info.CollectionID, info.PartitionID, info.ChannelName) @@ -338,7 +356,9 @@ func (sa *SegIDAssigner) syncSegments() (bool, error) { assign.segInfos.PushBack(segInfo2) } assign.lastInsertTime = now - success = true + } + if !success { + return false, fmt.Errorf(errMsg) } return success, nil } diff --git a/internal/proxy/segment_test.go b/internal/proxy/segment_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8e133c4b9e9c0fcb144dee85a9ef93f4b2edd681 --- /dev/null +++ b/internal/proxy/segment_test.go @@ -0,0 +1,317 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package proxy + +import ( + "context" + "fmt" + "math/rand" + "sync" + "testing" + "time" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/stretchr/testify/assert" +) + +type mockDataCoord struct { + expireTime Timestamp +} + +func (mockD *mockDataCoord) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) { + assigns := make([]*datapb.SegmentIDAssignment, 0, len(req.SegmentIDRequests)) + maxPerCnt := 100 + for _, r := range req.SegmentIDRequests { + totalCnt := uint32(0) + for totalCnt != r.Count { + cnt := uint32(rand.Intn(maxPerCnt)) + if totalCnt+cnt > r.Count { + cnt = r.Count - totalCnt + } + totalCnt += cnt + result := &datapb.SegmentIDAssignment{ + SegID: 1, + ChannelName: r.ChannelName, + Count: cnt, + CollectionID: r.CollectionID, + PartitionID: r.PartitionID, + ExpireTime: mockD.expireTime, + + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + } + assigns = append(assigns, result) + } + } + + return &datapb.AssignSegmentIDResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + SegIDAssignments: assigns, + }, nil +} + +type mockDataCoord2 struct { + expireTime Timestamp +} + +func (mockD *mockDataCoord2) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) { + + return &datapb.AssignSegmentIDResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "Just For Test", + }, + }, nil +} + +func getLastTick1() Timestamp { + return 1000 +} + +func TestSegmentAllocator1(t *testing.T) { + ctx := context.Background() + dataCoord := &mockDataCoord{} + dataCoord.expireTime = Timestamp(1000) + segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick1) + assert.Nil(t, err) + wg := &sync.WaitGroup{} + segAllocator.Start() + + wg.Add(1) + go func(group *sync.WaitGroup) { + defer group.Done() + time.Sleep(2 * time.Second) + segAllocator.Close() + }(wg) + total := uint32(0) + collNames := []string{"abc", "cba"} + for i := 0; i < 10; i++ { + colName := collNames[i%2] + ret, err := segAllocator.GetSegmentID(1, 1, colName, 1, 1) + assert.Nil(t, err) + total += ret[1] + } + assert.Equal(t, uint32(10), total) + + ret, err := segAllocator.GetSegmentID(1, 1, "abc", SegCountPerRPC-10, 999) + assert.Nil(t, err) + assert.Equal(t, uint32(SegCountPerRPC-10), ret[1]) + + _, err = segAllocator.GetSegmentID(1, 1, "abc", 10, 1001) + assert.NotNil(t, err) + wg.Wait() + +} + +var curLastTick2 = Timestamp(2000) +var curLastTIck2Lock sync.Mutex + +func getLastTick2() Timestamp { + curLastTIck2Lock.Lock() + defer curLastTIck2Lock.Unlock() + curLastTick2 += 1000 + return curLastTick2 +} + +func TestSegmentAllocator2(t *testing.T) { + ctx := context.Background() + dataCoord := &mockDataCoord{} + dataCoord.expireTime = Timestamp(2500) + segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick2) + assert.Nil(t, err) + wg := &sync.WaitGroup{} + segAllocator.Start() + + wg.Add(1) + go func(group *sync.WaitGroup) { + defer group.Done() + time.Sleep(2 * time.Second) + segAllocator.Close() + }(wg) + total := uint32(0) + for i := 0; i < 10; i++ { + ret, err := segAllocator.GetSegmentID(1, 1, "abc", 1, 2000) + assert.Nil(t, err) + total += ret[1] + } + assert.Equal(t, uint32(10), total) + time.Sleep(time.Second) + _, err = segAllocator.GetSegmentID(1, 1, "abc", SegCountPerRPC-10, getLastTick2()) + assert.NotNil(t, err) + wg.Wait() + +} + +func TestSegmentAllocator3(t *testing.T) { + ctx := context.Background() + dataCoord := &mockDataCoord2{} + dataCoord.expireTime = Timestamp(2500) + segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick2) + assert.Nil(t, err) + wg := &sync.WaitGroup{} + segAllocator.Start() + + wg.Add(1) + go func(group *sync.WaitGroup) { + defer group.Done() + time.Sleep(2 * time.Second) + segAllocator.Close() + }(wg) + time.Sleep(time.Second) + _, err = segAllocator.GetSegmentID(1, 1, "abc", 10, 1000) + assert.NotNil(t, err) + wg.Wait() +} + +type mockDataCoord3 struct { + expireTime Timestamp +} + +func (mockD *mockDataCoord3) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) { + assigns := make([]*datapb.SegmentIDAssignment, 0, len(req.SegmentIDRequests)) + for i, r := range req.SegmentIDRequests { + errCode := commonpb.ErrorCode_Success + reason := "" + if i == 0 { + errCode = commonpb.ErrorCode_UnexpectedError + reason = "Just for test" + } + result := &datapb.SegmentIDAssignment{ + SegID: 1, + ChannelName: r.ChannelName, + Count: r.Count, + CollectionID: r.CollectionID, + PartitionID: r.PartitionID, + ExpireTime: mockD.expireTime, + + Status: &commonpb.Status{ + ErrorCode: errCode, + Reason: reason, + }, + } + assigns = append(assigns, result) + } + + return &datapb.AssignSegmentIDResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + SegIDAssignments: assigns, + }, nil +} + +func TestSegmentAllocator4(t *testing.T) { + ctx := context.Background() + dataCoord := &mockDataCoord3{} + dataCoord.expireTime = Timestamp(2500) + segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick2) + assert.Nil(t, err) + wg := &sync.WaitGroup{} + segAllocator.Start() + + wg.Add(1) + go func(group *sync.WaitGroup) { + defer group.Done() + time.Sleep(2 * time.Second) + segAllocator.Close() + }(wg) + time.Sleep(time.Second) + _, err = segAllocator.GetSegmentID(1, 1, "abc", 10, 1000) + assert.NotNil(t, err) + wg.Wait() +} + +type mockDataCoord5 struct { + expireTime Timestamp +} + +func (mockD *mockDataCoord5) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) { + + return &datapb.AssignSegmentIDResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "Just For Test", + }, + }, fmt.Errorf("Just for test") +} + +func TestSegmentAllocator5(t *testing.T) { + ctx := context.Background() + dataCoord := &mockDataCoord5{} + dataCoord.expireTime = Timestamp(2500) + segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick2) + assert.Nil(t, err) + wg := &sync.WaitGroup{} + segAllocator.Start() + + wg.Add(1) + go func(group *sync.WaitGroup) { + defer group.Done() + time.Sleep(2 * time.Second) + segAllocator.Close() + }(wg) + time.Sleep(time.Second) + _, err = segAllocator.GetSegmentID(1, 1, "abc", 10, 1000) + assert.NotNil(t, err) + wg.Wait() +} + +func TestSegmentAllocator6(t *testing.T) { + ctx := context.Background() + dataCoord := &mockDataCoord{} + dataCoord.expireTime = Timestamp(2500) + segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick2) + assert.Nil(t, err) + wg := &sync.WaitGroup{} + segAllocator.Start() + + wg.Add(1) + go func(group *sync.WaitGroup) { + defer group.Done() + time.Sleep(2 * time.Second) + segAllocator.Close() + }(wg) + success := true + var sucLock sync.Mutex + collNames := []string{"abc", "cba"} + reqFunc := func(i int, group *sync.WaitGroup) { + defer group.Done() + sucLock.Lock() + defer sucLock.Unlock() + if !success { + return + } + colName := collNames[i%2] + count := uint32(10) + if i == 0 { + count = 0 + } + _, err = segAllocator.GetSegmentID(1, 1, colName, count, 1000) + if err != nil { + fmt.Println(err) + success = false + } + } + + for i := 0; i < 10; i++ { + wg.Add(1) + go reqFunc(i, wg) + } + wg.Wait() + assert.True(t, success) + +}