提交 5d586063 编写于 作者: S sunby 提交者: yefu.chen

Move Status to SegIDAssignment

Signed-off-by: Nsunby <bingyi.sun@zilliz.com>
上级 a8b78f11
......@@ -10,8 +10,6 @@ import (
"github.com/cznic/mathutil"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
......@@ -174,10 +172,10 @@ func (sa *SegIDAssigner) syncSegments() {
fmt.Println("OOOOO", req.PerChannelReq)
resp, err := sa.masterClient.AssignSegmentID(ctx, req)
if resp.Status.GetErrorCode() != commonpb.ErrorCode_SUCCESS {
log.Println("GRPC AssignSegmentID Failed", resp, err)
return
}
//if resp.Status.GetErrorCode() != commonpb.ErrorCode_SUCCESS {
// log.Println("GRPC AssignSegmentID Failed", resp, err)
// return
//}
now := time.Now()
expiredTime := now.Add(time.Millisecond * time.Duration(1000))
......
......@@ -437,17 +437,8 @@ func (s *Master) AllocID(ctx context.Context, request *internalpb.IDRequest) (*i
}
func (s *Master) AssignSegmentID(ctx context.Context, request *internalpb.AssignSegIDRequest) (*internalpb.AssignSegIDResponse, error) {
segInfos, err := s.segmentManager.AssignSegment(request.GetPerChannelReq())
if err != nil {
return &internalpb.AssignSegIDResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR},
}, nil
}
segInfos, _ := s.segmentManager.AssignSegment(request.GetPerChannelReq())
return &internalpb.AssignSegIDResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
Reason: "",
},
PerChannelAssignment: segInfos,
}, nil
}
......
......@@ -3,8 +3,11 @@ package master
import (
"context"
"log"
"strconv"
"sync"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
......@@ -58,6 +61,11 @@ func (manager *SegmentManager) AssignSegment(segIDReq []*internalpb.SegIDRequest
res := make([]*internalpb.SegIDAssignment, 0)
for _, req := range segIDReq {
result := &internalpb.SegIDAssignment{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
},
}
collName := req.CollName
partitionTag := req.PartitionTag
count := req.Count
......@@ -65,22 +73,27 @@ func (manager *SegmentManager) AssignSegment(segIDReq []*internalpb.SegIDRequest
collMeta, err := manager.metaTable.GetCollectionByName(collName)
if err != nil {
return nil, err
result.Status.Reason = err.Error()
res = append(res, result)
continue
}
collID := collMeta.GetID()
if !manager.metaTable.HasPartition(collID, partitionTag) {
return nil, errors.Errorf("partition tag %s can not find in coll %d", partitionTag, collID)
result.Status.Reason = "partition tag " + partitionTag + " can not find in coll " + strconv.FormatInt(collID, 10)
res = append(res, result)
continue
}
assignInfo, err := manager.assignSegment(collName, collID, partitionTag, count, channelID)
if err != nil {
return nil, err
result.Status.Reason = err.Error()
res = append(res, result)
continue
}
res = append(res, assignInfo)
}
return res, nil
}
......@@ -125,6 +138,10 @@ func (manager *SegmentManager) assignSegment(
CollName: collName,
PartitionTag: partitionTag,
ExpireTime: result.expireTime,
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
Reason: "",
},
}, nil
}
......@@ -156,6 +173,10 @@ func (manager *SegmentManager) assignSegment(
CollName: collName,
PartitionTag: partitionTag,
ExpireTime: result.expireTime,
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
Reason: "",
},
}, nil
}
......
......@@ -112,13 +112,13 @@ func TestSegmentManager_AssignSegment(t *testing.T) {
var results = make([]*internalpb.SegIDAssignment, 0)
for _, c := range cases {
result, err := segManager.AssignSegment([]*internalpb.SegIDRequest{{Count: c.Count, ChannelID: c.ChannelID, CollName: collName, PartitionTag: partitionTag}})
result, _ := segManager.AssignSegment([]*internalpb.SegIDRequest{{Count: c.Count, ChannelID: c.ChannelID, CollName: collName, PartitionTag: partitionTag}})
results = append(results, result...)
if c.Err {
assert.NotNil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_UNEXPECTED_ERROR, result[0].Status.ErrorCode)
continue
}
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_SUCCESS, result[0].Status.ErrorCode)
if c.SameIDWith != -1 {
assert.EqualValues(t, result[0].SegID, results[c.SameIDWith].SegID)
}
......@@ -257,9 +257,9 @@ func TestSegmentManager_RPC(t *testing.T) {
},
})
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_SUCCESS, resp.Status.ErrorCode)
assignments := resp.GetPerChannelAssignment()
assert.EqualValues(t, 1, len(assignments))
assert.EqualValues(t, commonpb.ErrorCode_SUCCESS, assignments[0].Status.ErrorCode)
assert.EqualValues(t, collName, assignments[0].CollName)
assert.EqualValues(t, partitionTag, assignments[0].PartitionTag)
assert.EqualValues(t, int32(0), assignments[0].ChannelID)
......
......@@ -100,11 +100,11 @@ message SegIDAssignment {
string coll_name = 4;
string partition_tag = 5;
uint64 expire_time = 6;
common.Status status = 7;
}
message AssignSegIDResponse {
common.Status status = 1;
repeated SegIDAssignment per_channel_assignment = 2;
repeated SegIDAssignment per_channel_assignment = 1;
}
message CreateCollectionRequest {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册