未验证 提交 cc99bae2 编写于 作者: Z zhenshan.cao 提交者: GitHub

Add unittest for proxy (#7342)

Fix unittest data race
Signed-off-by: Nzhenshan.cao <zhenshan.cao@zilliz.com>
上级 880aa540
......@@ -31,7 +31,7 @@ func (tc *TaskCondition) WaitToFinish() error {
for {
select {
case <-tc.ctx.Done():
return errors.New("timeout")
return errors.New("Proxy TaskCondition context Done")
case err := <-tc.done:
return err
}
......
......@@ -24,17 +24,19 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const (
SegCountPerRPC = 20000
ActiveTimeDuration = 100 //second
SegCountPerRPC = 20000
)
type Allocator = allocator.Allocator
type DataCoord interface {
AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error)
}
type segRequest struct {
allocator.BaseRequest
count uint32
......@@ -128,10 +130,6 @@ func (info *assignInfo) Assign(ts Timestamp, count uint32) (map[UniqueID]uint32,
return result, nil
}
func (info *assignInfo) IsActive(now time.Time) bool {
return now.Sub(info.lastInsertTime) <= ActiveTimeDuration*time.Second
}
type SegIDAssigner struct {
Allocator
assignInfos map[UniqueID]*list.List // collectionID -> *list.List
......@@ -139,11 +137,11 @@ type SegIDAssigner struct {
getTickFunc func() Timestamp
PeerID UniqueID
dataCoord types.DataCoord
dataCoord DataCoord
countPerRPC uint32
}
func NewSegIDAssigner(ctx context.Context, dataCoord types.DataCoord, getTickFunc func() Timestamp) (*SegIDAssigner, error) {
func NewSegIDAssigner(ctx context.Context, dataCoord DataCoord, getTickFunc func() Timestamp) (*SegIDAssigner, error) {
ctx1, cancel := context.WithCancel(ctx)
sa := &SegIDAssigner{
Allocator: Allocator{
......@@ -167,10 +165,6 @@ func NewSegIDAssigner(ctx context.Context, dataCoord types.DataCoord, getTickFun
return sa, nil
}
func (sa *SegIDAssigner) SetServiceClient(client types.DataCoord) {
sa.dataCoord = client
}
func (sa *SegIDAssigner) collectExpired() {
ts := sa.getTickFunc()
for _, info := range sa.assignInfos {
......@@ -185,11 +179,12 @@ func (sa *SegIDAssigner) collectExpired() {
}
func (sa *SegIDAssigner) pickCanDoFunc() {
log.Debug("Proxy SegIDAssigner pickCanDoFunc", zap.Any("len(ToDoReqs)", len(sa.ToDoReqs)))
if sa.ToDoReqs == nil {
return
}
records := make(map[UniqueID]map[UniqueID]map[string]uint32)
newTodoReqs := sa.ToDoReqs[0:0]
var newTodoReqs []allocator.Request
for _, req := range sa.ToDoReqs {
segRequest := req.(*segRequest)
collID := segRequest.collID
......@@ -209,6 +204,11 @@ func (sa *SegIDAssigner) pickCanDoFunc() {
records[collID][partitionID][channelName] += segRequest.count
assign, err := sa.getAssign(segRequest.collID, segRequest.partitionID, segRequest.channelName)
if err != nil {
log.Debug("Proxy SegIDAssigner, pickCanDoFunc getAssign err:", zap.Any("collID", segRequest.collID),
zap.Any("partitionID", segRequest.partitionID), zap.Any("channelName", segRequest.channelName),
zap.Error(err))
}
if err != nil || assign.Capacity(segRequest.timestamp) < records[collID][partitionID][channelName] {
sa.segReqs = append(sa.segReqs, &datapb.SegmentIDRequest{
ChannelName: channelName,
......@@ -221,6 +221,9 @@ func (sa *SegIDAssigner) pickCanDoFunc() {
sa.CanDoReqs = append(sa.CanDoReqs, req)
}
}
log.Debug("Proxy SegIDAssigner pickCanDoFunc", zap.Any("records", records),
zap.Any("len(newTodoReqs)", len(newTodoReqs)),
zap.Any("len(CanDoReqs)", len(sa.CanDoReqs)))
sa.ToDoReqs = newTodoReqs
}
......@@ -258,13 +261,18 @@ func (sa *SegIDAssigner) checkSegReqEqual(req1, req2 *datapb.SegmentIDRequest) b
}
func (sa *SegIDAssigner) reduceSegReqs() {
log.Debug("Proxy SegIDAssigner reduceSegReqs", zap.Any("len(segReqs)", len(sa.segReqs)))
if len(sa.segReqs) == 0 {
return
}
beforeCnt := uint32(0)
var newSegReqs []*datapb.SegmentIDRequest
for _, req1 := range sa.segReqs {
if req1.Count == 0 {
log.Debug("Proxy SegIDAssigner reduceSegReqs hit perCount == 0")
req1.Count = sa.countPerRPC
}
beforeCnt += req1.Count
var req2 *datapb.SegmentIDRequest
for _, req3 := range newSegReqs {
if sa.checkSegReqEqual(req1, req3) {
......@@ -278,13 +286,14 @@ func (sa *SegIDAssigner) reduceSegReqs() {
req2.Count += req1.Count
}
}
afterCnt := uint32(0)
for _, req := range newSegReqs {
if req.Count == 0 {
req.Count = sa.countPerRPC
}
afterCnt += req.Count
}
sa.segReqs = newSegReqs
log.Debug("Proxy SegIDAssigner reduceSegReqs after reduce", zap.Any("len(segReqs)", len(sa.segReqs)),
zap.Any("BeforeCnt", beforeCnt),
zap.Any("AfterCnt", afterCnt))
}
func (sa *SegIDAssigner) syncSegments() (bool, error) {
......@@ -298,18 +307,27 @@ func (sa *SegIDAssigner) syncSegments() (bool, error) {
SegmentIDRequests: sa.segReqs,
}
sa.segReqs = []*datapb.SegmentIDRequest{}
sa.segReqs = nil
resp, err := sa.dataCoord.AssignSegmentID(context.Background(), req)
if err != nil {
return false, fmt.Errorf("syncSegmentID Failed:%w", err)
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
return false, fmt.Errorf("syncSegmentID Failed:%s", resp.Status.Reason)
}
var errMsg string
now := time.Now()
success := false
success := true
for _, info := range resp.SegIDAssignments {
if info.Status.GetErrorCode() != commonpb.ErrorCode_Success {
log.Debug("proxy", zap.String("SyncSegment Error", info.Status.Reason))
errMsg += info.Status.Reason
errMsg += "\n"
success = false
continue
}
assign, err := sa.getAssign(info.CollectionID, info.PartitionID, info.ChannelName)
......@@ -338,7 +356,9 @@ func (sa *SegIDAssigner) syncSegments() (bool, error) {
assign.segInfos.PushBack(segInfo2)
}
assign.lastInsertTime = now
success = true
}
if !success {
return false, fmt.Errorf(errMsg)
}
return success, nil
}
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package proxy
import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/stretchr/testify/assert"
)
type mockDataCoord struct {
expireTime Timestamp
}
func (mockD *mockDataCoord) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
assigns := make([]*datapb.SegmentIDAssignment, 0, len(req.SegmentIDRequests))
maxPerCnt := 100
for _, r := range req.SegmentIDRequests {
totalCnt := uint32(0)
for totalCnt != r.Count {
cnt := uint32(rand.Intn(maxPerCnt))
if totalCnt+cnt > r.Count {
cnt = r.Count - totalCnt
}
totalCnt += cnt
result := &datapb.SegmentIDAssignment{
SegID: 1,
ChannelName: r.ChannelName,
Count: cnt,
CollectionID: r.CollectionID,
PartitionID: r.PartitionID,
ExpireTime: mockD.expireTime,
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
}
assigns = append(assigns, result)
}
}
return &datapb.AssignSegmentIDResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
SegIDAssignments: assigns,
}, nil
}
type mockDataCoord2 struct {
expireTime Timestamp
}
func (mockD *mockDataCoord2) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
return &datapb.AssignSegmentIDResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "Just For Test",
},
}, nil
}
func getLastTick1() Timestamp {
return 1000
}
func TestSegmentAllocator1(t *testing.T) {
ctx := context.Background()
dataCoord := &mockDataCoord{}
dataCoord.expireTime = Timestamp(1000)
segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick1)
assert.Nil(t, err)
wg := &sync.WaitGroup{}
segAllocator.Start()
wg.Add(1)
go func(group *sync.WaitGroup) {
defer group.Done()
time.Sleep(2 * time.Second)
segAllocator.Close()
}(wg)
total := uint32(0)
collNames := []string{"abc", "cba"}
for i := 0; i < 10; i++ {
colName := collNames[i%2]
ret, err := segAllocator.GetSegmentID(1, 1, colName, 1, 1)
assert.Nil(t, err)
total += ret[1]
}
assert.Equal(t, uint32(10), total)
ret, err := segAllocator.GetSegmentID(1, 1, "abc", SegCountPerRPC-10, 999)
assert.Nil(t, err)
assert.Equal(t, uint32(SegCountPerRPC-10), ret[1])
_, err = segAllocator.GetSegmentID(1, 1, "abc", 10, 1001)
assert.NotNil(t, err)
wg.Wait()
}
var curLastTick2 = Timestamp(2000)
var curLastTIck2Lock sync.Mutex
func getLastTick2() Timestamp {
curLastTIck2Lock.Lock()
defer curLastTIck2Lock.Unlock()
curLastTick2 += 1000
return curLastTick2
}
func TestSegmentAllocator2(t *testing.T) {
ctx := context.Background()
dataCoord := &mockDataCoord{}
dataCoord.expireTime = Timestamp(2500)
segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick2)
assert.Nil(t, err)
wg := &sync.WaitGroup{}
segAllocator.Start()
wg.Add(1)
go func(group *sync.WaitGroup) {
defer group.Done()
time.Sleep(2 * time.Second)
segAllocator.Close()
}(wg)
total := uint32(0)
for i := 0; i < 10; i++ {
ret, err := segAllocator.GetSegmentID(1, 1, "abc", 1, 2000)
assert.Nil(t, err)
total += ret[1]
}
assert.Equal(t, uint32(10), total)
time.Sleep(time.Second)
_, err = segAllocator.GetSegmentID(1, 1, "abc", SegCountPerRPC-10, getLastTick2())
assert.NotNil(t, err)
wg.Wait()
}
func TestSegmentAllocator3(t *testing.T) {
ctx := context.Background()
dataCoord := &mockDataCoord2{}
dataCoord.expireTime = Timestamp(2500)
segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick2)
assert.Nil(t, err)
wg := &sync.WaitGroup{}
segAllocator.Start()
wg.Add(1)
go func(group *sync.WaitGroup) {
defer group.Done()
time.Sleep(2 * time.Second)
segAllocator.Close()
}(wg)
time.Sleep(time.Second)
_, err = segAllocator.GetSegmentID(1, 1, "abc", 10, 1000)
assert.NotNil(t, err)
wg.Wait()
}
type mockDataCoord3 struct {
expireTime Timestamp
}
func (mockD *mockDataCoord3) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
assigns := make([]*datapb.SegmentIDAssignment, 0, len(req.SegmentIDRequests))
for i, r := range req.SegmentIDRequests {
errCode := commonpb.ErrorCode_Success
reason := ""
if i == 0 {
errCode = commonpb.ErrorCode_UnexpectedError
reason = "Just for test"
}
result := &datapb.SegmentIDAssignment{
SegID: 1,
ChannelName: r.ChannelName,
Count: r.Count,
CollectionID: r.CollectionID,
PartitionID: r.PartitionID,
ExpireTime: mockD.expireTime,
Status: &commonpb.Status{
ErrorCode: errCode,
Reason: reason,
},
}
assigns = append(assigns, result)
}
return &datapb.AssignSegmentIDResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
SegIDAssignments: assigns,
}, nil
}
func TestSegmentAllocator4(t *testing.T) {
ctx := context.Background()
dataCoord := &mockDataCoord3{}
dataCoord.expireTime = Timestamp(2500)
segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick2)
assert.Nil(t, err)
wg := &sync.WaitGroup{}
segAllocator.Start()
wg.Add(1)
go func(group *sync.WaitGroup) {
defer group.Done()
time.Sleep(2 * time.Second)
segAllocator.Close()
}(wg)
time.Sleep(time.Second)
_, err = segAllocator.GetSegmentID(1, 1, "abc", 10, 1000)
assert.NotNil(t, err)
wg.Wait()
}
type mockDataCoord5 struct {
expireTime Timestamp
}
func (mockD *mockDataCoord5) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
return &datapb.AssignSegmentIDResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "Just For Test",
},
}, fmt.Errorf("Just for test")
}
func TestSegmentAllocator5(t *testing.T) {
ctx := context.Background()
dataCoord := &mockDataCoord5{}
dataCoord.expireTime = Timestamp(2500)
segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick2)
assert.Nil(t, err)
wg := &sync.WaitGroup{}
segAllocator.Start()
wg.Add(1)
go func(group *sync.WaitGroup) {
defer group.Done()
time.Sleep(2 * time.Second)
segAllocator.Close()
}(wg)
time.Sleep(time.Second)
_, err = segAllocator.GetSegmentID(1, 1, "abc", 10, 1000)
assert.NotNil(t, err)
wg.Wait()
}
func TestSegmentAllocator6(t *testing.T) {
ctx := context.Background()
dataCoord := &mockDataCoord{}
dataCoord.expireTime = Timestamp(2500)
segAllocator, err := NewSegIDAssigner(ctx, dataCoord, getLastTick2)
assert.Nil(t, err)
wg := &sync.WaitGroup{}
segAllocator.Start()
wg.Add(1)
go func(group *sync.WaitGroup) {
defer group.Done()
time.Sleep(2 * time.Second)
segAllocator.Close()
}(wg)
success := true
var sucLock sync.Mutex
collNames := []string{"abc", "cba"}
reqFunc := func(i int, group *sync.WaitGroup) {
defer group.Done()
sucLock.Lock()
defer sucLock.Unlock()
if !success {
return
}
colName := collNames[i%2]
count := uint32(10)
if i == 0 {
count = 0
}
_, err = segAllocator.GetSegmentID(1, 1, colName, count, 1000)
if err != nil {
fmt.Println(err)
success = false
}
}
for i := 0; i < 10; i++ {
wg.Add(1)
go reqFunc(i, wg)
}
wg.Wait()
assert.True(t, success)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册