未验证 提交 c3f0c5a3 编写于 作者: B Bingyi Sun 提交者: GitHub

Add compaction interfaces in proxy (#11431)

issue: #9904
Signed-off-by: Nsunby <bingyi.sun@zilliz.com>
Co-authored-by: Nsunby <bingyi.sun@zilliz.com>
上级 0eb71f7e
......@@ -169,7 +169,7 @@ static void InitDefaultsscc_info_Status_common_2eproto() {
{{ATOMIC_VAR_INIT(::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase::kUninitialized), 0, InitDefaultsscc_info_Status_common_2eproto}, {}};
static ::PROTOBUF_NAMESPACE_ID::Metadata file_level_metadata_common_2eproto[8];
static const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* file_level_enum_descriptors_common_2eproto[5];
static const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* file_level_enum_descriptors_common_2eproto[6];
static constexpr ::PROTOBUF_NAMESPACE_ID::ServiceDescriptor const** file_level_service_descriptors_common_2eproto = nullptr;
const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_common_2eproto::offsets[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = {
......@@ -314,9 +314,10 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
"\263\t\022\017\n\nRequestTSO\020\264\t\022\024\n\017AllocateSegment\020\265"
"\t\022\026\n\021SegmentStatistics\020\266\t\022\025\n\020SegmentFlus"
"hDone\020\267\t\022\017\n\nDataNodeTt\020\270\t*\"\n\007DslType\022\007\n\003"
"Dsl\020\000\022\016\n\nBoolExprV1\020\001B5Z3github.com/milv"
"us-io/milvus/internal/proto/commonpbb\006pr"
"oto3"
"Dsl\020\000\022\016\n\nBoolExprV1\020\001*B\n\017CompactionState"
"\022\021\n\rUndefiedState\020\000\022\r\n\tExecuting\020\001\022\r\n\tCo"
"mpleted\020\002B5Z3github.com/milvus-io/milvus"
"/internal/proto/commonpbb\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = {
};
......@@ -333,7 +334,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once;
static bool descriptor_table_common_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = {
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 2524,
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 2592,
&descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 8, 0,
schemas, file_default_instances, TableStruct_common_2eproto::offsets,
file_level_metadata_common_2eproto, 8, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto,
......@@ -500,6 +501,21 @@ bool DslType_IsValid(int value) {
}
}
const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* CompactionState_descriptor() {
::PROTOBUF_NAMESPACE_ID::internal::AssignDescriptors(&descriptor_table_common_2eproto);
return file_level_enum_descriptors_common_2eproto[5];
}
bool CompactionState_IsValid(int value) {
switch (value) {
case 0:
case 1:
case 2:
return true;
default:
return false;
}
}
// ===================================================================
......
......@@ -310,6 +310,32 @@ inline bool DslType_Parse(
return ::PROTOBUF_NAMESPACE_ID::internal::ParseNamedEnum<DslType>(
DslType_descriptor(), name, value);
}
enum CompactionState : int {
UndefiedState = 0,
Executing = 1,
Completed = 2,
CompactionState_INT_MIN_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::min(),
CompactionState_INT_MAX_SENTINEL_DO_NOT_USE_ = std::numeric_limits<::PROTOBUF_NAMESPACE_ID::int32>::max()
};
bool CompactionState_IsValid(int value);
constexpr CompactionState CompactionState_MIN = UndefiedState;
constexpr CompactionState CompactionState_MAX = Completed;
constexpr int CompactionState_ARRAYSIZE = CompactionState_MAX + 1;
const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* CompactionState_descriptor();
template<typename T>
inline const std::string& CompactionState_Name(T enum_t_value) {
static_assert(::std::is_same<T, CompactionState>::value ||
::std::is_integral<T>::value,
"Incorrect type passed to function CompactionState_Name.");
return ::PROTOBUF_NAMESPACE_ID::internal::NameOfEnum(
CompactionState_descriptor(), enum_t_value);
}
inline bool CompactionState_Parse(
const std::string& name, CompactionState* value) {
return ::PROTOBUF_NAMESPACE_ID::internal::ParseNamedEnum<CompactionState>(
CompactionState_descriptor(), name, value);
}
// ===================================================================
class Status :
......@@ -2152,6 +2178,11 @@ template <>
inline const EnumDescriptor* GetEnumDescriptor< ::milvus::proto::common::DslType>() {
return ::milvus::proto::common::DslType_descriptor();
}
template <> struct is_proto_enum< ::milvus::proto::common::CompactionState> : ::std::true_type {};
template <>
inline const EnumDescriptor* GetEnumDescriptor< ::milvus::proto::common::CompactionState>() {
return ::milvus::proto::common::CompactionState_descriptor();
}
PROTOBUF_NAMESPACE_CLOSE
......
此差异已折叠。
此差异已折叠。
......@@ -33,8 +33,8 @@ type compactionPlanContext interface {
expireCompaction(ts Timestamp) error
// isFull return true if the task pool is full
isFull() bool
// get compaction by signal id and return the number of executing/completed/timeout plans
getCompactionBySignalID(signalID int64) (executing, completed, timeout int)
// get compaction tasks by signal id
getCompactionTasksBySignalID(signalID int64) []*compactionTask
}
type compactionTaskState int8
......@@ -55,6 +55,7 @@ type compactionTask struct {
plan *datapb.CompactionPlan
state compactionTaskState
dataNodeID int64
result *datapb.CompactionResult
}
func (t *compactionTask) shadowClone(opts ...compactionTaskOpt) *compactionTask {
......@@ -188,6 +189,7 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu
return errors.New("unknown compaction type")
}
c.plans[planID] = c.plans[planID].shadowClone(setState(completed))
c.plans[planID] = c.plans[planID].shadowClone(setResult(result))
c.executingTaskNum--
if c.plans[planID].plan.GetType() == datapb.CompactionType_MergeCompaction {
c.flushCh <- result.GetSegmentID()
......@@ -258,25 +260,19 @@ func (c *compactionPlanHandler) getExecutingCompactions() []*compactionTask {
return tasks
}
// get compaction by signal id and return the number of executing/completed/timeout plans
func (c *compactionPlanHandler) getCompactionBySignalID(signalID int64) (executingPlans int, completedPlans int, timeoutPlans int) {
// get compaction tasks by signal id
func (c *compactionPlanHandler) getCompactionTasksBySignalID(signalID int64) []*compactionTask {
c.mu.RLock()
defer c.mu.RUnlock()
var tasks []*compactionTask
for _, t := range c.plans {
if t.triggerInfo.id != signalID {
continue
}
switch t.state {
case executing:
executingPlans++
case completed:
completedPlans++
case timeout:
timeoutPlans++
}
tasks = append(tasks, t)
}
return
return tasks
}
type compactionTaskOpt func(task *compactionTask)
......@@ -286,3 +282,9 @@ func setState(state compactionTaskState) compactionTaskOpt {
task.state = state
}
}
func setResult(result *datapb.CompactionResult) compactionTaskOpt {
return func(task *compactionTask) {
task.result = result
}
}
......@@ -365,3 +365,63 @@ func Test_newCompactionPlanHandler(t *testing.T) {
})
}
}
func Test_getCompactionTasksBySignalID(t *testing.T) {
type fields struct {
plans map[int64]*compactionTask
}
type args struct {
signalID int64
}
tests := []struct {
name string
fields fields
args args
want []*compactionTask
}{
{
"test get compaction tasks",
fields{
plans: map[int64]*compactionTask{
1: {
triggerInfo: &compactionSignal{id: 1},
state: executing,
},
2: {
triggerInfo: &compactionSignal{id: 1},
state: completed,
},
3: {
triggerInfo: &compactionSignal{id: 1},
state: timeout,
},
},
},
args{1},
[]*compactionTask{
{
triggerInfo: &compactionSignal{id: 1},
state: executing,
},
{
triggerInfo: &compactionSignal{id: 1},
state: completed,
},
{
triggerInfo: &compactionSignal{id: 1},
state: timeout,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
h := &compactionPlanHandler{
plans: tt.fields.plans,
}
got := h.getCompactionTasksBySignalID(tt.args.signalID)
assert.ElementsMatch(t, tt.want, got)
})
}
}
......@@ -42,8 +42,8 @@ func (h *spyCompactionHandler) isFull() bool {
return false
}
// get compaction by signal id and return the number of executing/completed/timeout plans
func (h *spyCompactionHandler) getCompactionBySignalID(signalID int64) (executing int, completed int, timeout int) {
// get compaction tasks by signal id
func (h *spyCompactionHandler) getCompactionTasksBySignalID(signalID int64) []*compactionTask {
panic("not implemented") // TODO: Implement
}
......
......@@ -521,10 +521,10 @@ func (h *mockCompactionHandler) isFull() bool {
panic("not implemented")
}
// get compaction by signal id and return the number of executing/completed/timeout plans
func (h *mockCompactionHandler) getCompactionBySignalID(signalID int64) (executing int, completed int, timeout int) {
if f, ok := h.methods["getCompactionBySignalID"]; ok {
if ff, ok := f.(func(signalID int64) (executing int, completed int, timeout int)); ok {
// get compaction tasks by signal id
func (h *mockCompactionHandler) getCompactionTasksBySignalID(signalID int64) []*compactionTask {
if f, ok := h.methods["getCompactionTasksBySignalID"]; ok {
if ff, ok := f.(func(signalID int64) []*compactionTask); ok {
return ff(signalID)
}
}
......
......@@ -1312,16 +1312,18 @@ func TestGetCompactionState(t *testing.T) {
svr.compactionHandler = &mockCompactionHandler{
methods: map[string]interface{}{
"getCompactionBySignalID": func(signalID int64) (executing, completed, timeout int) {
return 0, 1, 0
"getCompactionTasksBySignalID": func(signalID int64) []*compactionTask {
return []*compactionTask{
{state: completed},
}
},
},
}
resp, err := svr.GetCompactionState(context.Background(), &datapb.GetCompactionStateRequest{})
resp, err := svr.GetCompactionState(context.Background(), &milvuspb.GetCompactionStateRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.Equal(t, datapb.CompactionState_Completed, resp.GetState())
assert.Equal(t, commonpb.CompactionState_Completed, resp.GetState())
})
t.Run("test get compaction state in running", func(t *testing.T) {
svr := &Server{}
......@@ -1329,16 +1331,23 @@ func TestGetCompactionState(t *testing.T) {
svr.compactionHandler = &mockCompactionHandler{
methods: map[string]interface{}{
"getCompactionBySignalID": func(signalID int64) (executing, completed, timeout int) {
return 3, 2, 1
"getCompactionTasksBySignalID": func(signalID int64) []*compactionTask {
return []*compactionTask{
{state: executing},
{state: executing},
{state: executing},
{state: completed},
{state: completed},
{state: timeout},
}
},
},
}
resp, err := svr.GetCompactionState(context.Background(), &datapb.GetCompactionStateRequest{CompactionID: 1})
resp, err := svr.GetCompactionState(context.Background(), &milvuspb.GetCompactionStateRequest{CompactionID: 1})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.Equal(t, datapb.CompactionState_Executing, resp.GetState())
assert.Equal(t, commonpb.CompactionState_Executing, resp.GetState())
assert.EqualValues(t, 3, resp.GetExecutingPlanNo())
assert.EqualValues(t, 2, resp.GetCompletedPlanNo())
assert.EqualValues(t, 1, resp.GetTimeoutPlanNo())
......@@ -1348,7 +1357,7 @@ func TestGetCompactionState(t *testing.T) {
svr := &Server{}
svr.isServing = ServerStateStopped
resp, err := svr.GetCompactionState(context.Background(), &datapb.GetCompactionStateRequest{})
resp, err := svr.GetCompactionState(context.Background(), &milvuspb.GetCompactionStateRequest{})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
assert.Equal(t, msgDataCoordIsUnhealthy(Params.NodeID), resp.GetStatus().GetReason())
......@@ -1412,7 +1421,7 @@ func TestManualCompaction(t *testing.T) {
},
}
resp, err := svr.ManualCompaction(context.TODO(), &datapb.ManualCompactionRequest{
resp, err := svr.ManualCompaction(context.TODO(), &milvuspb.ManualCompactionRequest{
CollectionID: 1,
Timetravel: 1,
})
......@@ -1431,7 +1440,7 @@ func TestManualCompaction(t *testing.T) {
},
}
resp, err := svr.ManualCompaction(context.TODO(), &datapb.ManualCompactionRequest{
resp, err := svr.ManualCompaction(context.TODO(), &milvuspb.ManualCompactionRequest{
CollectionID: 1,
Timetravel: 1,
})
......@@ -1450,7 +1459,7 @@ func TestManualCompaction(t *testing.T) {
},
}
resp, err := svr.ManualCompaction(context.TODO(), &datapb.ManualCompactionRequest{
resp, err := svr.ManualCompaction(context.TODO(), &milvuspb.ManualCompactionRequest{
CollectionID: 1,
Timetravel: 1,
})
......@@ -1460,6 +1469,56 @@ func TestManualCompaction(t *testing.T) {
})
}
func TestGetCompactionStateWithPlans(t *testing.T) {
t.Run("test get compaction state successfully", func(t *testing.T) {
svr := &Server{}
svr.isServing = ServerStateHealthy
svr.compactionHandler = &mockCompactionHandler{
methods: map[string]interface{}{
"getCompactionTasksBySignalID": func(signalID int64) []*compactionTask {
return []*compactionTask{
{
triggerInfo: &compactionSignal{id: 1},
state: executing,
},
}
},
},
}
resp, err := svr.GetCompactionStateWithPlans(context.TODO(), &milvuspb.GetCompactionPlansRequest{
CompactionID: 1,
})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, commonpb.CompactionState_Executing, resp.State)
})
t.Run("test get compaction state with closed server", func(t *testing.T) {
svr := &Server{}
svr.isServing = ServerStateStopped
svr.compactionHandler = &mockCompactionHandler{
methods: map[string]interface{}{
"getCompactionTasksBySignalID": func(signalID int64) []*compactionTask {
return []*compactionTask{
{
triggerInfo: &compactionSignal{id: 1},
state: executing,
},
}
},
},
}
resp, err := svr.GetCompactionStateWithPlans(context.TODO(), &milvuspb.GetCompactionPlansRequest{
CompactionID: 1,
})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
assert.Equal(t, msgDataCoordIsUnhealthy(Params.NodeID), resp.Status.Reason)
})
}
func TestOptions(t *testing.T) {
t.Run("SetRootCoordCreator", func(t *testing.T) {
svr := newTestServer(t, nil)
......
......@@ -645,10 +645,10 @@ func (s *Server) CompleteCompaction(ctx context.Context, req *datapb.CompactionR
return resp, nil
}
func (s *Server) ManualCompaction(ctx context.Context, req *datapb.ManualCompactionRequest) (*datapb.ManualCompactionResponse, error) {
func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
log.Debug("receive manual compaction", zap.Int64("collectionID", req.GetCollectionID()))
resp := &datapb.ManualCompactionResponse{
resp := &milvuspb.ManualCompactionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
......@@ -679,9 +679,9 @@ func (s *Server) ManualCompaction(ctx context.Context, req *datapb.ManualCompact
return resp, nil
}
func (s *Server) GetCompactionState(ctx context.Context, req *datapb.GetCompactionStateRequest) (*datapb.GetCompactionStateResponse, error) {
func (s *Server) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
log.Debug("receive get compaction state request", zap.Int64("compactionID", req.GetCompactionID()))
resp := &datapb.GetCompactionStateResponse{
resp := &milvuspb.GetCompactionStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
......@@ -699,16 +699,80 @@ func (s *Server) GetCompactionState(ctx context.Context, req *datapb.GetCompacti
return resp, nil
}
executing, completed, timeout := s.compactionHandler.getCompactionBySignalID(req.GetCompactionID())
if executing != 0 {
resp.State = datapb.CompactionState_Executing
} else {
resp.State = datapb.CompactionState_Completed
tasks := s.compactionHandler.getCompactionTasksBySignalID(req.GetCompactionID())
state, executingCnt, completedCnt, timeoutCnt := getCompactionState(tasks)
resp.State = state
resp.ExecutingPlanNo = int64(executingCnt)
resp.CompletedPlanNo = int64(completedCnt)
resp.TimeoutPlanNo = int64(timeoutCnt)
resp.Status.ErrorCode = commonpb.ErrorCode_Success
return resp, nil
}
func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
log.Debug("received GetCompactionStateWithPlans request", zap.Int64("compactionID", req.GetCompactionID()))
resp := &milvuspb.GetCompactionPlansResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError},
}
if s.isClosed() {
log.Warn("failed to get compaction state with plans", zap.Int64("compactionID", req.GetCompactionID()), zap.Error(errDataCoordIsUnhealthy(Params.NodeID)))
resp.Status.Reason = msgDataCoordIsUnhealthy(Params.NodeID)
return resp, nil
}
if !Params.EnableCompaction {
resp.Status.Reason = "compaction disabled"
return resp, nil
}
resp.ExecutingPlanNo = int64(executing)
resp.CompletedPlanNo = int64(completed)
resp.TimeoutPlanNo = int64(timeout)
tasks := s.compactionHandler.getCompactionTasksBySignalID(req.GetCompactionID())
for _, task := range tasks {
resp.MergeInfos = append(resp.MergeInfos, getCompactionMergeInfo(task))
}
state, _, _, _ := getCompactionState(tasks)
resp.Status.ErrorCode = commonpb.ErrorCode_Success
resp.State = state
return resp, nil
}
func getCompactionMergeInfo(task *compactionTask) *milvuspb.CompactionMergeInfo {
segments := task.plan.GetSegmentBinlogs()
var sources []int64
for _, s := range segments {
sources = append(sources, s.GetSegmentID())
}
var target int64 = -1
if task.result != nil {
target = task.result.GetSegmentID()
}
return &milvuspb.CompactionMergeInfo{
Sources: sources,
Target: target,
}
}
func getCompactionState(tasks []*compactionTask) (state commonpb.CompactionState, executingCnt, completedCnt, timeoutCnt int) {
for _, t := range tasks {
switch t.state {
case executing:
executingCnt++
case completed:
completedCnt++
case timeout:
timeoutCnt++
}
}
if executingCnt != 0 {
state = commonpb.CompactionState_Executing
} else {
state = commonpb.CompactionState_Completed
}
return
}
......@@ -267,10 +267,14 @@ func (s *Server) CompleteCompaction(ctx context.Context, req *datapb.CompactionR
return s.dataCoord.CompleteCompaction(ctx, req)
}
func (s *Server) ManualCompaction(ctx context.Context, req *datapb.ManualCompactionRequest) (*datapb.ManualCompactionResponse, error) {
func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
return s.dataCoord.ManualCompaction(ctx, req)
}
func (s *Server) GetCompactionState(ctx context.Context, req *datapb.GetCompactionStateRequest) (*datapb.GetCompactionStateResponse, error) {
func (s *Server) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
return s.dataCoord.GetCompactionState(ctx, req)
}
func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
return s.dataCoord.GetCompactionStateWithPlans(ctx, req)
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册