未验证 提交 f31be941 编写于 作者: G groot 提交者: GitHub

Fix bulkload row count bug for multi datanodes (#17384)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
上级 b5c11a21
......@@ -60,13 +60,22 @@ func (m *MockAllocator) allocID(ctx context.Context) (UniqueID, error) {
var _ allocator = (*FailsAllocator)(nil)
// FailsAllocator allocator that fails
type FailsAllocator struct{}
type FailsAllocator struct {
allocTsSucceed bool
allocIDSucceed bool
}
// allocTimestamp fails with a fixed error unless allocTsSucceed is set,
// in which case it yields a zero timestamp.
func (a *FailsAllocator) allocTimestamp(_ context.Context) (Timestamp, error) {
	if !a.allocTsSucceed {
		return 0, errors.New("always fail")
	}
	return 0, nil
}
// allocID fails with a fixed error unless allocIDSucceed is set,
// in which case it yields a zero ID.
func (a *FailsAllocator) allocID(_ context.Context) (UniqueID, error) {
	if !a.allocIDSucceed {
		return 0, errors.New("always fail")
	}
	return 0, nil
}
......
......@@ -66,6 +66,8 @@ func putAllocation(a *Allocation) {
type Manager interface {
// AllocSegment allocates rows and record the allocation.
AllocSegment(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error)
// AllocSegmentForImport allocates one segment allocation for bulkload.
AllocSegmentForImport(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64) (*Allocation, error)
// DropSegment drops the segment from manager.
DropSegment(ctx context.Context, segmentID UniqueID)
// SealAllSegments seals all segments of collection with collectionID and return sealed segments.
......@@ -248,7 +250,7 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
return nil, err
}
for _, allocation := range newSegmentAllocations {
segment, err := s.openNewSegment(ctx, collectionID, partitionID, channelName)
segment, err := s.openNewSegment(ctx, collectionID, partitionID, channelName, commonpb.SegmentState_Growing)
if err != nil {
return nil, err
}
......@@ -270,6 +272,31 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
return allocations, nil
}
// AllocSegmentForImport allocates one segment allocation for bulkload.
// Unlike AllocSegment it always opens a brand-new segment rather than
// reusing a growing one, and the segment is opened in the "Importing"
// state so bulkload rows never mix with growing segments.
//
// Fix: the Allocation comes from the pool via getAllocation; the original
// code leaked it on every early-error return. Return it with putAllocation
// before the allocation has been handed to meta.
func (s *SegmentManager) AllocSegmentForImport(ctx context.Context, collectionID UniqueID,
	partitionID UniqueID, channelName string, requestRows int64) (*Allocation, error) {
	// init allocation from the pool
	allocation := getAllocation(requestRows)

	// create new segments and add allocations to meta
	// to avoid mixing up with growing segments, the segment state is "Importing"
	expireTs, err := s.genExpireTs(ctx)
	if err != nil {
		putAllocation(allocation) // unused, give it back to the pool
		return nil, err
	}

	segment, err := s.openNewSegment(ctx, collectionID, partitionID, channelName, commonpb.SegmentState_Importing)
	if err != nil {
		putAllocation(allocation) // unused, give it back to the pool
		return nil, err
	}

	allocation.ExpireTime = expireTs
	allocation.SegmentID = segment.GetID()
	// NOTE(review): after AddAllocation the allocation may be retained by meta,
	// so it is NOT pooled on this error path — confirm meta's failure semantics.
	if err := s.meta.AddAllocation(segment.GetID(), allocation); err != nil {
		return nil, err
	}
	return allocation, nil
}
func satisfy(segment *SegmentInfo, collectionID, partitionID UniqueID, channel string) bool {
return segment.GetCollectionID() == collectionID && segment.GetPartitionID() == partitionID &&
segment.GetInsertChannel() == channel
......@@ -290,7 +317,8 @@ func (s *SegmentManager) genExpireTs(ctx context.Context) (Timestamp, error) {
return expireTs, nil
}
func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*SegmentInfo, error) {
func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID,
channelName string, segmentState commonpb.SegmentState) (*SegmentInfo, error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
id, err := s.allocator.allocID(ctx)
......@@ -308,7 +336,7 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
PartitionID: partitionID,
InsertChannel: channelName,
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
State: segmentState,
MaxRowNum: int64(maxNumOfRows),
LastExpireTime: 0,
}
......
......@@ -113,14 +113,66 @@ func TestAllocSegment(t *testing.T) {
assert.NotEqualValues(t, 0, allocations[0].ExpireTime)
})
t.Run("allocation fails", func(t *testing.T) {
failsAllocator := &FailsAllocator{}
t.Run("allocation fails 1", func(t *testing.T) {
failsAllocator := &FailsAllocator{
allocTsSucceed: true,
}
segmentManager := newSegmentManager(meta, failsAllocator)
_, err := segmentManager.AllocSegment(ctx, collID, 100, "c2", 100)
assert.NotNil(t, err)
})
t.Run("allocation fails 2", func(t *testing.T) {
failsAllocator := &FailsAllocator{
allocIDSucceed: true,
}
segmentManager := newSegmentManager(meta, failsAllocator)
_, err := segmentManager.AllocSegment(ctx, collID, 100, "c1", 100)
assert.NotNil(t, err)
})
}
// TestAllocSegmentForImport covers the bulkload allocation path:
// one happy-path allocation plus two allocator-failure cases
// (timestamp allocation failing, ID allocation failing).
func TestAllocSegmentForImport(t *testing.T) {
	ctx := context.Background()
	Params.Init()
	mockAllocator := newMockAllocator()
	meta, err := newMemoryMeta(mockAllocator)
	assert.Nil(t, err)
	segmentManager := newSegmentManager(meta, mockAllocator)

	schema := newTestSchema()
	collID, err := mockAllocator.allocID(ctx)
	assert.Nil(t, err)
	meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})

	t.Run("normal allocation", func(t *testing.T) {
		allocation, err := segmentManager.AllocSegmentForImport(ctx, collID, 100, "c1", 100)
		assert.Nil(t, err)
		assert.NotNil(t, allocation)
		assert.EqualValues(t, 100, allocation.NumOfRows)
		assert.NotEqualValues(t, 0, allocation.SegmentID)
		assert.NotEqualValues(t, 0, allocation.ExpireTime)
	})

	// Each failing allocator succeeds at exactly one of the two allocation
	// kinds, so the other one trips the error path under test.
	failCases := []struct {
		name      string
		allocator *FailsAllocator
	}{
		{"allocation fails 1", &FailsAllocator{allocTsSucceed: true}},
		{"allocation fails 2", &FailsAllocator{allocIDSucceed: true}},
	}
	for _, tc := range failCases {
		t.Run(tc.name, func(t *testing.T) {
			mgr := newSegmentManager(meta, tc.allocator)
			_, err := mgr.AllocSegmentForImport(ctx, collID, 100, "c1", 100)
			assert.NotNil(t, err)
		})
	}
}
func TestLoadSegmentsFromMeta(t *testing.T) {
ctx := context.Background()
Params.Init()
......
......@@ -108,6 +108,38 @@ func TestAssignSegmentID(t *testing.T) {
assert.EqualValues(t, 1000, assign.Count)
})
t.Run("assign segment for bulkload", func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
schema := newTestSchema()
svr.meta.AddCollection(&datapb.CollectionInfo{
ID: collID,
Schema: schema,
Partitions: []int64{},
})
req := &datapb.SegmentIDRequest{
Count: 1000,
ChannelName: channel0,
CollectionID: collID,
PartitionID: partID,
IsImport: true,
}
resp, err := svr.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{
NodeID: 0,
PeerRole: "",
SegmentIDRequests: []*datapb.SegmentIDRequest{req},
})
assert.Nil(t, err)
assert.EqualValues(t, 1, len(resp.SegIDAssignments))
assign := resp.SegIDAssignments[0]
assert.EqualValues(t, commonpb.ErrorCode_Success, assign.Status.ErrorCode)
assert.EqualValues(t, collID, assign.CollectionID)
assert.EqualValues(t, partID, assign.PartitionID)
assert.EqualValues(t, channel0, assign.ChannelName)
assert.EqualValues(t, 1000, assign.Count)
})
t.Run("with closed server", func(t *testing.T) {
req := &datapb.SegmentIDRequest{
Count: 100,
......@@ -870,6 +902,10 @@ func (s *spySegmentManager) AllocSegment(ctx context.Context, collectionID Uniqu
panic("not implemented") // TODO: Implement
}
// AllocSegmentForImport allocates one segment allocation for bulkload.
// The spy implementation is a stub; tests exercising this path must not reach it.
func (s *spySegmentManager) AllocSegmentForImport(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int64) (*Allocation, error) {
panic("not implemented") // TODO: Implement
}
// DropSegment drops the segment from manager.
// The spy implementation intentionally does nothing.
func (s *spySegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
}
......
......@@ -116,7 +116,8 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
zap.Int64("collectionID", r.GetCollectionID()),
zap.Int64("partitionID", r.GetPartitionID()),
zap.String("channelName", r.GetChannelName()),
zap.Uint32("count", r.GetCount()))
zap.Uint32("count", r.GetCount()),
zap.Bool("isImport", r.GetIsImport()))
// Load the collection info from Root Coordinator, if it is not found in server meta.
if s.meta.GetCollection(r.GetCollectionID()) == nil {
......@@ -130,16 +131,30 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
// Add the channel to cluster for watching.
s.cluster.Watch(r.ChannelName, r.CollectionID)
// Have segment manager allocate and return the segment allocation info.
segAlloc, err := s.segmentManager.AllocSegment(ctx,
r.CollectionID, r.PartitionID, r.ChannelName, int64(r.Count))
if err != nil {
log.Warn("failed to alloc segment", zap.Any("request", r), zap.Error(err))
continue
segmentAllocations := make([]*Allocation, 0)
if r.GetIsImport() {
// Have segment manager allocate and return the segment allocation info.
segAlloc, err := s.segmentManager.AllocSegmentForImport(ctx,
r.CollectionID, r.PartitionID, r.ChannelName, int64(r.Count))
if err != nil {
log.Warn("failed to alloc segment for import", zap.Any("request", r), zap.Error(err))
continue
}
segmentAllocations = append(segmentAllocations, segAlloc)
} else {
// Have segment manager allocate and return the segment allocation info.
segAlloc, err := s.segmentManager.AllocSegment(ctx,
r.CollectionID, r.PartitionID, r.ChannelName, int64(r.Count))
if err != nil {
log.Warn("failed to alloc segment", zap.Any("request", r), zap.Error(err))
continue
}
segmentAllocations = append(segmentAllocations, segAlloc...)
}
log.Info("success to assign segments", zap.Int64("collectionID", r.GetCollectionID()), zap.Any("assignments", segAlloc))
for _, allocation := range segAlloc {
log.Info("success to assign segments", zap.Int64("collectionID", r.GetCollectionID()), zap.Any("assignments", segmentAllocations))
for _, allocation := range segmentAllocations {
result := &datapb.SegmentIDAssignment{
SegID: allocation.SegmentID,
ChannelName: r.ChannelName,
......
......@@ -964,13 +964,23 @@ func importFlushReqFunc(node *DataNode, req *datapb.ImportTaskRequest, res *root
tr := timerecord.NewTimeRecorder("import callback function")
defer tr.Elapse("finished")
// use the first field's row count as segment row count
// all the fileds row count are same, checked by ImportWrapper
var rowNum int
for _, field := range fields {
rowNum = field.RowNum()
break
}
// ask DataCoord to alloc a new segment
log.Info("import task flush segment", zap.Any("ChannelNames", req.ImportTask.ChannelNames), zap.Int("shardNum", shardNum))
segReqs := []*datapb.SegmentIDRequest{
{
ChannelName: req.ImportTask.ChannelNames[shardNum],
Count: 1,
Count: uint32(rowNum),
CollectionID: req.GetImportTask().GetCollectionId(),
PartitionID: req.GetImportTask().GetPartitionId(),
IsImport: true,
},
}
segmentIDReq := &datapb.AssignSegmentIDRequest{
......@@ -990,11 +1000,6 @@ func importFlushReqFunc(node *DataNode, req *datapb.ImportTaskRequest, res *root
segmentID := resp.SegIDAssignments[0].SegID
// TODO: this code block is long and tedious, maybe split it into separate functions.
var rowNum int
for _, field := range fields {
rowNum = field.RowNum()
break
}
tsFieldData := make([]int64, rowNum)
for i := range tsFieldData {
tsFieldData[i] = int64(ts)
......
......@@ -94,6 +94,7 @@ message SegmentIDRequest {
string channel_name = 2;
int64 collectionID = 3;
int64 partitionID = 4;
bool isImport = 5;
}
message AssignSegmentIDRequest {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册