未验证 提交 7746a5b7 编写于 作者: Y yah01 提交者: GitHub

Add NodeIds field for QuerySegmentInfo (#17121)

Signed-off-by: Nyah01 <yang.cen@zilliz.com>
上级 90ee23df
此差异已折叠。
......@@ -10539,6 +10539,7 @@ class QuerySegmentInfo :
// accessors -------------------------------------------------------
enum : int {
kNodeIdsFieldNumber = 10,
kIndexNameFieldNumber = 6,
kSegmentIDFieldNumber = 1,
kCollectionIDFieldNumber = 2,
......@@ -10549,6 +10550,17 @@ class QuerySegmentInfo :
kNodeIDFieldNumber = 8,
kStateFieldNumber = 9,
};
// repeated int64 nodeIds = 10;
int nodeids_size() const;
void clear_nodeids();
::PROTOBUF_NAMESPACE_ID::int64 nodeids(int index) const;
void set_nodeids(int index, ::PROTOBUF_NAMESPACE_ID::int64 value);
void add_nodeids(::PROTOBUF_NAMESPACE_ID::int64 value);
const ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >&
nodeids() const;
::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >*
mutable_nodeids();
// string index_name = 6;
void clear_index_name();
const std::string& index_name() const;
......@@ -10605,6 +10617,8 @@ class QuerySegmentInfo :
class _Internal;
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 > nodeids_;
mutable std::atomic<int> _nodeids_cached_byte_size_;
::PROTOBUF_NAMESPACE_ID::internal::ArenaStringPtr index_name_;
::PROTOBUF_NAMESPACE_ID::int64 segmentid_;
::PROTOBUF_NAMESPACE_ID::int64 collectionid_;
......@@ -25863,6 +25877,36 @@ inline void QuerySegmentInfo::set_state(::milvus::proto::common::SegmentState va
// @@protoc_insertion_point(field_set:milvus.proto.milvus.QuerySegmentInfo.state)
}
// repeated int64 nodeIds = 10;
inline int QuerySegmentInfo::nodeids_size() const {
return nodeids_.size();
}
inline void QuerySegmentInfo::clear_nodeids() {
nodeids_.Clear();
}
inline ::PROTOBUF_NAMESPACE_ID::int64 QuerySegmentInfo::nodeids(int index) const {
// @@protoc_insertion_point(field_get:milvus.proto.milvus.QuerySegmentInfo.nodeIds)
return nodeids_.Get(index);
}
inline void QuerySegmentInfo::set_nodeids(int index, ::PROTOBUF_NAMESPACE_ID::int64 value) {
nodeids_.Set(index, value);
// @@protoc_insertion_point(field_set:milvus.proto.milvus.QuerySegmentInfo.nodeIds)
}
inline void QuerySegmentInfo::add_nodeids(::PROTOBUF_NAMESPACE_ID::int64 value) {
nodeids_.Add(value);
// @@protoc_insertion_point(field_add:milvus.proto.milvus.QuerySegmentInfo.nodeIds)
}
inline const ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >&
QuerySegmentInfo::nodeids() const {
// @@protoc_insertion_point(field_list:milvus.proto.milvus.QuerySegmentInfo.nodeIds)
return nodeids_;
}
inline ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >*
QuerySegmentInfo::mutable_nodeids() {
// @@protoc_insertion_point(field_mutable_list:milvus.proto.milvus.QuerySegmentInfo.nodeIds)
return &nodeids_;
}
// -------------------------------------------------------------------
// GetQuerySegmentInfoRequest
......@@ -708,8 +708,10 @@ message QuerySegmentInfo {
int64 num_rows = 5;
string index_name = 6;
int64 indexID = 7;
// deprecated, check node_ids(NodeIds) field
int64 nodeID = 8;
common.SegmentState state = 9;
repeated int64 nodeIds = 10;
}
message GetQuerySegmentInfoRequest {
......
......@@ -354,6 +354,7 @@ message SegmentInfo {
int64 segmentID = 1;
int64 collectionID = 2;
int64 partitionID = 3;
// deprecated, check node_ids(NodeIds) field
int64 nodeID = 4;
int64 mem_size = 5;
int64 num_rows = 6;
......
......@@ -2471,9 +2471,10 @@ func (m *PartitionStates) GetInMemoryPercentage() int64 {
}
type SegmentInfo struct {
SegmentID int64 `protobuf:"varint,1,opt,name=segmentID,proto3" json:"segmentID,omitempty"`
CollectionID int64 `protobuf:"varint,2,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
PartitionID int64 `protobuf:"varint,3,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
SegmentID int64 `protobuf:"varint,1,opt,name=segmentID,proto3" json:"segmentID,omitempty"`
CollectionID int64 `protobuf:"varint,2,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
PartitionID int64 `protobuf:"varint,3,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
// deprecated, check node_ids(NodeIds) field
NodeID int64 `protobuf:"varint,4,opt,name=nodeID,proto3" json:"nodeID,omitempty"`
MemSize int64 `protobuf:"varint,5,opt,name=mem_size,json=memSize,proto3" json:"mem_size,omitempty"`
NumRows int64 `protobuf:"varint,6,opt,name=num_rows,json=numRows,proto3" json:"num_rows,omitempty"`
......
......@@ -3594,8 +3594,8 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue
MemSize: info.MemSize,
IndexName: info.IndexName,
IndexID: info.IndexID,
NodeID: info.NodeID,
State: info.SegmentState,
NodeIds: info.NodeIds,
}
}
resp.Status.ErrorCode = commonpb.ErrorCode_Success
......
......@@ -2296,12 +2296,6 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
offlineNodes.Insert(nodeID)
}
log.Debug("removing offline nodes from replicas and segments...",
zap.Int("len(replicas)", len(replicas)),
zap.Int("len(segments)", len(segments)),
zap.Int64("trigger task ID", lbt.getTaskID()),
)
for _, replica := range replicas {
replica := replica
wg.Go(func() error {
......@@ -2344,6 +2338,11 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
return err
}
log.Info("remove offline nodes from segment",
zap.Int64("taskID", lbt.getTaskID()),
zap.Int64("segmentID", segment.GetSegmentID()),
zap.Int64s("nodeIds", segment.GetNodeIds()))
return nil
})
}
......
......@@ -967,6 +967,10 @@ func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta)
}
}
}
log.Info("update segment info",
zap.Int64("triggerTaskID", triggerTask.getTaskID()),
zap.Any("segment", segmentInfosToSave))
sealedSegmentChangeInfos, err = meta.saveGlobalSealedSegInfos(segmentInfosToSave)
}
......
......@@ -1516,6 +1516,7 @@ class TestUtilityAdvanced(TestcaseBase):
assert cnt == nb
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.xfail(reason="need newer SDK")
def test_load_balance_normal(self):
"""
target: test load balance of collection
......@@ -1557,6 +1558,7 @@ class TestUtilityAdvanced(TestcaseBase):
assert set(sealed_segment_ids).issubset(des_sealed_segment_ids)
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.xfail(reason="need newer SDK")
def test_load_balance_with_src_node_not_exist(self):
"""
target: test load balance of collection
......@@ -1593,6 +1595,7 @@ class TestUtilityAdvanced(TestcaseBase):
check_items={ct.err_code: 1, ct.err_msg: "is not exist to balance"})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.xfail(reason="need newer SDK")
def test_load_balance_with_all_dst_node_not_exist(self):
"""
target: test load balance of collection
......@@ -1628,6 +1631,7 @@ class TestUtilityAdvanced(TestcaseBase):
check_items={ct.err_code: 1, ct.err_msg: "no available queryNode to allocate"})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.xfail(reason="need newer SDK")
def test_load_balance_with_one_sealed_segment_id_not_exist(self):
"""
target: test load balance of collection
......@@ -1668,6 +1672,7 @@ class TestUtilityAdvanced(TestcaseBase):
check_items={ct.err_code: 1, ct.err_msg: "is not exist"})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.xfail(reason="need newer SDK")
def test_load_balance_in_one_group(self):
"""
target: test load balance of collection in one group
......@@ -1715,6 +1720,7 @@ class TestUtilityAdvanced(TestcaseBase):
assert set(sealed_segment_ids).issubset(des_sealed_segment_ids)
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.xfail(reason="need newer SDK")
def test_load_balance_not_in_one_group(self):
"""
target: test load balance of collection in one group
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册