diff --git a/internal/master/client.go b/internal/master/client.go index 34d7e73731be39dd80f9dd0b913144f84d927425..7c644e02acddf72292636baac9d011272bc19574 100644 --- a/internal/master/client.go +++ b/internal/master/client.go @@ -90,12 +90,12 @@ func (m *MockBuildIndexClient) GetIndexFilePaths(indexID UniqueID) ([]string, er } type LoadIndexClient interface { - LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string) error + LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string, indexParams map[string]string) error } type MockLoadIndexClient struct { } -func (m *MockLoadIndexClient) LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string) error { +func (m *MockLoadIndexClient) LoadIndex(indexPaths []string, segmentID int64, fieldID int64, fieldName string, indexParams map[string]string) error { return nil } diff --git a/internal/master/index_builder_scheduler.go b/internal/master/index_builder_scheduler.go index 857ffd46b00947a325985e925a3d68f8827ae63a..a67d92c8aa545136022338e68c724a876d51c1af 100644 --- a/internal/master/index_builder_scheduler.go +++ b/internal/master/index_builder_scheduler.go @@ -133,6 +133,7 @@ func (scheduler *IndexBuildScheduler) describe() error { fieldID: indexBuildInfo.fieldID, fieldName: fieldName, indexFilePaths: filePaths, + indexParams: channelInfo.indexParams, } // Save data to meta table err = scheduler.metaTable.UpdateFieldIndexMeta(&etcdpb.FieldIndexMeta{ diff --git a/internal/master/index_load_scheduler.go b/internal/master/index_load_scheduler.go index 3d23cfcc8e41d3ff0113f66ddedc4feafbb0d21f..eec832b17f4aa0798793c26602863447e7a3e138 100644 --- a/internal/master/index_load_scheduler.go +++ b/internal/master/index_load_scheduler.go @@ -3,12 +3,15 @@ package master import ( "context" "log" + + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" ) type IndexLoadInfo struct { segmentID UniqueID fieldID UniqueID fieldName string + indexParams []*commonpb.KeyValuePair indexFilePaths []string } @@ -36,7 +39,11 @@ func NewIndexLoadScheduler(ctx context.Context, client LoadIndexClient, metaTabl func (scheduler *IndexLoadScheduler) schedule(info interface{}) error { indexLoadInfo := info.(*IndexLoadInfo) - err := scheduler.client.LoadIndex(indexLoadInfo.indexFilePaths, indexLoadInfo.segmentID, indexLoadInfo.fieldID, indexLoadInfo.fieldName) + indexParams := make(map[string]string) + for _, kv := range indexLoadInfo.indexParams { + indexParams[kv.Key] = kv.Value + } + err := scheduler.client.LoadIndex(indexLoadInfo.indexFilePaths, indexLoadInfo.segmentID, indexLoadInfo.fieldID, indexLoadInfo.fieldName, indexParams) //TODO: Save data to meta table if err != nil { return err diff --git a/internal/master/index_task.go b/internal/master/index_task.go index c5cdcc49429d090ab7c7fbe8c5fc087d78c26b90..580d8f73a6ce26516e2a87bfcda08d0a532c2e76 100644 --- a/internal/master/index_task.go +++ b/internal/master/index_task.go @@ -68,6 +68,7 @@ func (task *createIndexTask) Execute() error { fieldID: fieldID, fieldName: task.req.FieldName, indexFilePaths: indexMeta.IndexFilePaths, + indexParams: indexMeta.IndexParams, }) if err != nil { return err diff --git a/internal/master/master.go b/internal/master/master.go index d73d8f29070ab8a5df9a0619e94a533266e30a45..90093cc22ecfbacc36dd89baf82aaa3dab55b285 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -10,6 +10,12 @@ import ( "sync/atomic" "time" + "github.com/zilliztech/milvus-distributed/internal/querynode/client" + + indexbuilderclient "github.com/zilliztech/milvus-distributed/internal/indexbuilder/client" + + writerclient "github.com/zilliztech/milvus-distributed/internal/writenode/client" + etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" ms "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" @@ -175,9 +181,15 @@ func CreateServer(ctx context.Context) (*Master, error) { m.scheduler.SetDDMsgStream(pulsarDDStream) m.scheduler.SetIDAllocator(func() (UniqueID, error) { return m.idAllocator.AllocOne() }) - flushClient := &MockWriteNodeClient{} - buildIndexClient := &MockBuildIndexClient{} - loadIndexClient := &MockLoadIndexClient{} + flushClient, err := writerclient.NewWriterClient(Params.EtcdAddress, kvRootPath, Params.WriteNodeSegKvSubPath, pulsarDDStream) + if err != nil { + return nil, err + } + buildIndexClient, err := indexbuilderclient.NewBuildIndexClient(ctx, Params.IndexBuilderAddress) + if err != nil { + return nil, err + } + loadIndexClient := client.NewLoadIndexClient(ctx, Params.PulsarAddress, Params.LoadIndexChannelNames) m.indexLoadSch = NewIndexLoadScheduler(ctx, loadIndexClient, m.metaTable) m.indexBuildSch = NewIndexBuildScheduler(ctx, buildIndexClient, m.metaTable, m.indexLoadSch) diff --git a/internal/master/param_table.go b/internal/master/param_table.go index 89f3a6345e36eb767459ec3c2b330091ac1485b5..fdd2f933f9436fc47df019d414ff86112dfad617 100644 --- a/internal/master/param_table.go +++ b/internal/master/param_table.go @@ -21,6 +21,7 @@ type ParamTable struct { KvRootPath string WriteNodeSegKvSubPath string PulsarAddress string + IndexBuilderAddress string // nodeID ProxyIDList []typeutil.UniqueID @@ -49,6 +50,8 @@ type ParamTable struct { MaxPartitionNum int64 DefaultPartitionTag string + + LoadIndexChannelNames []string } var Params ParamTable @@ -71,6 +74,7 @@ func (p *ParamTable) Init() { p.initKvRootPath() p.initWriteNodeSegKvSubPath() p.initPulsarAddress() + p.initIndexBuilderAddress() p.initProxyIDList() p.initWriteNodeIDList() @@ -95,6 +99,8 @@ func (p *ParamTable) Init() { p.initMsgChannelSubName() p.initMaxPartitionNum() p.initDefaultPartitionTag() + + p.initLoadIndexChannelNames() } func (p *ParamTable) initAddress() { @@ -125,6 +131,14 @@ func (p *ParamTable) initPulsarAddress() { p.PulsarAddress = addr } +func (p *ParamTable) initIndexBuilderAddress() { + ret, err := p.Load("_IndexBuilderAddress") + if err != nil { + panic(err) + } + p.IndexBuilderAddress = ret +} + func (p *ParamTable) initMetaRootPath() { rootPath, err := p.Load("etcd.rootPath") if err != nil { @@ -346,3 +360,11 @@ func (p *ParamTable) initDefaultPartitionTag() { p.DefaultPartitionTag = defaultTag } + +func (p *ParamTable) initLoadIndexChannelNames() { + loadIndexChannelName, err := p.Load("msgChannel.chanNamePrefix.cmd") + if err != nil { + panic(err) + } + p.LoadIndexChannelNames = []string{loadIndexChannelName} +} diff --git a/internal/master/param_table_test.go b/internal/master/param_table_test.go index 8128c5f180c0eebb787a9a0e60517ee949fe6fe5..750021666ca4b9d85455d545998de8aa65488f93 100644 --- a/internal/master/param_table_test.go +++ b/internal/master/param_table_test.go @@ -31,6 +31,11 @@ func TestParamTable_KVRootPath(t *testing.T) { assert.Equal(t, path, "by-dev/kv") } +func TestParamTable_IndexBuilderAddress(t *testing.T) { + path := Params.IndexBuilderAddress + assert.Equal(t, path, "localhost:31000") +} + func TestParamTable_TopicNum(t *testing.T) { num := Params.TopicNum fmt.Println("TopicNum:", num) diff --git a/internal/util/paramtable/paramtable.go b/internal/util/paramtable/paramtable.go index ef8d7f56f02a4eaa751f7d498397227fbce96c9a..229b47765b42b4dc6825fac39acfdca4f292bf2d 100644 --- a/internal/util/paramtable/paramtable.go +++ b/internal/util/paramtable/paramtable.go @@ -57,19 +57,40 @@ func (gp *BaseTable) Init() { if err != nil { panic(err) } + gp.tryloadFromEnv() + +} + +func (gp *BaseTable) tryloadFromEnv() { minioAddress := os.Getenv("MINIO_ADDRESS") if minioAddress == "" { - minioAddress = "localhost:9000" + minioHost, err := gp.Load("minio.address") + if err != nil { + panic(err) + } + port, err := gp.Load("minio.port") + if err != nil { + panic(err) + } + minioAddress = minioHost + ":" + port } - err = gp.Save("_MinioAddress", minioAddress) + err := gp.Save("_MinioAddress", minioAddress) if err != nil { panic(err) } etcdAddress := os.Getenv("ETCD_ADDRESS") if etcdAddress == "" { - etcdAddress = "localhost:2379" + etcdHost, err := gp.Load("etcd.address") + if err != nil { + panic(err) + } + port, err := gp.Load("etcd.port") + if err != nil { + panic(err) + } + etcdAddress = etcdHost + ":" + port } err = gp.Save("_EtcdAddress", etcdAddress) if err != nil { @@ -78,7 +99,15 @@ func (gp *BaseTable) Init() { pulsarAddress := os.Getenv("PULSAR_ADDRESS") if pulsarAddress == "" { - pulsarAddress = "pulsar://localhost:6650" + pulsarHost, err := gp.Load("pulsar.address") + if err != nil { + panic(err) + } + port, err := gp.Load("pulsar.port") + if err != nil { + panic(err) + } + pulsarAddress = "pulsar://" + pulsarHost + ":" + port } err = gp.Save("_PulsarAddress", pulsarAddress) if err != nil { @@ -87,12 +116,37 @@ func (gp *BaseTable) Init() { masterAddress := os.Getenv("MASTER_ADDRESS") if masterAddress == "" { - masterAddress = "localhost:53100" + masterHost, err := gp.Load("master.address") + if err != nil { + panic(err) + } + port, err := gp.Load("master.port") + if err != nil { + panic(err) + } + masterAddress = masterHost + ":" + port } err = gp.Save("_MasterAddress", masterAddress) if err != nil { panic(err) } + + indexBuilderAddress := os.Getenv("INDEX_BUILDER_ADDRESS") + if indexBuilderAddress == "" { + indexBuilderHost, err := gp.Load("indexBuilder.address") + if err != nil { + panic(err) + } + port, err := gp.Load("indexBuilder.port") + if err != nil { + panic(err) + } + indexBuilderAddress = indexBuilderHost + ":" + port + } + err = gp.Save("_IndexBuilderAddress", indexBuilderAddress) + if err != nil { + panic(err) + } } func (gp *BaseTable) Load(key string) (string, error) {