提交 25f3e520 编写于 作者: 紫晴 提交者: yefu.chen

Add testcase for index

Signed-off-by: N紫晴 <ting.wang@zilliz.com>
上级 1634d759
package dataservice
import (
"github.com/golang/protobuf/proto"
"context"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
......@@ -20,20 +21,20 @@ func newDDHandler(meta *meta, allocator segmentAllocatorInterface) *ddHandler {
}
}
func (handler *ddHandler) HandleDDMsg(msg msgstream.TsMsg) error {
func (handler *ddHandler) HandleDDMsg(ctx context.Context, msg msgstream.TsMsg) error {
switch msg.Type() {
case commonpb.MsgType_CreateCollection:
realMsg := msg.(*msgstream.CreateCollectionMsg)
return handler.handleCreateCollection(realMsg)
case commonpb.MsgType_DropCollection:
realMsg := msg.(*msgstream.DropCollectionMsg)
return handler.handleDropCollection(realMsg)
return handler.handleDropCollection(ctx, realMsg)
case commonpb.MsgType_CreatePartition:
realMsg := msg.(*msgstream.CreatePartitionMsg)
return handler.handleCreatePartition(realMsg)
case commonpb.MsgType_DropPartition:
realMsg := msg.(*msgstream.DropPartitionMsg)
return handler.handleDropPartition(realMsg)
return handler.handleDropPartition(ctx, realMsg)
default:
return nil
}
......@@ -54,10 +55,10 @@ func (handler *ddHandler) handleCreateCollection(msg *msgstream.CreateCollection
return nil
}
func (handler *ddHandler) handleDropCollection(msg *msgstream.DropCollectionMsg) error {
func (handler *ddHandler) handleDropCollection(ctx context.Context, msg *msgstream.DropCollectionMsg) error {
segmentsOfCollection := handler.meta.GetSegmentsOfCollection(msg.CollectionID)
for _, id := range segmentsOfCollection {
handler.segmentAllocator.DropSegment(id)
handler.segmentAllocator.DropSegment(ctx, id)
}
if err := handler.meta.DropCollection(msg.CollectionID); err != nil {
return err
......@@ -65,10 +66,10 @@ func (handler *ddHandler) handleDropCollection(msg *msgstream.DropCollectionMsg)
return nil
}
func (handler *ddHandler) handleDropPartition(msg *msgstream.DropPartitionMsg) error {
func (handler *ddHandler) handleDropPartition(ctx context.Context, msg *msgstream.DropPartitionMsg) error {
segmentsOfPartition := handler.meta.GetSegmentsOfPartition(msg.CollectionID, msg.PartitionID)
for _, id := range segmentsOfPartition {
handler.segmentAllocator.DropSegment(id)
handler.segmentAllocator.DropSegment(ctx, id)
}
if err := handler.meta.DropPartition(msg.CollectionID, msg.PartitionID); err != nil {
return err
......
package dataservice
import (
"context"
"fmt"
"strconv"
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
)
type errRemainInSufficient struct {
......@@ -28,21 +29,21 @@ func (err errRemainInSufficient) Error() string {
// segmentAllocator is used to allocate rows for segments and record the allocations.
type segmentAllocatorInterface interface {
// OpenSegment add the segment to allocator and set it allocatable
OpenSegment(segmentInfo *datapb.SegmentInfo) error
OpenSegment(ctx context.Context, segmentInfo *datapb.SegmentInfo) error
// AllocSegment allocate rows and record the allocation.
AllocSegment(collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int) (UniqueID, int, Timestamp, error)
AllocSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int) (UniqueID, int, Timestamp, error)
// GetSealedSegments get all sealed segment.
GetSealedSegments() ([]UniqueID, error)
GetSealedSegments(ctx context.Context) ([]UniqueID, error)
// SealSegment set segment sealed, the segment will not be allocated anymore.
SealSegment(segmentID UniqueID) error
SealSegment(ctx context.Context, segmentID UniqueID) error
// DropSegment drop the segment from allocator.
DropSegment(segmentID UniqueID)
DropSegment(ctx context.Context, segmentID UniqueID)
// ExpireAllocations check all allocations' expire time and remove the expired allocation.
ExpireAllocations(timeTick Timestamp) error
ExpireAllocations(ctx context.Context, timeTick Timestamp) error
// SealAllSegments get all opened segment ids of collection. return success and failed segment ids
SealAllSegments(collectionID UniqueID)
SealAllSegments(ctx context.Context, collectionID UniqueID)
// IsAllocationsExpired check all allocations of segment expired.
IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error)
IsAllocationsExpired(ctx context.Context, segmentID UniqueID, ts Timestamp) (bool, error)
}
type segmentStatus struct {
......@@ -81,7 +82,9 @@ func newSegmentAllocator(meta *meta, allocator allocatorInterface) *segmentAlloc
return segmentAllocator
}
func (allocator *segmentAllocator) OpenSegment(segmentInfo *datapb.SegmentInfo) error {
func (allocator *segmentAllocator) OpenSegment(ctx context.Context, segmentInfo *datapb.SegmentInfo) error {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
allocator.mu.Lock()
defer allocator.mu.Unlock()
if _, ok := allocator.segments[segmentInfo.SegmentID]; ok {
......@@ -103,8 +106,10 @@ func (allocator *segmentAllocator) OpenSegment(segmentInfo *datapb.SegmentInfo)
return nil
}
func (allocator *segmentAllocator) AllocSegment(collectionID UniqueID,
func (allocator *segmentAllocator) AllocSegment(ctx context.Context, collectionID UniqueID,
partitionID UniqueID, channelName string, requestRows int) (segID UniqueID, retCount int, expireTime Timestamp, err error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
allocator.mu.Lock()
defer allocator.mu.Unlock()
......@@ -173,7 +178,9 @@ func (allocator *segmentAllocator) estimateTotalRows(collectionID UniqueID) (int
return int(allocator.segmentThreshold / float64(sizePerRecord)), nil
}
func (allocator *segmentAllocator) GetSealedSegments() ([]UniqueID, error) {
func (allocator *segmentAllocator) GetSealedSegments(ctx context.Context) ([]UniqueID, error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
allocator.mu.Lock()
defer allocator.mu.Unlock()
keys := make([]UniqueID, 0)
......@@ -200,7 +207,9 @@ func (allocator *segmentAllocator) checkSegmentSealed(segStatus *segmentStatus)
return float64(segMeta.NumRows) >= allocator.segmentThresholdFactor*float64(segStatus.total), nil
}
func (allocator *segmentAllocator) SealSegment(segmentID UniqueID) error {
func (allocator *segmentAllocator) SealSegment(ctx context.Context, segmentID UniqueID) error {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
allocator.mu.Lock()
defer allocator.mu.Unlock()
status, ok := allocator.segments[segmentID]
......@@ -211,13 +220,17 @@ func (allocator *segmentAllocator) SealSegment(segmentID UniqueID) error {
return nil
}
func (allocator *segmentAllocator) DropSegment(segmentID UniqueID) {
func (allocator *segmentAllocator) DropSegment(ctx context.Context, segmentID UniqueID) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
allocator.mu.Lock()
defer allocator.mu.Unlock()
delete(allocator.segments, segmentID)
}
func (allocator *segmentAllocator) ExpireAllocations(timeTick Timestamp) error {
func (allocator *segmentAllocator) ExpireAllocations(ctx context.Context, timeTick Timestamp) error {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
allocator.mu.Lock()
defer allocator.mu.Unlock()
for _, segStatus := range allocator.segments {
......@@ -232,7 +245,9 @@ func (allocator *segmentAllocator) ExpireAllocations(timeTick Timestamp) error {
return nil
}
func (allocator *segmentAllocator) IsAllocationsExpired(segmentID UniqueID, ts Timestamp) (bool, error) {
func (allocator *segmentAllocator) IsAllocationsExpired(ctx context.Context, segmentID UniqueID, ts Timestamp) (bool, error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
allocator.mu.RLock()
defer allocator.mu.RUnlock()
status, ok := allocator.segments[segmentID]
......@@ -242,7 +257,9 @@ func (allocator *segmentAllocator) IsAllocationsExpired(segmentID UniqueID, ts T
return status.lastExpireTime <= ts, nil
}
func (allocator *segmentAllocator) SealAllSegments(collectionID UniqueID) {
func (allocator *segmentAllocator) SealAllSegments(ctx context.Context, collectionID UniqueID) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
allocator.mu.Lock()
defer allocator.mu.Unlock()
for _, status := range allocator.segments {
......
package dataservice
import (
"context"
"log"
"math"
"strconv"
......@@ -13,6 +14,7 @@ import (
)
func TestAllocSegment(t *testing.T) {
ctx := context.Background()
Params.Init()
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
......@@ -33,7 +35,7 @@ func TestAllocSegment(t *testing.T) {
assert.Nil(t, err)
err = meta.AddSegment(segmentInfo)
assert.Nil(t, err)
err = segAllocator.OpenSegment(segmentInfo)
err = segAllocator.OpenSegment(ctx, segmentInfo)
assert.Nil(t, err)
cases := []struct {
......@@ -50,7 +52,7 @@ func TestAllocSegment(t *testing.T) {
{collID, 100, "c1", math.MaxInt64, false},
}
for _, c := range cases {
id, count, expireTime, err := segAllocator.AllocSegment(c.collectionID, c.partitionID, c.channelName, c.requestRows)
id, count, expireTime, err := segAllocator.AllocSegment(ctx, c.collectionID, c.partitionID, c.channelName, c.requestRows)
if c.expectResult {
assert.Nil(t, err)
assert.EqualValues(t, c.requestRows, count)
......@@ -63,6 +65,7 @@ func TestAllocSegment(t *testing.T) {
}
func TestSealSegment(t *testing.T) {
ctx := context.Background()
Params.Init()
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
......@@ -85,20 +88,21 @@ func TestSealSegment(t *testing.T) {
assert.Nil(t, err)
err = meta.AddSegment(segmentInfo)
assert.Nil(t, err)
err = segAllocator.OpenSegment(segmentInfo)
err = segAllocator.OpenSegment(ctx, segmentInfo)
assert.Nil(t, err)
lastSegID = segmentInfo.SegmentID
}
err = segAllocator.SealSegment(lastSegID)
err = segAllocator.SealSegment(ctx, lastSegID)
assert.Nil(t, err)
segAllocator.SealAllSegments(collID)
sealedSegments, err := segAllocator.GetSealedSegments()
segAllocator.SealAllSegments(ctx, collID)
sealedSegments, err := segAllocator.GetSealedSegments(ctx)
assert.Nil(t, err)
assert.EqualValues(t, 10, len(sealedSegments))
}
func TestExpireSegment(t *testing.T) {
ctx := context.Background()
Params.Init()
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
......@@ -119,10 +123,10 @@ func TestExpireSegment(t *testing.T) {
assert.Nil(t, err)
err = meta.AddSegment(segmentInfo)
assert.Nil(t, err)
err = segAllocator.OpenSegment(segmentInfo)
err = segAllocator.OpenSegment(ctx, segmentInfo)
assert.Nil(t, err)
id1, _, et, err := segAllocator.AllocSegment(collID, 100, "c1", 10)
id1, _, et, err := segAllocator.AllocSegment(ctx, collID, 100, "c1", 10)
ts2, _ := tsoutil.ParseTS(et)
log.Printf("physical ts: %s", ts2.String())
assert.Nil(t, err)
......@@ -134,9 +138,9 @@ func TestExpireSegment(t *testing.T) {
time.Sleep(time.Duration(Params.SegIDAssignExpiration+1000) * time.Millisecond)
ts, err = mockAllocator.allocTimestamp()
assert.Nil(t, err)
err = segAllocator.ExpireAllocations(ts)
err = segAllocator.ExpireAllocations(ctx, ts)
assert.Nil(t, err)
expired, err := segAllocator.IsAllocationsExpired(id1, ts)
expired, err := segAllocator.IsAllocationsExpired(ctx, id1, ts)
if et > ts {
tsPhy, _ := tsoutil.ParseTS(ts)
log.Printf("ts %s", tsPhy.String())
......
......@@ -22,6 +22,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/timesync"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
......@@ -367,9 +368,9 @@ func (s *Server) startDDChannel(ctx context.Context) {
return
default:
}
msgPack, _ := ddStream.Consume()
msgPack, ctx := ddStream.Consume()
for _, msg := range msgPack.Msgs {
if err := s.ddHandler.HandleDDMsg(msg); err != nil {
if err := s.ddHandler.HandleDDMsg(ctx, msg); err != nil {
log.Error("handle dd msg error", zap.Error(err))
continue
}
......@@ -502,7 +503,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb
Reason: "server is initializing",
}, nil
}
s.segAllocator.SealAllSegments(req.CollectionID)
s.segAllocator.SealAllSegments(ctx, req.CollectionID)
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
......@@ -524,7 +525,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
}
for _, r := range req.SegmentIDRequests {
if !s.meta.HasCollection(r.CollectionID) {
if err := s.loadCollectionFromMaster(r.CollectionID); err != nil {
if err := s.loadCollectionFromMaster(ctx, r.CollectionID); err != nil {
log.Error("load collection from master error", zap.Int64("collectionID", r.CollectionID), zap.Error(err))
continue
}
......@@ -534,7 +535,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
segmentID, retCount, expireTs, err := s.segAllocator.AllocSegment(r.CollectionID, r.PartitionID, r.ChannelName, int(r.Count))
segmentID, retCount, expireTs, err := s.segAllocator.AllocSegment(ctx, r.CollectionID, r.PartitionID, r.ChannelName, int(r.Count))
if err != nil {
if _, ok := err.(errRemainInSufficient); !ok {
result.Status.Reason = fmt.Sprintf("allocation of Collection %d, Partition %d, Channel %s, Count %d error: %s",
......@@ -543,14 +544,14 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
continue
}
if err = s.openNewSegment(r.CollectionID, r.PartitionID, r.ChannelName); err != nil {
if err = s.openNewSegment(ctx, r.CollectionID, r.PartitionID, r.ChannelName); err != nil {
result.Status.Reason = fmt.Sprintf("open new segment of Collection %d, Partition %d, Channel %s, Count %d error: %s",
r.CollectionID, r.PartitionID, r.ChannelName, r.Count, err.Error())
resp.SegIDAssignments = append(resp.SegIDAssignments, result)
continue
}
segmentID, retCount, expireTs, err = s.segAllocator.AllocSegment(r.CollectionID, r.PartitionID, r.ChannelName, int(r.Count))
segmentID, retCount, expireTs, err = s.segAllocator.AllocSegment(ctx, r.CollectionID, r.PartitionID, r.ChannelName, int(r.Count))
if err != nil {
result.Status.Reason = fmt.Sprintf("retry allocation of Collection %d, Partition %d, Channel %s, Count %d error: %s",
r.CollectionID, r.PartitionID, r.ChannelName, r.Count, err.Error())
......@@ -571,8 +572,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
return resp, nil
}
func (s *Server) loadCollectionFromMaster(collectionID int64) error {
ctx := context.TODO()
func (s *Server) loadCollectionFromMaster(ctx context.Context, collectionID int64) error {
resp, err := s.masterClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
......@@ -591,7 +591,9 @@ func (s *Server) loadCollectionFromMaster(collectionID int64) error {
return s.meta.AddCollection(collInfo)
}
func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, channelName string) error {
func (s *Server) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) error {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
id, err := s.allocator.allocID()
if err != nil {
return err
......@@ -603,7 +605,7 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha
if err = s.meta.AddSegment(segmentInfo); err != nil {
return err
}
if err = s.segAllocator.OpenSegment(segmentInfo); err != nil {
if err = s.segAllocator.OpenSegment(ctx, segmentInfo); err != nil {
return err
}
infoMsg := &msgstream.SegmentInfoMsg{
......@@ -623,7 +625,7 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha
msgPack := &msgstream.MsgPack{
Msgs: []msgstream.TsMsg{infoMsg},
}
if err = s.segmentInfoStream.Produce(s.ctx, msgPack); err != nil {
if err = s.segmentInfoStream.Produce(ctx, msgPack); err != nil {
return err
}
return nil
......
......@@ -4,6 +4,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
"go.uber.org/zap"
"golang.org/x/net/context"
......@@ -36,7 +37,8 @@ func (watcher *proxyTimeTickWatcher) StartBackgroundLoop(ctx context.Context) {
log.Debug("proxy time tick watcher closed")
return
case msg := <-watcher.msgQueue:
if err := watcher.allocator.ExpireAllocations(msg.Base.Timestamp); err != nil {
traceCtx := context.TODO()
if err := watcher.allocator.ExpireAllocations(traceCtx, msg.Base.Timestamp); err != nil {
log.Error("expire allocations error", zap.Error(err))
}
}
......@@ -76,12 +78,15 @@ func (watcher *dataNodeTimeTickWatcher) StartBackgroundLoop(ctx context.Context)
}
func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTickMsg) error {
segments, err := watcher.allocator.GetSealedSegments()
ctx := context.TODO()
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
segments, err := watcher.allocator.GetSealedSegments(ctx)
if err != nil {
return err
}
for _, id := range segments {
expired, err := watcher.allocator.IsAllocationsExpired(id, msg.Base.Timestamp)
expired, err := watcher.allocator.IsAllocationsExpired(ctx, id, msg.Base.Timestamp)
if err != nil {
log.Error("check allocations expired error", zap.Int64("segmentID", id), zap.Error(err))
continue
......@@ -106,7 +111,7 @@ func (watcher *dataNodeTimeTickWatcher) handleTimeTickMsg(msg *msgstream.TimeTic
CollectionID: segmentInfo.CollectionID,
SegmentIDs: []int64{segmentInfo.SegmentID},
})
watcher.allocator.DropSegment(id)
watcher.allocator.DropSegment(ctx, id)
}
}
return nil
......
package dataservice
import (
"context"
"strconv"
"testing"
"time"
......@@ -13,6 +14,7 @@ import (
)
func TestDataNodeTTWatcher(t *testing.T) {
ctx := context.Background()
Params.Init()
c := make(chan struct{})
cluster := newDataNodeCluster(c)
......@@ -56,10 +58,10 @@ func TestDataNodeTTWatcher(t *testing.T) {
assert.Nil(t, err)
err = meta.AddSegment(segmentInfo)
assert.Nil(t, err)
err = segAllocator.OpenSegment(segmentInfo)
err = segAllocator.OpenSegment(ctx, segmentInfo)
assert.Nil(t, err)
if c.allocation && c.expired {
_, _, _, err := segAllocator.AllocSegment(id, 100, "channel"+strconv.Itoa(i), 100)
_, _, _, err := segAllocator.AllocSegment(ctx, id, 100, "channel"+strconv.Itoa(i), 100)
assert.Nil(t, err)
}
}
......@@ -67,11 +69,11 @@ func TestDataNodeTTWatcher(t *testing.T) {
time.Sleep(time.Duration(Params.SegIDAssignExpiration+1000) * time.Millisecond)
for i, c := range cases {
if c.allocation && !c.expired {
_, _, _, err := segAllocator.AllocSegment(id, 100, "channel"+strconv.Itoa(i), 100)
_, _, _, err := segAllocator.AllocSegment(ctx, id, 100, "channel"+strconv.Itoa(i), 100)
assert.Nil(t, err)
}
if c.sealed {
err := segAllocator.SealSegment(segmentIDs[i])
err := segAllocator.SealSegment(ctx, segmentIDs[i])
assert.Nil(t, err)
}
}
......
......@@ -253,6 +253,26 @@ class TestIndexBase:
# assert index == indexs[-1]
assert not index # FLAT is the last index_type, drop all indexes in server
@pytest.mark.tags(CaseLabel.tags_0331)
@pytest.mark.level(2)
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_different_index_repeatedly_B(self, connect, collection):
'''
target: check if index can be created repeatedly, with the different create_index params
method: create another index with different index_params after index have been built
expected: return code 0, and describe index result equals with the second index params
'''
ids = connect.insert(collection, default_entities)
connect.flush([collection])
indexs = [default_index, {"metric_type": "L2", "index_type": "IVF_SQ8", "params": {"nlist": 1024}}]
for index in indexs:
connect.create_index(collection, field_name, index)
connect.release_collection(collection)
connect.load_collection(collection)
index = connect.describe_index(collection, field_name)
assert index == indexs[-1]
# assert not index # FLAT is the last index_type, drop all indexes in server
@pytest.mark.tags(CaseLabel.tags_0331, CaseLabel.tags_l1, CaseLabel.tags_smoke)
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_index_ip(self, connect, collection, get_simple_index):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册