未验证 提交 de6e4817 编写于 作者: G Gao 提交者: GitHub

Support dynamic tuning config (#25152)

Signed-off-by: Nchasingegg <chao.gao@zilliz.com>
上级 6c017c59
......@@ -284,6 +284,7 @@ func (node *QueryNode) optimizeSearchParams(ctx context.Context, req *querypb.Se
common.SearchParamKey: queryInfo.GetSearchParams(),
common.SegmentNumKey: estSegmentNum,
common.WithFilterKey: withFilter,
common.CollectionKey: req.GetReq().GetCollectionID(),
}
err := node.queryHook.Run(params)
if err != nil {
......
......@@ -6,77 +6,193 @@ type MockQueryHook struct {
mock.Mock
}
type MockQueryHookExpecter struct {
type MockQueryHook_Expecter struct {
mock *mock.Mock
}
func (_m *MockQueryHook) EXPECT() *MockQueryHookExpecter {
return &MockQueryHookExpecter{mock: &_m.Mock}
func (_m *MockQueryHook) EXPECT() *MockQueryHook_Expecter {
return &MockQueryHook_Expecter{mock: &_m.Mock}
}
func (_m *MockQueryHook) Run(params map[string]any) error {
ret := _m.Called(params)
// DeleteTuningConfig provides a mock function with given fields: _a0
func (_m *MockQueryHook) DeleteTuningConfig(_a0 string) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func(params map[string]any) error); ok {
r0 = rf(params)
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// Run is a helper method to define mock.On call
func (_e *MockQueryHookExpecter) Run(params any) *MockQueryHookRunCall {
return &MockQueryHookRunCall{Call: _e.mock.On("Run", params)}
// MockQueryHook_DeleteTuningConfig_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteTuningConfig'
type MockQueryHook_DeleteTuningConfig_Call struct {
*mock.Call
}
// MockQueryHook_Run_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Run'
type MockQueryHookRunCall struct {
*mock.Call
// DeleteTuningConfig is a helper method to define mock.On call
// - _a0 string
func (_e *MockQueryHook_Expecter) DeleteTuningConfig(_a0 interface{}) *MockQueryHook_DeleteTuningConfig_Call {
return &MockQueryHook_DeleteTuningConfig_Call{Call: _e.mock.On("DeleteTuningConfig", _a0)}
}
func (_c *MockQueryHookRunCall) Run(run func(params map[string]any)) *MockQueryHookRunCall {
func (_c *MockQueryHook_DeleteTuningConfig_Call) Run(run func(_a0 string)) *MockQueryHook_DeleteTuningConfig_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(map[string]any))
run(args[0].(string))
})
return _c
}
func (_c *MockQueryHookRunCall) Return(_a0 error) *MockQueryHookRunCall {
func (_c *MockQueryHook_DeleteTuningConfig_Call) Return(_a0 error) *MockQueryHook_DeleteTuningConfig_Call {
_c.Call.Return(_a0)
return _c
}
func (_m *MockQueryHook) Init(param string) error {
ret := _m.Called(param)
func (_c *MockQueryHook_DeleteTuningConfig_Call) RunAndReturn(run func(string) error) *MockQueryHook_DeleteTuningConfig_Call {
_c.Call.Return(run)
return _c
}
// Init provides a mock function with given fields: _a0
func (_m *MockQueryHook) Init(_a0 string) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func(string) error); ok {
r0 = rf(param)
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockQueryHook_Init_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Init'
type MockQueryHook_Init_Call struct {
*mock.Call
}
// Init is a helper method to define mock.On call
func (_e *MockQueryHookExpecter) Init(params any) *MockQueryHookRunCall {
return &MockQueryHookRunCall{Call: _e.mock.On("Init", params)}
// - _a0 string
func (_e *MockQueryHook_Expecter) Init(_a0 interface{}) *MockQueryHook_Init_Call {
return &MockQueryHook_Init_Call{Call: _e.mock.On("Init", _a0)}
}
// MockQueryHook_Init_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Run'
type MockQueryHookInitCall struct {
func (_c *MockQueryHook_Init_Call) Run(run func(_a0 string)) *MockQueryHook_Init_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
})
return _c
}
func (_c *MockQueryHook_Init_Call) Return(_a0 error) *MockQueryHook_Init_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockQueryHook_Init_Call) RunAndReturn(run func(string) error) *MockQueryHook_Init_Call {
_c.Call.Return(run)
return _c
}
// InitTuningConfig provides a mock function with given fields: _a0
func (_m *MockQueryHook) InitTuningConfig(_a0 map[string]string) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func(map[string]string) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockQueryHook_InitTuningConfig_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InitTuningConfig'
type MockQueryHook_InitTuningConfig_Call struct {
*mock.Call
}
func (_c *MockQueryHookInitCall) Run(run func(params string)) *MockQueryHookInitCall {
// InitTuningConfig is a helper method to define mock.On call
// - _a0 map[string]string
func (_e *MockQueryHook_Expecter) InitTuningConfig(_a0 interface{}) *MockQueryHook_InitTuningConfig_Call {
return &MockQueryHook_InitTuningConfig_Call{Call: _e.mock.On("InitTuningConfig", _a0)}
}
func (_c *MockQueryHook_InitTuningConfig_Call) Run(run func(_a0 map[string]string)) *MockQueryHook_InitTuningConfig_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(string))
run(args[0].(map[string]string))
})
return _c
}
func (_c *MockQueryHook_InitTuningConfig_Call) Return(_a0 error) *MockQueryHook_InitTuningConfig_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockQueryHook_InitTuningConfig_Call) RunAndReturn(run func(map[string]string) error) *MockQueryHook_InitTuningConfig_Call {
_c.Call.Return(run)
return _c
}
// Run provides a mock function with given fields: _a0
func (_m *MockQueryHook) Run(_a0 map[string]interface{}) error {
ret := _m.Called(_a0)
var r0 error
if rf, ok := ret.Get(0).(func(map[string]interface{}) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockQueryHook_Run_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Run'
type MockQueryHook_Run_Call struct {
*mock.Call
}
// Run is a helper method to define mock.On call
// - _a0 map[string]interface{}
func (_e *MockQueryHook_Expecter) Run(_a0 interface{}) *MockQueryHook_Run_Call {
return &MockQueryHook_Run_Call{Call: _e.mock.On("Run", _a0)}
}
func (_c *MockQueryHook_Run_Call) Run(run func(_a0 map[string]interface{})) *MockQueryHook_Run_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(map[string]interface{}))
})
return _c
}
func (_c *MockQueryHookInitCall) Return(_a0 error) *MockQueryHookInitCall {
func (_c *MockQueryHook_Run_Call) Return(_a0 error) *MockQueryHook_Run_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockQueryHook_Run_Call) RunAndReturn(run func(map[string]interface{}) error) *MockQueryHook_Run_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewMockQueryHook interface {
mock.TestingT
Cleanup(func())
}
// NewMockQueryHook creates a new instance of MockQueryHook. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockQueryHook(t mockConstructorTestingTNewMockQueryHook) *MockQueryHook {
mock := &MockQueryHook{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
......@@ -35,6 +35,7 @@ import (
"path/filepath"
"plugin"
"runtime/debug"
"strings"
"sync"
"syscall"
"time"
......@@ -433,6 +434,8 @@ func (node *QueryNode) SetAddress(address string) {
type queryHook interface {
Run(map[string]any) error
Init(string) error
InitTuningConfig(map[string]string) error
DeleteTuningConfig(string) error
}
// initHook initializes parameter tuning hook.
......@@ -461,8 +464,17 @@ func (node *QueryNode) initHook() error {
if err = hoo.Init(paramtable.Get().HookCfg.QueryNodePluginConfig.GetValue()); err != nil {
return fmt.Errorf("fail to init configs for the hook, error: %s", err.Error())
}
if err = hoo.InitTuningConfig(paramtable.Get().HookCfg.QueryNodePluginTuningConfig.GetValue()); err != nil {
return fmt.Errorf("fail to init tuning configs for the hook, error: %s", err.Error())
}
node.queryHook = hoo
node.handleQueryHookEvent()
return nil
}
func (node *QueryNode) handleQueryHookEvent() {
onEvent := func(event *config.Event) {
if node.queryHook != nil {
if err := node.queryHook.Init(event.Value); err != nil {
......@@ -470,7 +482,21 @@ func (node *QueryNode) initHook() error {
}
}
}
onEvent2 := func(event *config.Event) {
if node.queryHook != nil && strings.HasPrefix(event.Key, paramtable.Get().HookCfg.QueryNodePluginTuningConfig.KeyPrefix) {
realKey := strings.TrimPrefix(event.Key, paramtable.Get().HookCfg.QueryNodePluginTuningConfig.KeyPrefix)
if event.EventType == config.CreateType || event.EventType == config.UpdateType {
if err := node.queryHook.InitTuningConfig(map[string]string{realKey: event.Value}); err != nil {
log.Error("failed to refresh hook tuning config", zap.Error(err))
}
} else if event.EventType == config.DeleteType {
if err := node.queryHook.DeleteTuningConfig(realKey); err != nil {
log.Error("failed to delete hook tuning config", zap.Error(err))
}
}
}
}
paramtable.Get().Watch(paramtable.Get().HookCfg.QueryNodePluginConfig.Key, config.NewHandler("queryHook", onEvent))
return nil
paramtable.Get().WatchKeyPrefix(paramtable.Get().HookCfg.QueryNodePluginTuningConfig.KeyPrefix, config.NewHandler("queryHook2", onEvent2))
}
......@@ -19,9 +19,12 @@ package querynodev2
import (
"context"
"os"
"sync/atomic"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/spf13/viper"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
clientv3 "go.etcd.io/etcd/client/v3"
......@@ -145,6 +148,72 @@ func (suite *QueryNodeSuite) TestInit_VactorChunkManagerFailed() {
suite.Error(err)
}
func (suite *QueryNodeSuite) TestInit_QueryHook() {
// mock expect
suite.factory.EXPECT().Init(mock.Anything).Return()
suite.factory.EXPECT().NewPersistentStorageChunkManager(mock.Anything).Return(suite.chunkManagerFactory.NewPersistentStorageChunkManager(context.Background()))
var err error
suite.node.SetEtcdClient(suite.etcd)
err = suite.node.Init()
suite.NoError(err)
mockHook := &MockQueryHook{}
suite.node.queryHook = mockHook
suite.node.handleQueryHookEvent()
defer func() { suite.node.queryHook = nil }()
yamlWriter := viper.New()
yamlWriter.SetConfigFile("../../configs/milvus.yaml")
yamlWriter.ReadInConfig()
var x1, x2, x3 int32
suite.Equal(atomic.LoadInt32(&x1), int32(0))
suite.Equal(atomic.LoadInt32(&x2), int32(0))
suite.Equal(atomic.LoadInt32(&x3), int32(0))
mockHook.EXPECT().InitTuningConfig(mock.Anything).Run(func(params map[string]string) {
atomic.StoreInt32(&x1, 6)
}).Return(nil)
// create tuning conf
yamlWriter.Set("autoIndex.params.tuning.1238", "xxxx")
yamlWriter.WriteConfig()
suite.Eventually(func() bool {
return atomic.LoadInt32(&x1) == int32(6)
}, 20*time.Second, time.Second)
mockHook.EXPECT().Init(mock.Anything).Run(func(params string) {
atomic.StoreInt32(&x2, 5)
}).Return(nil)
yamlWriter.Set("autoIndex.params.search", "aaaa")
yamlWriter.WriteConfig()
suite.Eventually(func() bool {
return atomic.LoadInt32(&x2) == int32(5)
}, 20*time.Second, time.Second)
yamlWriter.Set("autoIndex.params.search", "")
yamlWriter.WriteConfig()
atomic.StoreInt32(&x1, 0)
suite.Equal(atomic.LoadInt32(&x1), int32(0))
// update tuning conf
yamlWriter.Set("autoIndex.params.tuning.1238", "yyyy")
yamlWriter.WriteConfig()
suite.Eventually(func() bool {
return atomic.LoadInt32(&x1) == int32(6)
}, 20*time.Second, time.Second)
mockHook.EXPECT().DeleteTuningConfig(mock.Anything).Run(func(params string) {
atomic.StoreInt32(&x3, 7)
}).Return(nil)
// delete tuning conf
yamlWriter.Set("autoIndex.params.tuning", "")
yamlWriter.WriteConfig()
suite.Eventually(func() bool {
return atomic.LoadInt32(&x3) == int32(7)
}, 20*time.Second, time.Second)
}
func (suite *QueryNodeSuite) TestStop() {
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "2")
......
......@@ -87,6 +87,7 @@ const (
SearchParamKey = "search_param"
SegmentNumKey = "segment_num"
WithFilterKey = "with_filter"
CollectionKey = "collection"
IndexParamsKey = "params"
IndexTypeKey = "index_type"
......
......@@ -15,13 +15,19 @@
// limitations under the License.
package config
import (
"strings"
)
type EventDispatcher struct {
registry map[string][]EventHandler
registry map[string][]EventHandler
keyPrefix []string
}
func NewEventDispatcher() *EventDispatcher {
return &EventDispatcher{
registry: make(map[string][]EventHandler),
registry: make(map[string][]EventHandler),
keyPrefix: make([]string, 0),
}
}
......@@ -30,9 +36,17 @@ func (ed *EventDispatcher) Get(key string) []EventHandler {
}
func (ed *EventDispatcher) Dispatch(event *Event) {
hs, ok := ed.registry[formatKey(event.Key)]
var hs []EventHandler
realKey := formatKey(event.Key)
hs, ok := ed.registry[realKey]
if !ok {
return
for _, v := range ed.keyPrefix {
if strings.HasPrefix(realKey, v) {
if _, exist := ed.registry[v]; exist {
hs = append(hs, ed.registry[v]...)
}
}
}
}
for _, h := range hs {
h.OnEvent(event)
......@@ -50,6 +64,18 @@ func (ed *EventDispatcher) Register(key string, handler EventHandler) {
}
}
// register a handler to watch specific config changed
func (ed *EventDispatcher) RegisterForKeyPrefix(keyPrefix string, handler EventHandler) {
keyPrefix = formatKey(keyPrefix)
v, ok := ed.registry[keyPrefix]
if ok {
ed.registry[keyPrefix] = append(v, handler)
} else {
ed.registry[keyPrefix] = []EventHandler{handler}
}
ed.keyPrefix = append(ed.keyPrefix, keyPrefix)
}
func (ed *EventDispatcher) Unregister(key string, handler EventHandler) {
key = formatKey(key)
v, ok := ed.registry[key]
......
......@@ -17,7 +17,10 @@
package config
import (
"os"
"path"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
......@@ -33,6 +36,25 @@ func TestAllConfigFromManager(t *testing.T) {
assert.Less(t, 0, len(all))
}
func TestConfigChangeEvent(t *testing.T) {
dir, _ := os.MkdirTemp("", "milvus")
os.WriteFile(path.Join(dir, "milvus.yaml"), []byte("a.b: 1\nc.d: 2"), 0600)
os.WriteFile(path.Join(dir, "user.yaml"), []byte("a.b: 3"), 0600)
fs := NewFileSource(&FileInfo{[]string{path.Join(dir, "milvus.yaml"), path.Join(dir, "user.yaml")}, 1})
mgr, _ := Init()
err := mgr.AddSource(fs)
assert.NoError(t, err)
res, err := mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, res, "3")
os.WriteFile(path.Join(dir, "user.yaml"), []byte("a.b: 6"), 0600)
time.Sleep(3 * time.Second)
res, err = mgr.GetConfig("a.b")
assert.NoError(t, err)
assert.Equal(t, res, "6")
}
func TestAllDupliateSource(t *testing.T) {
mgr, _ := Init()
err := mgr.AddSource(NewEnvSource(formatKey))
......
......@@ -151,6 +151,10 @@ func (p *ComponentParam) Watch(key string, watcher config.EventHandler) {
p.mgr.Dispatcher.Register(key, watcher)
}
func (p *ComponentParam) WatchKeyPrefix(keyPrefix string, watcher config.EventHandler) {
p.mgr.Dispatcher.RegisterForKeyPrefix(keyPrefix, watcher)
}
// /////////////////////////////////////////////////////////////////////////////
// --- common ---
type commonConfig struct {
......
......@@ -9,9 +9,10 @@ import (
const hookYamlFile = "hook.yaml"
type hookConfig struct {
SoPath ParamItem `refreshable:"false"`
SoConfig ParamGroup `refreshable:"false"`
QueryNodePluginConfig ParamItem `refreshable:"true"`
SoPath ParamItem `refreshable:"false"`
SoConfig ParamGroup `refreshable:"false"`
QueryNodePluginConfig ParamItem `refreshable:"true"`
QueryNodePluginTuningConfig ParamGroup `refreshable:"true"`
}
func (h *hookConfig) init(base *BaseTable) {
......@@ -38,4 +39,10 @@ func (h *hookConfig) init(base *BaseTable) {
Version: "2.3.0",
}
h.QueryNodePluginConfig.Init(base.mgr)
h.QueryNodePluginTuningConfig = ParamGroup{
KeyPrefix: "autoindex.params.tuning.",
Version: "2.3.0",
}
h.QueryNodePluginTuningConfig.Init(base.mgr)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册