未验证 提交 01be9111 编写于 作者: B Bingyi Sun 提交者: GitHub

Add GetFlushState interface (#12170)

issue: #12075
Signed-off-by: Nsunby <bingyi.sun@zilliz.com>
Co-authored-by: Nsunby <bingyi.sun@zilliz.com>
上级 2e8a3535
此差异已折叠。
......@@ -53,7 +53,7 @@ struct TableStruct_milvus_2eproto {
PROTOBUF_SECTION_VARIABLE(protodesc_cold);
static const ::PROTOBUF_NAMESPACE_ID::internal::AuxillaryParseTableField aux[]
PROTOBUF_SECTION_VARIABLE(protodesc_cold);
static const ::PROTOBUF_NAMESPACE_ID::internal::ParseTable schema[75]
static const ::PROTOBUF_NAMESPACE_ID::internal::ParseTable schema[77]
PROTOBUF_SECTION_VARIABLE(protodesc_cold);
static const ::PROTOBUF_NAMESPACE_ID::internal::FieldMetadata field_metadata[];
static const ::PROTOBUF_NAMESPACE_ID::internal::SerializationTable serialization_table[];
......@@ -156,6 +156,12 @@ extern GetCompactionStateRequestDefaultTypeInternal _GetCompactionStateRequest_d
class GetCompactionStateResponse;
class GetCompactionStateResponseDefaultTypeInternal;
extern GetCompactionStateResponseDefaultTypeInternal _GetCompactionStateResponse_default_instance_;
class GetFlushStateRequest;
class GetFlushStateRequestDefaultTypeInternal;
extern GetFlushStateRequestDefaultTypeInternal _GetFlushStateRequest_default_instance_;
class GetFlushStateResponse;
class GetFlushStateResponseDefaultTypeInternal;
extern GetFlushStateResponseDefaultTypeInternal _GetFlushStateResponse_default_instance_;
class GetIndexBuildProgressRequest;
class GetIndexBuildProgressRequestDefaultTypeInternal;
extern GetIndexBuildProgressRequestDefaultTypeInternal _GetIndexBuildProgressRequest_default_instance_;
......@@ -323,6 +329,8 @@ template<> ::milvus::proto::milvus::GetCompactionPlansRequest* Arena::CreateMayb
template<> ::milvus::proto::milvus::GetCompactionPlansResponse* Arena::CreateMaybeMessage<::milvus::proto::milvus::GetCompactionPlansResponse>(Arena*);
template<> ::milvus::proto::milvus::GetCompactionStateRequest* Arena::CreateMaybeMessage<::milvus::proto::milvus::GetCompactionStateRequest>(Arena*);
template<> ::milvus::proto::milvus::GetCompactionStateResponse* Arena::CreateMaybeMessage<::milvus::proto::milvus::GetCompactionStateResponse>(Arena*);
template<> ::milvus::proto::milvus::GetFlushStateRequest* Arena::CreateMaybeMessage<::milvus::proto::milvus::GetFlushStateRequest>(Arena*);
template<> ::milvus::proto::milvus::GetFlushStateResponse* Arena::CreateMaybeMessage<::milvus::proto::milvus::GetFlushStateResponse>(Arena*);
template<> ::milvus::proto::milvus::GetIndexBuildProgressRequest* Arena::CreateMaybeMessage<::milvus::proto::milvus::GetIndexBuildProgressRequest>(Arena*);
template<> ::milvus::proto::milvus::GetIndexBuildProgressResponse* Arena::CreateMaybeMessage<::milvus::proto::milvus::GetIndexBuildProgressResponse>(Arena*);
template<> ::milvus::proto::milvus::GetIndexStateRequest* Arena::CreateMaybeMessage<::milvus::proto::milvus::GetIndexStateRequest>(Arena*);
......@@ -12787,6 +12795,285 @@ class CompactionMergeInfo :
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_milvus_2eproto;
};
// -------------------------------------------------------------------
class GetFlushStateRequest :
public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:milvus.proto.milvus.GetFlushStateRequest) */ {
public:
GetFlushStateRequest();
virtual ~GetFlushStateRequest();
GetFlushStateRequest(const GetFlushStateRequest& from);
GetFlushStateRequest(GetFlushStateRequest&& from) noexcept
: GetFlushStateRequest() {
*this = ::std::move(from);
}
inline GetFlushStateRequest& operator=(const GetFlushStateRequest& from) {
CopyFrom(from);
return *this;
}
inline GetFlushStateRequest& operator=(GetFlushStateRequest&& from) noexcept {
if (GetArenaNoVirtual() == from.GetArenaNoVirtual()) {
if (this != &from) InternalSwap(&from);
} else {
CopyFrom(from);
}
return *this;
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* descriptor() {
return GetDescriptor();
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* GetDescriptor() {
return GetMetadataStatic().descriptor;
}
static const ::PROTOBUF_NAMESPACE_ID::Reflection* GetReflection() {
return GetMetadataStatic().reflection;
}
static const GetFlushStateRequest& default_instance();
static void InitAsDefaultInstance(); // FOR INTERNAL USE ONLY
static inline const GetFlushStateRequest* internal_default_instance() {
return reinterpret_cast<const GetFlushStateRequest*>(
&_GetFlushStateRequest_default_instance_);
}
static constexpr int kIndexInFileMessages =
75;
friend void swap(GetFlushStateRequest& a, GetFlushStateRequest& b) {
a.Swap(&b);
}
inline void Swap(GetFlushStateRequest* other) {
if (other == this) return;
InternalSwap(other);
}
// implements Message ----------------------------------------------
inline GetFlushStateRequest* New() const final {
return CreateMaybeMessage<GetFlushStateRequest>(nullptr);
}
GetFlushStateRequest* New(::PROTOBUF_NAMESPACE_ID::Arena* arena) const final {
return CreateMaybeMessage<GetFlushStateRequest>(arena);
}
void CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void CopyFrom(const GetFlushStateRequest& from);
void MergeFrom(const GetFlushStateRequest& from);
PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final;
bool IsInitialized() const final;
size_t ByteSizeLong() const final;
#if GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
const char* _InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) final;
#else
bool MergePartialFromCodedStream(
::PROTOBUF_NAMESPACE_ID::io::CodedInputStream* input) final;
#endif // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
void SerializeWithCachedSizes(
::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream* output) const final;
::PROTOBUF_NAMESPACE_ID::uint8* InternalSerializeWithCachedSizesToArray(
::PROTOBUF_NAMESPACE_ID::uint8* target) const final;
int GetCachedSize() const final { return _cached_size_.Get(); }
private:
inline void SharedCtor();
inline void SharedDtor();
void SetCachedSize(int size) const final;
void InternalSwap(GetFlushStateRequest* other);
friend class ::PROTOBUF_NAMESPACE_ID::internal::AnyMetadata;
static ::PROTOBUF_NAMESPACE_ID::StringPiece FullMessageName() {
return "milvus.proto.milvus.GetFlushStateRequest";
}
private:
inline ::PROTOBUF_NAMESPACE_ID::Arena* GetArenaNoVirtual() const {
return nullptr;
}
inline void* MaybeArenaPtr() const {
return nullptr;
}
public:
::PROTOBUF_NAMESPACE_ID::Metadata GetMetadata() const final;
private:
static ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadataStatic() {
::PROTOBUF_NAMESPACE_ID::internal::AssignDescriptors(&::descriptor_table_milvus_2eproto);
return ::descriptor_table_milvus_2eproto.file_level_metadata[kIndexInFileMessages];
}
public:
// nested types ----------------------------------------------------
// accessors -------------------------------------------------------
enum : int {
kSegmentIDsFieldNumber = 1,
};
// repeated int64 segmentIDs = 1;
int segmentids_size() const;
void clear_segmentids();
::PROTOBUF_NAMESPACE_ID::int64 segmentids(int index) const;
void set_segmentids(int index, ::PROTOBUF_NAMESPACE_ID::int64 value);
void add_segmentids(::PROTOBUF_NAMESPACE_ID::int64 value);
const ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >&
segmentids() const;
::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >*
mutable_segmentids();
// @@protoc_insertion_point(class_scope:milvus.proto.milvus.GetFlushStateRequest)
private:
class _Internal;
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 > segmentids_;
mutable std::atomic<int> _segmentids_cached_byte_size_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_milvus_2eproto;
};
// -------------------------------------------------------------------
class GetFlushStateResponse :
public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:milvus.proto.milvus.GetFlushStateResponse) */ {
public:
GetFlushStateResponse();
virtual ~GetFlushStateResponse();
GetFlushStateResponse(const GetFlushStateResponse& from);
GetFlushStateResponse(GetFlushStateResponse&& from) noexcept
: GetFlushStateResponse() {
*this = ::std::move(from);
}
inline GetFlushStateResponse& operator=(const GetFlushStateResponse& from) {
CopyFrom(from);
return *this;
}
inline GetFlushStateResponse& operator=(GetFlushStateResponse&& from) noexcept {
if (GetArenaNoVirtual() == from.GetArenaNoVirtual()) {
if (this != &from) InternalSwap(&from);
} else {
CopyFrom(from);
}
return *this;
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* descriptor() {
return GetDescriptor();
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* GetDescriptor() {
return GetMetadataStatic().descriptor;
}
static const ::PROTOBUF_NAMESPACE_ID::Reflection* GetReflection() {
return GetMetadataStatic().reflection;
}
static const GetFlushStateResponse& default_instance();
static void InitAsDefaultInstance(); // FOR INTERNAL USE ONLY
static inline const GetFlushStateResponse* internal_default_instance() {
return reinterpret_cast<const GetFlushStateResponse*>(
&_GetFlushStateResponse_default_instance_);
}
static constexpr int kIndexInFileMessages =
76;
friend void swap(GetFlushStateResponse& a, GetFlushStateResponse& b) {
a.Swap(&b);
}
inline void Swap(GetFlushStateResponse* other) {
if (other == this) return;
InternalSwap(other);
}
// implements Message ----------------------------------------------
inline GetFlushStateResponse* New() const final {
return CreateMaybeMessage<GetFlushStateResponse>(nullptr);
}
GetFlushStateResponse* New(::PROTOBUF_NAMESPACE_ID::Arena* arena) const final {
return CreateMaybeMessage<GetFlushStateResponse>(arena);
}
void CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void CopyFrom(const GetFlushStateResponse& from);
void MergeFrom(const GetFlushStateResponse& from);
PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final;
bool IsInitialized() const final;
size_t ByteSizeLong() const final;
#if GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
const char* _InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) final;
#else
bool MergePartialFromCodedStream(
::PROTOBUF_NAMESPACE_ID::io::CodedInputStream* input) final;
#endif // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
void SerializeWithCachedSizes(
::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream* output) const final;
::PROTOBUF_NAMESPACE_ID::uint8* InternalSerializeWithCachedSizesToArray(
::PROTOBUF_NAMESPACE_ID::uint8* target) const final;
int GetCachedSize() const final { return _cached_size_.Get(); }
private:
inline void SharedCtor();
inline void SharedDtor();
void SetCachedSize(int size) const final;
void InternalSwap(GetFlushStateResponse* other);
friend class ::PROTOBUF_NAMESPACE_ID::internal::AnyMetadata;
static ::PROTOBUF_NAMESPACE_ID::StringPiece FullMessageName() {
return "milvus.proto.milvus.GetFlushStateResponse";
}
private:
inline ::PROTOBUF_NAMESPACE_ID::Arena* GetArenaNoVirtual() const {
return nullptr;
}
inline void* MaybeArenaPtr() const {
return nullptr;
}
public:
::PROTOBUF_NAMESPACE_ID::Metadata GetMetadata() const final;
private:
static ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadataStatic() {
::PROTOBUF_NAMESPACE_ID::internal::AssignDescriptors(&::descriptor_table_milvus_2eproto);
return ::descriptor_table_milvus_2eproto.file_level_metadata[kIndexInFileMessages];
}
public:
// nested types ----------------------------------------------------
// accessors -------------------------------------------------------
enum : int {
kStatusFieldNumber = 1,
kFlushedFieldNumber = 2,
};
// .milvus.proto.common.Status status = 1;
bool has_status() const;
void clear_status();
const ::milvus::proto::common::Status& status() const;
::milvus::proto::common::Status* release_status();
::milvus::proto::common::Status* mutable_status();
void set_allocated_status(::milvus::proto::common::Status* status);
// bool flushed = 2;
void clear_flushed();
bool flushed() const;
void set_flushed(bool value);
// @@protoc_insertion_point(class_scope:milvus.proto.milvus.GetFlushStateResponse)
private:
class _Internal;
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::milvus::proto::common::Status* status_;
bool flushed_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_milvus_2eproto;
};
// ===================================================================
......@@ -23939,6 +24226,103 @@ inline void CompactionMergeInfo::set_target(::PROTOBUF_NAMESPACE_ID::int64 value
// @@protoc_insertion_point(field_set:milvus.proto.milvus.CompactionMergeInfo.target)
}
// -------------------------------------------------------------------
// GetFlushStateRequest
// repeated int64 segmentIDs = 1;
inline int GetFlushStateRequest::segmentids_size() const {
return segmentids_.size();
}
inline void GetFlushStateRequest::clear_segmentids() {
segmentids_.Clear();
}
inline ::PROTOBUF_NAMESPACE_ID::int64 GetFlushStateRequest::segmentids(int index) const {
// @@protoc_insertion_point(field_get:milvus.proto.milvus.GetFlushStateRequest.segmentIDs)
return segmentids_.Get(index);
}
inline void GetFlushStateRequest::set_segmentids(int index, ::PROTOBUF_NAMESPACE_ID::int64 value) {
segmentids_.Set(index, value);
// @@protoc_insertion_point(field_set:milvus.proto.milvus.GetFlushStateRequest.segmentIDs)
}
inline void GetFlushStateRequest::add_segmentids(::PROTOBUF_NAMESPACE_ID::int64 value) {
segmentids_.Add(value);
// @@protoc_insertion_point(field_add:milvus.proto.milvus.GetFlushStateRequest.segmentIDs)
}
inline const ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >&
GetFlushStateRequest::segmentids() const {
// @@protoc_insertion_point(field_list:milvus.proto.milvus.GetFlushStateRequest.segmentIDs)
return segmentids_;
}
inline ::PROTOBUF_NAMESPACE_ID::RepeatedField< ::PROTOBUF_NAMESPACE_ID::int64 >*
GetFlushStateRequest::mutable_segmentids() {
// @@protoc_insertion_point(field_mutable_list:milvus.proto.milvus.GetFlushStateRequest.segmentIDs)
return &segmentids_;
}
// -------------------------------------------------------------------
// GetFlushStateResponse
// .milvus.proto.common.Status status = 1;
inline bool GetFlushStateResponse::has_status() const {
return this != internal_default_instance() && status_ != nullptr;
}
inline const ::milvus::proto::common::Status& GetFlushStateResponse::status() const {
const ::milvus::proto::common::Status* p = status_;
// @@protoc_insertion_point(field_get:milvus.proto.milvus.GetFlushStateResponse.status)
return p != nullptr ? *p : *reinterpret_cast<const ::milvus::proto::common::Status*>(
&::milvus::proto::common::_Status_default_instance_);
}
inline ::milvus::proto::common::Status* GetFlushStateResponse::release_status() {
// @@protoc_insertion_point(field_release:milvus.proto.milvus.GetFlushStateResponse.status)
::milvus::proto::common::Status* temp = status_;
status_ = nullptr;
return temp;
}
inline ::milvus::proto::common::Status* GetFlushStateResponse::mutable_status() {
if (status_ == nullptr) {
auto* p = CreateMaybeMessage<::milvus::proto::common::Status>(GetArenaNoVirtual());
status_ = p;
}
// @@protoc_insertion_point(field_mutable:milvus.proto.milvus.GetFlushStateResponse.status)
return status_;
}
inline void GetFlushStateResponse::set_allocated_status(::milvus::proto::common::Status* status) {
::PROTOBUF_NAMESPACE_ID::Arena* message_arena = GetArenaNoVirtual();
if (message_arena == nullptr) {
delete reinterpret_cast< ::PROTOBUF_NAMESPACE_ID::MessageLite*>(status_);
}
if (status) {
::PROTOBUF_NAMESPACE_ID::Arena* submessage_arena = nullptr;
if (message_arena != submessage_arena) {
status = ::PROTOBUF_NAMESPACE_ID::internal::GetOwnedMessage(
message_arena, status, submessage_arena);
}
} else {
}
status_ = status;
// @@protoc_insertion_point(field_set_allocated:milvus.proto.milvus.GetFlushStateResponse.status)
}
// bool flushed = 2;
inline void GetFlushStateResponse::clear_flushed() {
flushed_ = false;
}
inline bool GetFlushStateResponse::flushed() const {
// @@protoc_insertion_point(field_get:milvus.proto.milvus.GetFlushStateResponse.flushed)
return flushed_;
}
inline void GetFlushStateResponse::set_flushed(bool value) {
flushed_ = value;
// @@protoc_insertion_point(field_set:milvus.proto.milvus.GetFlushStateResponse.flushed)
}
#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif // __GNUC__
......@@ -24090,6 +24474,10 @@ inline void CompactionMergeInfo::set_target(::PROTOBUF_NAMESPACE_ID::int64 value
// -------------------------------------------------------------------
// -------------------------------------------------------------------
// -------------------------------------------------------------------
// @@protoc_insertion_point(namespace_scope)
......
......@@ -1820,6 +1820,101 @@ func TestPostFlush(t *testing.T) {
})
}
func TestGetFlushState(t *testing.T) {
t.Run("get flush state with all flushed segments", func(t *testing.T) {
svr := &Server{
isServing: ServerStateHealthy,
meta: &meta{
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
State: commonpb.SegmentState_Flushed,
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
State: commonpb.SegmentState_Flushed,
},
},
},
},
},
}
resp, err := svr.GetFlushState(context.TODO(), &milvuspb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}})
assert.Nil(t, err)
assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Flushed: true,
}, resp)
})
t.Run("get flush state with unflushed segments", func(t *testing.T) {
svr := &Server{
isServing: ServerStateHealthy,
meta: &meta{
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
State: commonpb.SegmentState_Flushed,
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
State: commonpb.SegmentState_Sealed,
},
},
},
},
},
}
resp, err := svr.GetFlushState(context.TODO(), &milvuspb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}})
assert.Nil(t, err)
assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Flushed: false,
}, resp)
})
t.Run("get flush state with compacted segments", func(t *testing.T) {
svr := &Server{
isServing: ServerStateHealthy,
meta: &meta{
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
State: commonpb.SegmentState_Flushed,
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
State: commonpb.SegmentState_Dropped,
},
},
},
},
},
}
resp, err := svr.GetFlushState(context.TODO(), &milvuspb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}})
assert.Nil(t, err)
assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Flushed: true,
}, resp)
})
}
func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server {
Params.Init()
Params.TimeTickChannelName = Params.TimeTickChannelName + strconv.Itoa(rand.Int())
......
......@@ -826,7 +826,7 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq
}
if s.isClosed() {
log.Warn("failed to watch channels request", zap.Any("channels", req.GetChannelNames()),
log.Warn("failed to watch channels request", zap.Any("channels", req.GetChannelNames()),
zap.Error(errDataCoordIsUnhealthy(Params.NodeID)))
resp.Status.Reason = msgDataCoordIsUnhealthy(Params.NodeID)
return resp, nil
......@@ -847,3 +847,39 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq
return resp, nil
}
// GetFlushState gets the flush state of multiple segments
func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
log.Debug("received get flush state request", zap.Int64s("segment ids", req.GetSegmentIDs()), zap.Int("len", len(req.GetSegmentIDs())))
resp := &milvuspb.GetFlushStateResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}}
if s.isClosed() {
log.Warn("failed to get flush state because of closed server",
zap.Int64s("segment ids", req.GetSegmentIDs()), zap.Int("len", len(req.GetSegmentIDs())))
resp.Status.Reason = msgDataCoordIsUnhealthy(Params.NodeID)
return resp, nil
}
var unflushed []UniqueID
for _, sid := range req.GetSegmentIDs() {
segment := s.meta.GetSegment(sid)
// segment is nil if it was compacted
if segment == nil || segment.GetState() == commonpb.SegmentState_Flushed ||
segment.GetState() == commonpb.SegmentState_Flushed {
continue
}
unflushed = append(unflushed, sid)
}
if len(unflushed) != 0 {
log.Debug("unflushed segment ids", zap.Int64s("segment ids", unflushed), zap.Int("len", len(unflushed)))
resp.Flushed = false
} else {
log.Debug("all segment is flushed", zap.Int64s("segment ids", req.GetSegmentIDs()))
resp.Flushed = true
}
resp.Status.ErrorCode = commonpb.ErrorCode_Success
return resp, nil
}
......@@ -645,3 +645,21 @@ func (c *Client) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq
}
return ret.(*datapb.WatchChannelsResponse), err
}
// GetFlushState gets the flush state of multiple segments
func (c *Client) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
client, err := c.getGrpcClient()
if err != nil {
return nil, err
}
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return client.GetFlushState(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*milvuspb.GetFlushStateResponse), err
}
......@@ -113,7 +113,9 @@ func (m *MockDataCoordClient) GetCompactionStateWithPlans(ctx context.Context, r
func (m *MockDataCoordClient) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest, opts ...grpc.CallOption) (*datapb.WatchChannelsResponse, error) {
return &datapb.WatchChannelsResponse{}, m.err
}
func (m *MockDataCoordClient) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest, opts ...grpc.CallOption) (*milvuspb.GetFlushStateResponse, error) {
return &milvuspb.GetFlushStateResponse{}, m.err
}
func Test_NewClient(t *testing.T) {
proxy.Params.InitOnce()
......
......@@ -286,3 +286,8 @@ func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.
func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
return s.dataCoord.WatchChannels(ctx, req)
}
// GetFlushState gets the flush state of multiple segments
func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
return s.dataCoord.GetFlushState(ctx, req)
}
......@@ -52,6 +52,7 @@ type MockDataCoord struct {
manualCompactionResp *milvuspb.ManualCompactionResponse
compactionPlansResp *milvuspb.GetCompactionPlansResponse
watchChannelsResp *datapb.WatchChannelsResponse
getFlushStateResp *milvuspb.GetFlushStateResponse
}
func (m *MockDataCoord) Init() error {
......@@ -150,6 +151,10 @@ func (m *MockDataCoord) WatchChannels(ctx context.Context, req *datapb.WatchChan
return m.watchChannelsResp, m.err
}
func (m *MockDataCoord) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
return m.getFlushStateResp, m.err
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func Test_NewServer(t *testing.T) {
ctx := context.Background()
......@@ -307,6 +312,15 @@ func Test_NewServer(t *testing.T) {
assert.NotNil(t, resp)
})
t.Run("GetFlushState", func(t *testing.T) {
server.dataCoord = &MockDataCoord{
getFlushStateResp: &milvuspb.GetFlushStateResponse{},
}
resp, err := server.GetFlushState(ctx, nil)
assert.Nil(t, err)
assert.NotNil(t, resp)
})
err = server.Stop()
assert.Nil(t, err)
}
......
......@@ -447,3 +447,7 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa
func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
return s.proxy.GetCompactionStateWithPlans(ctx, req)
}
func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
return s.proxy.GetFlushState(ctx, req)
}
......@@ -399,6 +399,10 @@ func (m *MockDataCoord) WatchChannels(ctx context.Context, req *datapb.WatchChan
return nil, nil
}
func (m *MockDataCoord) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
return nil, nil
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type MockProxy struct {
MockBase
......@@ -609,6 +613,10 @@ func (m *MockProxy) GetCompactionStateWithPlans(ctx context.Context, req *milvus
return nil, nil
}
func (m *MockProxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
return nil, nil
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func Test_NewServer(t *testing.T) {
ctx := context.Background()
......
......@@ -39,6 +39,7 @@ service DataCoord {
rpc GetCompactionStateWithPlans(milvus.GetCompactionPlansRequest) returns (milvus.GetCompactionPlansResponse) {}
rpc WatchChannels(WatchChannelsRequest) returns (WatchChannelsResponse) {}
rpc GetFlushState(milvus.GetFlushStateRequest) returns (milvus.GetFlushStateResponse) {}
}
service DataNode {
......
......@@ -764,6 +764,15 @@ message CompactionMergeInfo {
int64 target = 2;
}
message GetFlushStateRequest {
repeated int64 segmentIDs = 1;
}
message GetFlushStateResponse {
common.Status status = 1;
bool flushed = 2;
}
service ProxyService {
rpc RegisterLink(RegisterLinkRequest) returns (RegisterLinkResponse) {}
}
......@@ -198,6 +198,10 @@ func (coord *DataCoordMock) WatchChannels(ctx context.Context, req *datapb.Watch
return &datapb.WatchChannelsResponse{}, nil
}
func (coord *DataCoordMock) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
return &milvuspb.GetFlushStateResponse{}, nil
}
func NewDataCoordMock() *DataCoordMock {
return &DataCoordMock{
nodeID: typeutil.UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()),
......
......@@ -2537,6 +2537,21 @@ func (node *Proxy) GetCompactionStateWithPlans(ctx context.Context, req *milvusp
return resp, err
}
// GetFlushState gets the flush state of multiple segments
func (node *Proxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
log.Info("received get flush state request", zap.Any("request", req))
resp := &milvuspb.GetFlushStateResponse{}
if !node.checkHealthy() {
resp.Status = unhealthyStatus()
log.Info("unable to get flush state because of closed server")
return resp, nil
}
resp, err := node.dataCoord.GetFlushState(ctx, req)
log.Info("received get flush state response", zap.Any("response", resp))
return resp, err
}
// checkHealthy checks proxy state is Healthy
func (node *Proxy) checkHealthy() bool {
code := node.stateCode.Load().(internalpb.StateCode)
......
......@@ -2483,6 +2483,26 @@ func Test_GetCompactionStateWithPlans(t *testing.T) {
})
}
func Test_GetFlushState(t *testing.T) {
t.Run("normal test", func(t *testing.T) {
datacoord := &DataCoordMock{}
proxy := &Proxy{dataCoord: datacoord}
proxy.stateCode.Store(internalpb.StateCode_Healthy)
resp, err := proxy.GetFlushState(context.TODO(), nil)
assert.EqualValues(t, &milvuspb.GetFlushStateResponse{}, resp)
assert.Nil(t, err)
})
t.Run("test get flush state with unhealthy proxy", func(t *testing.T) {
datacoord := &DataCoordMock{}
proxy := &Proxy{dataCoord: datacoord}
proxy.stateCode.Store(internalpb.StateCode_Abnormal)
resp, err := proxy.GetFlushState(context.TODO(), nil)
assert.EqualValues(t, unhealthyStatus(), resp.Status)
assert.Nil(t, err)
})
}
func TestProxy_GetComponentStates(t *testing.T) {
n := &Proxy{}
n.stateCode.Store(internalpb.StateCode_Healthy)
......
......@@ -230,6 +230,8 @@ type DataCoord interface {
GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error)
WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error)
// GetFlushState gets the flush state of multiple segments
GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error)
}
// IndexNode is the interface `indexnode` package implements
......@@ -958,6 +960,8 @@ type ProxyComponent interface {
GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error)
ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error)
GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error)
// GetFlushState gets the flush state of multiple segments
GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error)
}
// QueryNode is the interface `querynode` package implements
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册