未验证 提交 f0b036a3 编写于 作者: T Ten Thousand Leaves 提交者: GitHub

Move bulk load segment lock happen early (#17612)

issue: #17600
Signed-off-by: NYuchen Gao <yuchen.gao@zilliz.com>
上级 68d38513
......@@ -498,9 +498,10 @@ func (m *mockRootCoordService) ListImportTasks(ctx context.Context, in *milvuspb
panic("not implemented") // TODO: Implement
}
// Report impot task state to rootcoord
func (m *mockRootCoordService) ReportImport(ctx context.Context, req *rootcoordpb.ImportResult) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
type mockCompactionHandler struct {
......
......@@ -18,6 +18,7 @@ package datacoord
import (
"context"
"errors"
"fmt"
"sync"
"time"
......@@ -25,6 +26,8 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"go.uber.org/zap"
......@@ -66,8 +69,9 @@ func putAllocation(a *Allocation) {
type Manager interface {
// AllocSegment allocates rows and record the allocation.
AllocSegment(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error)
// AllocSegmentForImport allocates one segment allocation for bulkload.
AllocSegmentForImport(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64) (*Allocation, error)
// allocSegmentForImport allocates one segment allocation for bulk load.
// TODO: Remove this method and AllocSegment() above instead.
allocSegmentForImport(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64, taskID int64) (*Allocation, error)
// DropSegment drops the segment from manager.
DropSegment(ctx context.Context, segmentID UniqueID)
// SealAllSegments seals all segments of collection with collectionID and return sealed segments.
......@@ -103,6 +107,7 @@ type SegmentManager struct {
segmentSealPolicies []segmentSealPolicy
channelSealPolicies []channelSealPolicy
flushPolicy flushPolicy
rcc types.RootCoord
}
type allocHelper struct {
......@@ -185,7 +190,7 @@ func defaultFlushPolicy() flushPolicy {
}
// newSegmentManager should be the only way to retrieve SegmentManager.
func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) *SegmentManager {
func newSegmentManager(meta *meta, allocator allocator, rcc types.RootCoord, opts ...allocOption) *SegmentManager {
manager := &SegmentManager{
meta: meta,
allocator: allocator,
......@@ -196,6 +201,7 @@ func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) *Se
segmentSealPolicies: defaultSegmentSealPolicy(), // default only segment size policy
channelSealPolicies: []channelSealPolicy{}, // no default channel seal policy
flushPolicy: defaultFlushPolicy(),
rcc: rcc,
}
for _, opt := range opts {
opt.apply(manager)
......@@ -272,9 +278,9 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
return allocations, nil
}
// AllocSegmentForImport allocates one segment allocation for bulkload
func (s *SegmentManager) AllocSegmentForImport(ctx context.Context, collectionID UniqueID,
partitionID UniqueID, channelName string, requestRows int64) (*Allocation, error) {
// allocSegmentForImport allocates one segment allocation for bulk load.
func (s *SegmentManager) allocSegmentForImport(ctx context.Context, collectionID UniqueID,
partitionID UniqueID, channelName string, requestRows int64, importTaskID int64) (*Allocation, error) {
// init allocation
allocation := getAllocation(requestRows)
......@@ -289,6 +295,32 @@ func (s *SegmentManager) AllocSegmentForImport(ctx context.Context, collectionID
if err != nil {
return nil, err
}
// ReportImport with the new segment so RootCoord can add segment ref lock onto it.
// TODO: This is a hack and will be removed once the whole ImportManager is migrated from RootCoord to DataCoord.
if s.rcc == nil {
log.Error("RootCoord client not set")
return nil, errors.New("RootCoord client not set")
}
status, err := s.rcc.ReportImport(context.Background(), &rootcoordpb.ImportResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
TaskId: importTaskID,
DatanodeId: Params.DataNodeCfg.GetNodeID(),
State: commonpb.ImportState_ImportAllocSegment,
Segments: []int64{segment.GetID()},
})
if err != nil {
log.Error("failed to report import on new segment", zap.Error(err))
return nil, err
}
if status.GetErrorCode() != commonpb.ErrorCode_Success {
log.Error("failed to report import on new segment", zap.String("reason", status.GetReason()))
return nil, fmt.Errorf("failed to report import on new segment: %s", status.GetReason())
}
log.Info("successfully report import the new segment",
zap.Int64("segment ID", segment.GetID()))
allocation.ExpireTime = expireTs
allocation.SegmentID = segment.GetID()
if err := s.meta.AddAllocation(segment.GetID(), allocation); err != nil {
......@@ -323,10 +355,12 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
defer sp.Finish()
id, err := s.allocator.allocID(ctx)
if err != nil {
log.Error("failed to open new segment while allocID", zap.Error(err))
return nil, err
}
maxNumOfRows, err := s.estimateMaxNumOfRows(collectionID)
if err != nil {
log.Error("failed to open new segment while estimateMaxNumOfRows", zap.Error(err))
return nil, err
}
......@@ -342,6 +376,7 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
}
segment := NewSegmentInfo(segmentInfo)
if err := s.meta.AddSegment(segment); err != nil {
log.Error("failed to add segment to DataCoord", zap.Error(err))
return nil, err
}
s.segments = append(s.segments, id)
......
......@@ -36,7 +36,7 @@ func TestManagerOptions(t *testing.T) {
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
segmentManager := newSegmentManager(meta, mockAllocator)
segmentManager := newSegmentManager(meta, mockAllocator, nil)
t.Run("test with alloc helper", func(t *testing.T) {
opt := withAllocHelper(allocHelper{})
......@@ -97,7 +97,7 @@ func TestAllocSegment(t *testing.T) {
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
segmentManager := newSegmentManager(meta, mockAllocator)
segmentManager := newSegmentManager(meta, mockAllocator, nil)
schema := newTestSchema()
collID, err := mockAllocator.allocID(ctx)
......@@ -117,7 +117,7 @@ func TestAllocSegment(t *testing.T) {
failsAllocator := &FailsAllocator{
allocTsSucceed: true,
}
segmentManager := newSegmentManager(meta, failsAllocator)
segmentManager := newSegmentManager(meta, failsAllocator, nil)
_, err := segmentManager.AllocSegment(ctx, collID, 100, "c2", 100)
assert.NotNil(t, err)
})
......@@ -126,7 +126,7 @@ func TestAllocSegment(t *testing.T) {
failsAllocator := &FailsAllocator{
allocIDSucceed: true,
}
segmentManager := newSegmentManager(meta, failsAllocator)
segmentManager := newSegmentManager(meta, failsAllocator, nil)
_, err := segmentManager.AllocSegment(ctx, collID, 100, "c1", 100)
assert.NotNil(t, err)
})
......@@ -138,7 +138,8 @@ func TestAllocSegmentForImport(t *testing.T) {
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
segmentManager := newSegmentManager(meta, mockAllocator)
ms := newMockRootCoordService()
segmentManager := newSegmentManager(meta, mockAllocator, ms)
schema := newTestSchema()
collID, err := mockAllocator.allocID(ctx)
......@@ -146,7 +147,7 @@ func TestAllocSegmentForImport(t *testing.T) {
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
t.Run("normal allocation", func(t *testing.T) {
allocation, err := segmentManager.AllocSegmentForImport(ctx, collID, 100, "c1", 100)
allocation, err := segmentManager.allocSegmentForImport(ctx, collID, 100, "c1", 100, 0)
assert.Nil(t, err)
assert.NotNil(t, allocation)
assert.EqualValues(t, 100, allocation.NumOfRows)
......@@ -158,8 +159,8 @@ func TestAllocSegmentForImport(t *testing.T) {
failsAllocator := &FailsAllocator{
allocTsSucceed: true,
}
segmentManager := newSegmentManager(meta, failsAllocator)
_, err := segmentManager.AllocSegmentForImport(ctx, collID, 100, "c1", 100)
segmentManager := newSegmentManager(meta, failsAllocator, ms)
_, err := segmentManager.allocSegmentForImport(ctx, collID, 100, "c1", 100, 0)
assert.NotNil(t, err)
})
......@@ -167,8 +168,14 @@ func TestAllocSegmentForImport(t *testing.T) {
failsAllocator := &FailsAllocator{
allocIDSucceed: true,
}
segmentManager := newSegmentManager(meta, failsAllocator)
_, err := segmentManager.AllocSegmentForImport(ctx, collID, 100, "c1", 100)
segmentManager := newSegmentManager(meta, failsAllocator, ms)
_, err := segmentManager.allocSegmentForImport(ctx, collID, 100, "c1", 100, 0)
assert.NotNil(t, err)
})
t.Run("nil RootCoord", func(t *testing.T) {
segmentManager := newSegmentManager(meta, mockAllocator, nil)
_, err := segmentManager.allocSegmentForImport(ctx, collID, 100, "c1", 100, 0)
assert.NotNil(t, err)
})
}
......@@ -219,7 +226,7 @@ func TestLoadSegmentsFromMeta(t *testing.T) {
err = meta.AddSegment(NewSegmentInfo(flushedSegment))
assert.Nil(t, err)
segmentManager := newSegmentManager(meta, mockAllocator)
segmentManager := newSegmentManager(meta, mockAllocator, nil)
segments := segmentManager.segments
assert.EqualValues(t, 2, len(segments))
}
......@@ -234,7 +241,7 @@ func TestSaveSegmentsToMeta(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.Nil(t, err)
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator)
segmentManager := newSegmentManager(meta, mockAllocator, nil)
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
......@@ -256,7 +263,7 @@ func TestSaveSegmentsToMetaWithSpecificSegments(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.Nil(t, err)
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator)
segmentManager := newSegmentManager(meta, mockAllocator, nil)
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
......@@ -278,7 +285,7 @@ func TestDropSegment(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.Nil(t, err)
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator)
segmentManager := newSegmentManager(meta, mockAllocator, nil)
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
......@@ -305,7 +312,7 @@ func TestAllocRowsLargerThanOneSegment(t *testing.T) {
var mockPolicy = func(schema *schemapb.CollectionSchema) (int, error) {
return 1, nil
}
segmentManager := newSegmentManager(meta, mockAllocator, withCalUpperLimitPolicy(mockPolicy))
segmentManager := newSegmentManager(meta, mockAllocator, nil, withCalUpperLimitPolicy(mockPolicy))
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.Nil(t, err)
assert.EqualValues(t, 2, len(allocations))
......@@ -327,7 +334,7 @@ func TestExpireAllocation(t *testing.T) {
var mockPolicy = func(schema *schemapb.CollectionSchema) (int, error) {
return 10000000, nil
}
segmentManager := newSegmentManager(meta, mockAllocator, withCalUpperLimitPolicy(mockPolicy))
segmentManager := newSegmentManager(meta, mockAllocator, nil, withCalUpperLimitPolicy(mockPolicy))
// alloc 100 times and expire
var maxts Timestamp
var id int64 = -1
......@@ -366,7 +373,7 @@ func TestGetFlushableSegments(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.Nil(t, err)
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator)
segmentManager := newSegmentManager(meta, mockAllocator, nil)
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
......@@ -412,7 +419,7 @@ func TestTryToSealSegment(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.Nil(t, err)
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64))) //always seal
segmentManager := newSegmentManager(meta, mockAllocator, nil, withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64))) //always seal
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
......@@ -437,7 +444,7 @@ func TestTryToSealSegment(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.Nil(t, err)
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal
segmentManager := newSegmentManager(meta, mockAllocator, nil, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
......@@ -462,7 +469,7 @@ func TestTryToSealSegment(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.Nil(t, err)
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator,
segmentManager := newSegmentManager(meta, mockAllocator, nil,
withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64)),
withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
......@@ -492,7 +499,7 @@ func TestTryToSealSegment(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.Nil(t, err)
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64))) //always seal
segmentManager := newSegmentManager(meta, mockAllocator, nil, withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64))) //always seal
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
......@@ -518,7 +525,7 @@ func TestTryToSealSegment(t *testing.T) {
collID, err := mockAllocator.allocID(context.Background())
assert.Nil(t, err)
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal
segmentManager := newSegmentManager(meta, mockAllocator, nil, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
......
......@@ -469,7 +469,7 @@ func (s *Server) initServiceDiscovery() error {
func (s *Server) startSegmentManager() {
if s.segmentManager == nil {
s.segmentManager = newSegmentManager(s.meta, s.allocator)
s.segmentManager = newSegmentManager(s.meta, s.allocator, s.rootCoordClient)
}
}
......
......@@ -995,7 +995,7 @@ func (s *spySegmentManager) AllocSegment(ctx context.Context, collectionID Uniqu
panic("not implemented") // TODO: Implement
}
func (s *spySegmentManager) AllocSegmentForImport(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int64) (*Allocation, error) {
func (s *spySegmentManager) allocSegmentForImport(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int64, taskID int64) (*Allocation, error) {
panic("not implemented") // TODO: Implement
}
......
......@@ -117,7 +117,8 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
zap.Int64("partitionID", r.GetPartitionID()),
zap.String("channelName", r.GetChannelName()),
zap.Uint32("count", r.GetCount()),
zap.Bool("isImport", r.GetIsImport()))
zap.Bool("isImport", r.GetIsImport()),
zap.Int64("import task ID", r.GetImportTaskID()))
// Load the collection info from Root Coordinator, if it is not found in server meta.
if s.meta.GetCollection(r.GetCollectionID()) == nil {
......@@ -134,8 +135,8 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
segmentAllocations := make([]*Allocation, 0)
if r.GetIsImport() {
// Have segment manager allocate and return the segment allocation info.
segAlloc, err := s.segmentManager.AllocSegmentForImport(ctx,
r.CollectionID, r.PartitionID, r.ChannelName, int64(r.Count))
segAlloc, err := s.segmentManager.allocSegmentForImport(ctx,
r.GetCollectionID(), r.GetPartitionID(), r.GetChannelName(), int64(r.GetCount()), r.GetImportTaskID())
if err != nil {
log.Warn("failed to alloc segment for import", zap.Any("request", r), zap.Error(err))
continue
......
......@@ -1127,26 +1127,6 @@ func importFlushReqFunc(node *DataNode, req *datapb.ImportTaskRequest, res *root
fieldStats = append(fieldStats, &datapb.FieldBinlog{FieldID: k, Binlogs: []*datapb.Binlog{v}})
}
// ReportImport with the new segment so RootCoord can add segment ref lock onto it.
// Fail-open.
status, err := node.rootCoord.ReportImport(context.Background(), &rootcoordpb.ImportResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
TaskId: req.GetImportTask().TaskId,
DatanodeId: Params.DataNodeCfg.GetNodeID(),
State: commonpb.ImportState_ImportAllocSegment,
Segments: []int64{segmentID},
})
if err != nil {
log.Error("failed to report import on new segment", zap.Error(err))
return err
}
if status.GetErrorCode() != commonpb.ErrorCode_Success {
log.Error("failed to report import on new segment", zap.String("reason", status.GetReason()))
return fmt.Errorf("failed to report import on new segment: %s", status.GetReason())
}
log.Info("now adding segment to the correct DataNode flow graph")
// Ask DataCoord to add segment to the corresponding DataNode flow graph.
node.dataCoord.AddSegment(context.Background(), &datapb.AddSegmentRequest{
......
......@@ -94,7 +94,8 @@ message SegmentIDRequest {
string channel_name = 2;
int64 collectionID = 3;
int64 partitionID = 4;
bool isImport = 5;
bool isImport = 5; // Indicate whether this request comes from a bulk load task.
int64 importTaskID = 6; // Needed for segment lock.
}
message AssignSegmentIDRequest {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册