minicluster.go 32.1 KB
Newer Older
W
wayblink 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
	"context"
	"fmt"
	"math/rand"
C
congqixia 已提交
23
	"path"
W
wayblink 已提交
24 25 26
	"sync"
	"time"

27
	"github.com/cockroachdb/errors"
28 29
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"
30

W
wayblink 已提交
31 32 33 34 35
	"github.com/milvus-io/milvus/internal/datacoord"
	"github.com/milvus-io/milvus/internal/datanode"
	"github.com/milvus-io/milvus/internal/indexnode"
	proxy2 "github.com/milvus-io/milvus/internal/proxy"
	querycoord "github.com/milvus-io/milvus/internal/querycoordv2"
C
congqixia 已提交
36
	"github.com/milvus-io/milvus/internal/querynodev2"
W
wayblink 已提交
37
	"github.com/milvus-io/milvus/internal/rootcoord"
38
	"github.com/milvus-io/milvus/internal/storage"
W
wayblink 已提交
39 40
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/dependency"
E
Enwei Jiao 已提交
41
	"github.com/milvus-io/milvus/pkg/config"
42 43 44 45
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/util/etcd"
	"github.com/milvus-io/milvus/pkg/util/funcutil"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
W
wayblink 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
)

// Cluster declares the operations of an integration-test cluster: lifecycle
// control, adding and removing individual components, resizing, and access to
// a metadata watcher.
type Cluster interface {
	Start() error
	Stop() error

	// node add/remove interfaces
	// Each Add* method takes the component to add and each Remove* method the
	// component to remove; all of them report failure through the returned error.
	AddRootCoord(types.RootCoordComponent) error
	AddDataCoord(types.DataCoordComponent) error
	AddQueryCoord(types.QueryCoordComponent) error
	//AddIndexCoord(types.IndexCoordComponent) error
	AddDataNode(types.DataNodeComponent) error
	AddQueryNode(types.QueryNodeComponent) error
	AddIndexNode(types.IndexNodeComponent) error

	RemoveRootCoord(types.RootCoordComponent) error
	RemoveDataCoord(types.DataCoordComponent) error
	RemoveQueryCoord(types.QueryCoordComponent) error
	//RemoveIndexCoord(types.IndexCoordComponent) error
	RemoveDataNode(types.DataNodeComponent) error
	RemoveQueryNode(types.QueryNodeComponent) error
	RemoveIndexNode(types.IndexNodeComponent) error

	// UpdateClusterSize changes the cluster size, adding or removing nodes to reach the given config.
	UpdateClusterSize(ClusterConfig) error

	// GetMetaWatcher returns the watcher used to observe metadata.
	GetMetaWatcher() MetaWatcher
	// TODO: expose a storage watcher as well.
	// GetStorageWatcher() StorageWatcher
}

// ClusterConfig holds the number of worker nodes of each kind in a cluster.
// Only node counts are configurable here; the proxy and coordinator counts
// are commented out below.
type ClusterConfig struct {
	//ProxyNum int
	// TODO: coordinator counts can be more than 1 once Active-Standby is enabled.
	//RootCoordNum int
	//DataCoordNum int
	//IndexCoordNum int
	//QueryCoordNum int
	QueryNodeNum int
	DataNodeNum  int
	IndexNodeNum int
}

// Parameter keys used in the MiniCluster params map. The value stored under
// EtcdRootPath is also used as the etcd config-source key prefix and as the
// meta watcher's root path.
const (
	EtcdRootPath  = "etcd.rootPath"
	MinioRootPath = "minio.rootPath"
)

type MiniCluster struct {
	ctx context.Context

98
	mu sync.RWMutex
W
wayblink 已提交
99 100 101 102

	params        map[string]string
	clusterConfig ClusterConfig

103
	factory      dependency.Factory
104
	ChunkManager storage.ChunkManager
W
wayblink 已提交
105

106
	EtcdCli *clientv3.Client
W
wayblink 已提交
107

108 109 110 111
	Proxy      types.ProxyComponent
	DataCoord  types.DataCoordComponent
	RootCoord  types.RootCoordComponent
	QueryCoord types.QueryCoordComponent
W
wayblink 已提交
112

113 114 115
	QueryNodes []types.QueryNodeComponent
	DataNodes  []types.DataNodeComponent
	IndexNodes []types.IndexNodeComponent
W
wayblink 已提交
116

117
	MetaWatcher MetaWatcher
W
wayblink 已提交
118 119
}

E
Enwei Jiao 已提交
120
// params is the package-level parameter table obtained from paramtable.Get();
// StartMiniCluster initializes it and saves the cluster's settings into it.
var params *paramtable.ComponentParam = paramtable.Get()
W
wayblink 已提交
121 122 123 124 125 126 127

// Option customizes a MiniCluster; StartMiniCluster applies each Option after
// installing the default parameters and cluster size and before creating any
// component.
type Option func(cluster *MiniCluster)

func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster, err error) {
	cluster = &MiniCluster{
		ctx: ctx,
	}
E
Enwei Jiao 已提交
128
	params.Init()
W
wayblink 已提交
129 130 131 132 133 134
	cluster.params = DefaultParams()
	cluster.clusterConfig = DefaultClusterConfig()
	for _, opt := range opts {
		opt(cluster)
	}
	for k, v := range cluster.params {
E
Enwei Jiao 已提交
135
		params.Save(k, v)
W
wayblink 已提交
136
	}
C
congqixia 已提交
137
	params.UpdateSourceOptions(config.WithEtcdSource(&config.EtcdInfo{
E
Enwei Jiao 已提交
138 139 140
		KeyPrefix:       cluster.params[EtcdRootPath],
		RefreshInterval: 2 * time.Second,
	}))
W
wayblink 已提交
141 142

	if cluster.factory == nil {
143 144 145
		params.Save(params.LocalStorageCfg.Path.Key, "/tmp/milvus/")
		params.Save(params.CommonCfg.StorageType.Key, "local")
		params.Save(params.MinioCfg.RootPath.Key, "/tmp/milvus/")
W
wayblink 已提交
146
		cluster.factory = dependency.NewDefaultFactory(true)
147 148 149 150
		chunkManager, err := cluster.factory.NewPersistentStorageChunkManager(cluster.ctx)
		if err != nil {
			return nil, err
		}
151
		cluster.ChunkManager = chunkManager
W
wayblink 已提交
152 153
	}

154
	if cluster.EtcdCli == nil {
W
wayblink 已提交
155 156
		var etcdCli *clientv3.Client
		etcdCli, err = etcd.GetEtcdClient(
E
Enwei Jiao 已提交
157 158 159 160 161 162 163
			params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
			params.EtcdCfg.EtcdUseSSL.GetAsBool(),
			params.EtcdCfg.Endpoints.GetAsStrings(),
			params.EtcdCfg.EtcdTLSCert.GetValue(),
			params.EtcdCfg.EtcdTLSKey.GetValue(),
			params.EtcdCfg.EtcdTLSCACert.GetValue(),
			params.EtcdCfg.EtcdTLSMinVersion.GetValue())
W
wayblink 已提交
164 165 166
		if err != nil {
			return nil, err
		}
167
		cluster.EtcdCli = etcdCli
W
wayblink 已提交
168 169
	}

170
	cluster.MetaWatcher = &EtcdMetaWatcher{
W
wayblink 已提交
171
		rootPath: cluster.params[EtcdRootPath],
172
		etcdCli:  cluster.EtcdCli,
W
wayblink 已提交
173 174
	}

175
	if cluster.RootCoord == nil {
W
wayblink 已提交
176 177 178 179 180
		var rootCoord types.RootCoordComponent
		rootCoord, err = cluster.CreateDefaultRootCoord()
		if err != nil {
			return nil, err
		}
181
		cluster.RootCoord = rootCoord
W
wayblink 已提交
182 183
	}

184
	if cluster.DataCoord == nil {
W
wayblink 已提交
185 186 187 188 189
		var dataCoord types.DataCoordComponent
		dataCoord, err = cluster.CreateDefaultDataCoord()
		if err != nil {
			return nil, err
		}
190
		cluster.DataCoord = dataCoord
W
wayblink 已提交
191 192
	}

193
	if cluster.QueryCoord == nil {
W
wayblink 已提交
194 195 196 197 198
		var queryCoord types.QueryCoordComponent
		queryCoord, err = cluster.CreateDefaultQueryCoord()
		if err != nil {
			return nil, err
		}
199
		cluster.QueryCoord = queryCoord
W
wayblink 已提交
200 201 202 203 204 205 206 207 208 209 210
	}

	//if cluster.indexCoord == nil {
	//	var indexCoord types.IndexCoordComponent
	//	indexCoord, err = cluster.CreateDefaultIndexCoord()
	//	if err != nil {
	//		return nil, err
	//	}
	//	cluster.indexCoord = indexCoord
	//}

211
	if cluster.DataNodes == nil {
W
wayblink 已提交
212 213 214 215 216 217 218 219 220
		dataNodes := make([]types.DataNodeComponent, 0)
		for i := 0; i < cluster.clusterConfig.DataNodeNum; i++ {
			var dataNode types.DataNodeComponent
			dataNode, err = cluster.CreateDefaultDataNode()
			if err != nil {
				return nil, err
			}
			dataNodes = append(dataNodes, dataNode)
		}
221
		cluster.DataNodes = dataNodes
W
wayblink 已提交
222 223
	}

224
	if cluster.QueryNodes == nil {
W
wayblink 已提交
225 226 227 228 229 230 231 232 233
		queryNodes := make([]types.QueryNodeComponent, 0)
		for i := 0; i < cluster.clusterConfig.QueryNodeNum; i++ {
			var queryNode types.QueryNodeComponent
			queryNode, err = cluster.CreateDefaultQueryNode()
			if err != nil {
				return nil, err
			}
			queryNodes = append(queryNodes, queryNode)
		}
234
		cluster.QueryNodes = queryNodes
W
wayblink 已提交
235 236
	}

237
	if cluster.IndexNodes == nil {
W
wayblink 已提交
238 239 240 241 242 243 244 245 246
		indexNodes := make([]types.IndexNodeComponent, 0)
		for i := 0; i < cluster.clusterConfig.IndexNodeNum; i++ {
			var indexNode types.IndexNodeComponent
			indexNode, err = cluster.CreateDefaultIndexNode()
			if err != nil {
				return
			}
			indexNodes = append(indexNodes, indexNode)
		}
247
		cluster.IndexNodes = indexNodes
W
wayblink 已提交
248 249
	}

250
	if cluster.Proxy == nil {
W
wayblink 已提交
251 252 253 254 255
		var proxy types.ProxyComponent
		proxy, err = cluster.CreateDefaultProxy()
		if err != nil {
			return
		}
256
		cluster.Proxy = proxy
W
wayblink 已提交
257 258 259
	}

	//cluster.dataCoord.SetIndexCoord(cluster.indexCoord)
260
	cluster.DataCoord.SetRootCoord(cluster.RootCoord)
W
wayblink 已提交
261

262
	err = cluster.RootCoord.SetDataCoord(cluster.DataCoord)
W
wayblink 已提交
263 264 265 266 267 268 269
	if err != nil {
		return
	}
	//err = cluster.rootCoord.SetIndexCoord(cluster.indexCoord)
	//if err != nil {
	//	return
	//}
270
	err = cluster.RootCoord.SetQueryCoord(cluster.QueryCoord)
W
wayblink 已提交
271 272 273 274 275 276 277 278
	if err != nil {
		return
	}

	//err = cluster.queryCoord.SetIndexCoord(cluster.indexCoord)
	if err != nil {
		return
	}
279
	err = cluster.QueryCoord.SetDataCoord(cluster.DataCoord)
W
wayblink 已提交
280 281 282
	if err != nil {
		return
	}
283
	err = cluster.QueryCoord.SetRootCoord(cluster.RootCoord)
W
wayblink 已提交
284 285 286 287 288 289 290 291 292 293 294 295 296
	if err != nil {
		return
	}

	//err = cluster.indexCoord.SetDataCoord(cluster.dataCoord)
	//if err != nil {
	//	return
	//}
	//err = cluster.indexCoord.SetRootCoord(cluster.rootCoord)
	//if err != nil {
	//	return
	//}

297 298
	for _, dataNode := range cluster.DataNodes {
		err = dataNode.SetDataCoord(cluster.DataCoord)
W
wayblink 已提交
299 300 301
		if err != nil {
			return
		}
302
		err = dataNode.SetRootCoord(cluster.RootCoord)
W
wayblink 已提交
303 304 305 306 307
		if err != nil {
			return
		}
	}

308
	cluster.Proxy.SetDataCoordClient(cluster.DataCoord)
W
wayblink 已提交
309
	//cluster.proxy.SetIndexCoordClient(cluster.indexCoord)
310 311
	cluster.Proxy.SetQueryCoordClient(cluster.QueryCoord)
	cluster.Proxy.SetRootCoordClient(cluster.RootCoord)
W
wayblink 已提交
312 313 314 315

	return cluster, nil
}

316 317 318 319
// GetContext returns the context the cluster was created with.
func (cluster *MiniCluster) GetContext() context.Context {
	ctx := cluster.ctx
	return ctx
}

W
wayblink 已提交
320 321
func (cluster *MiniCluster) Start() error {
	log.Info("mini cluster start")
322
	err := cluster.RootCoord.Init()
W
wayblink 已提交
323 324 325
	if err != nil {
		return err
	}
326
	err = cluster.RootCoord.Start()
W
wayblink 已提交
327 328 329
	if err != nil {
		return err
	}
330
	err = cluster.RootCoord.Register()
W
wayblink 已提交
331 332 333 334
	if err != nil {
		return err
	}

335
	err = cluster.DataCoord.Init()
W
wayblink 已提交
336 337 338
	if err != nil {
		return err
	}
339
	err = cluster.DataCoord.Start()
W
wayblink 已提交
340 341 342
	if err != nil {
		return err
	}
343
	err = cluster.DataCoord.Register()
W
wayblink 已提交
344 345 346 347
	if err != nil {
		return err
	}

348
	err = cluster.QueryCoord.Init()
W
wayblink 已提交
349 350 351
	if err != nil {
		return err
	}
352
	err = cluster.QueryCoord.Start()
W
wayblink 已提交
353 354 355
	if err != nil {
		return err
	}
356
	err = cluster.QueryCoord.Register()
W
wayblink 已提交
357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373
	if err != nil {
		return err
	}

	//err = cluster.indexCoord.Init()
	//if err != nil {
	//	return err
	//}
	//err = cluster.indexCoord.Start()
	//if err != nil {
	//	return err
	//}
	//err = cluster.indexCoord.Register()
	//if err != nil {
	//	return err
	//}

374
	for _, dataNode := range cluster.DataNodes {
W
wayblink 已提交
375 376 377 378 379 380 381 382 383 384 385 386 387 388
		err = dataNode.Init()
		if err != nil {
			return err
		}
		err = dataNode.Start()
		if err != nil {
			return err
		}
		err = dataNode.Register()
		if err != nil {
			return err
		}
	}

389
	for _, queryNode := range cluster.QueryNodes {
W
wayblink 已提交
390 391 392 393 394 395 396 397 398 399 400 401 402 403
		err = queryNode.Init()
		if err != nil {
			return err
		}
		err = queryNode.Start()
		if err != nil {
			return err
		}
		err = queryNode.Register()
		if err != nil {
			return err
		}
	}

404
	for _, indexNode := range cluster.IndexNodes {
W
wayblink 已提交
405 406 407 408 409 410 411 412 413 414 415 416 417 418
		err = indexNode.Init()
		if err != nil {
			return err
		}
		err = indexNode.Start()
		if err != nil {
			return err
		}
		err = indexNode.Register()
		if err != nil {
			return err
		}
	}

419
	err = cluster.Proxy.Init()
W
wayblink 已提交
420 421 422
	if err != nil {
		return err
	}
423
	err = cluster.Proxy.Start()
W
wayblink 已提交
424 425 426
	if err != nil {
		return err
	}
427
	err = cluster.Proxy.Register()
W
wayblink 已提交
428 429 430 431 432 433 434 435 436
	if err != nil {
		return err
	}

	return nil
}

func (cluster *MiniCluster) Stop() error {
	log.Info("mini cluster stop")
437
	cluster.RootCoord.Stop()
438
	log.Info("mini cluster rootCoord stopped")
439
	cluster.DataCoord.Stop()
440
	log.Info("mini cluster dataCoord stopped")
W
wayblink 已提交
441
	//cluster.indexCoord.Stop()
442
	cluster.QueryCoord.Stop()
443
	log.Info("mini cluster queryCoord stopped")
444
	cluster.Proxy.Stop()
445
	log.Info("mini cluster proxy stopped")
W
wayblink 已提交
446

447
	for _, dataNode := range cluster.DataNodes {
W
wayblink 已提交
448 449
		dataNode.Stop()
	}
450
	log.Info("mini cluster datanodes stopped")
451

452
	for _, queryNode := range cluster.QueryNodes {
W
wayblink 已提交
453 454
		queryNode.Stop()
	}
455 456
	log.Info("mini cluster querynodes stopped")

457
	for _, indexNode := range cluster.IndexNodes {
W
wayblink 已提交
458 459
		indexNode.Stop()
	}
460
	log.Info("mini cluster indexnodes stopped")
W
wayblink 已提交
461

462 463
	cluster.EtcdCli.KV.Delete(cluster.ctx, params.EtcdCfg.RootPath.GetValue(), clientv3.WithPrefix())
	defer cluster.EtcdCli.Close()
464

465
	if cluster.ChunkManager == nil {
466 467 468 469
		chunkManager, err := cluster.factory.NewPersistentStorageChunkManager(cluster.ctx)
		if err != nil {
			log.Warn("fail to create chunk manager to clean test data", zap.Error(err))
		} else {
470
			cluster.ChunkManager = chunkManager
471
		}
W
wayblink 已提交
472
	}
473
	cluster.ChunkManager.RemoveWithPrefix(cluster.ctx, cluster.ChunkManager.RootPath())
W
wayblink 已提交
474 475 476 477 478 479 480 481 482
	return nil
}

func DefaultParams() map[string]string {
	testPath := fmt.Sprintf("integration-test-%d", time.Now().Unix())
	return map[string]string{
		EtcdRootPath:  testPath,
		MinioRootPath: testPath,
		//"runtime.role": typeutil.StandaloneRole,
E
Enwei Jiao 已提交
483
		params.IntegrationTestCfg.IntegrationMode.Key: "true",
C
congqixia 已提交
484
		params.LocalStorageCfg.Path.Key:               path.Join("/tmp", testPath),
E
Enwei Jiao 已提交
485 486
		params.CommonCfg.StorageType.Key:              "local",
		params.DataNodeCfg.MemoryForceSyncEnable.Key:  "false", // local execution will print too many logs
487
		params.CommonCfg.GracefulStopTimeout.Key:      "10",
W
wayblink 已提交
488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512
	}
}

// WithParam returns an Option that stores v under key k in the cluster's
// parameter map.
func WithParam(k, v string) Option {
	setParam := func(c *MiniCluster) {
		c.params[k] = v
	}
	return setParam
}

// DefaultClusterConfig returns a configuration with one query node, one data
// node and one index node.
func DefaultClusterConfig() ClusterConfig {
	var cfg ClusterConfig
	cfg.QueryNodeNum = 1
	cfg.DataNodeNum = 1
	cfg.IndexNodeNum = 1
	return cfg
}

// WithClusterSize returns an Option that replaces the cluster's node counts
// with clusterConfig.
func WithClusterSize(clusterConfig ClusterConfig) Option {
	setSize := func(c *MiniCluster) {
		c.clusterConfig = clusterConfig
	}
	return setSize
}

func WithEtcdClient(etcdCli *clientv3.Client) Option {
	return func(cluster *MiniCluster) {
513
		cluster.EtcdCli = etcdCli
W
wayblink 已提交
514 515 516 517 518 519 520 521 522 523 524
	}
}

// WithFactory returns an Option that installs factory as the cluster's
// dependency factory.
func WithFactory(factory dependency.Factory) Option {
	setFactory := func(c *MiniCluster) {
		c.factory = factory
	}
	return setFactory
}

func WithRootCoord(rootCoord types.RootCoordComponent) Option {
	return func(cluster *MiniCluster) {
525
		cluster.RootCoord = rootCoord
W
wayblink 已提交
526 527 528 529 530
	}
}

func WithDataCoord(dataCoord types.DataCoordComponent) Option {
	return func(cluster *MiniCluster) {
531
		cluster.DataCoord = dataCoord
W
wayblink 已提交
532 533 534 535 536
	}
}

func WithQueryCoord(queryCoord types.QueryCoordComponent) Option {
	return func(cluster *MiniCluster) {
537
		cluster.QueryCoord = queryCoord
W
wayblink 已提交
538 539 540 541 542 543 544 545 546 547 548
	}
}

//func WithIndexCoord(indexCoord types.IndexCoordComponent) Option {
//	return func(cluster *MiniCluster) {
//		cluster.indexCoord = indexCoord
//	}
//}

func WithDataNodes(datanodes []types.DataNodeComponent) Option {
	return func(cluster *MiniCluster) {
549
		cluster.DataNodes = datanodes
W
wayblink 已提交
550 551 552 553 554
	}
}

func WithQueryNodes(queryNodes []types.QueryNodeComponent) Option {
	return func(cluster *MiniCluster) {
555
		cluster.QueryNodes = queryNodes
W
wayblink 已提交
556 557 558 559 560
	}
}

func WithIndexNodes(indexNodes []types.IndexNodeComponent) Option {
	return func(cluster *MiniCluster) {
561
		cluster.IndexNodes = indexNodes
W
wayblink 已提交
562 563 564 565 566
	}
}

func WithProxy(proxy types.ProxyComponent) Option {
	return func(cluster *MiniCluster) {
567
		cluster.Proxy = proxy
W
wayblink 已提交
568 569 570 571 572 573 574 575 576 577 578
	}
}

func (cluster *MiniCluster) CreateDefaultRootCoord() (types.RootCoordComponent, error) {
	rootCoord, err := rootcoord.NewCore(cluster.ctx, cluster.factory)
	if err != nil {
		return nil, err
	}
	port := funcutil.GetAvailablePort()
	rootCoord.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port))
	rootCoord.SetProxyCreator(cluster.GetProxy)
579
	rootCoord.SetEtcdClient(cluster.EtcdCli)
W
wayblink 已提交
580 581 582 583 584 585 586 587 588
	return rootCoord, nil
}

func (cluster *MiniCluster) CreateDefaultDataCoord() (types.DataCoordComponent, error) {
	dataCoord := datacoord.CreateServer(cluster.ctx, cluster.factory)
	port := funcutil.GetAvailablePort()
	dataCoord.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port))
	dataCoord.SetDataNodeCreator(cluster.GetDataNode)
	dataCoord.SetIndexNodeCreator(cluster.GetIndexNode)
589
	dataCoord.SetEtcdClient(cluster.EtcdCli)
W
wayblink 已提交
590 591 592 593
	return dataCoord, nil
}

func (cluster *MiniCluster) CreateDefaultQueryCoord() (types.QueryCoordComponent, error) {
594
	queryCoord, err := querycoord.NewQueryCoord(cluster.ctx)
W
wayblink 已提交
595 596 597 598 599 600
	if err != nil {
		return nil, err
	}
	port := funcutil.GetAvailablePort()
	queryCoord.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port))
	queryCoord.SetQueryNodeCreator(cluster.GetQueryNode)
601
	queryCoord.SetEtcdClient(cluster.EtcdCli)
W
wayblink 已提交
602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619
	return queryCoord, nil
}

//func (cluster *MiniCluster) CreateDefaultIndexCoord() (types.IndexCoordComponent, error) {
//	indexCoord, err := indexcoord.NewIndexCoord(cluster.ctx, cluster.factory)
//	if err != nil {
//		return nil, err
//	}
//	port := funcutil.GetAvailablePort()
//	indexCoord.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port))
//	indexCoord.SetIndexNodeCreator(cluster.GetIndexNode)
//	indexCoord.SetEtcdClient(cluster.etcdCli)
//	return indexCoord, nil
//}

func (cluster *MiniCluster) CreateDefaultDataNode() (types.DataNodeComponent, error) {
	log.Debug("mini cluster CreateDefaultDataNode")
	dataNode := datanode.NewDataNode(cluster.ctx, cluster.factory)
620
	dataNode.SetEtcdClient(cluster.EtcdCli)
W
wayblink 已提交
621 622 623 624 625 626 627
	port := funcutil.GetAvailablePort()
	dataNode.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port))
	return dataNode, nil
}

func (cluster *MiniCluster) CreateDefaultQueryNode() (types.QueryNodeComponent, error) {
	log.Debug("mini cluster CreateDefaultQueryNode")
C
congqixia 已提交
628
	queryNode := querynodev2.NewQueryNode(cluster.ctx, cluster.factory)
629
	queryNode.SetEtcdClient(cluster.EtcdCli)
W
wayblink 已提交
630 631 632 633 634 635 636 637
	port := funcutil.GetAvailablePort()
	queryNode.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port))
	return queryNode, nil
}

func (cluster *MiniCluster) CreateDefaultIndexNode() (types.IndexNodeComponent, error) {
	log.Debug("mini cluster CreateDefaultIndexNode")
	indexNode := indexnode.NewIndexNode(cluster.ctx, cluster.factory)
638
	indexNode.SetEtcdClient(cluster.EtcdCli)
W
wayblink 已提交
639 640 641 642 643 644 645 646
	port := funcutil.GetAvailablePort()
	indexNode.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port))
	return indexNode, nil
}

func (cluster *MiniCluster) CreateDefaultProxy() (types.ProxyComponent, error) {
	log.Debug("mini cluster CreateDefaultProxy")
	proxy, err := proxy2.NewProxy(cluster.ctx, cluster.factory)
647
	proxy.SetEtcdClient(cluster.EtcdCli)
W
wayblink 已提交
648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663
	if err != nil {
		return nil, err
	}
	port := funcutil.GetAvailablePort()
	proxy.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port))
	proxy.SetQueryNodeCreator(cluster.GetQueryNode)
	return proxy, nil
}

// AddRootCoord to the cluster, you can use your own node for some specific purpose or
// pass nil to create a default one with cluster's setting.
func (cluster *MiniCluster) AddRootCoord(rootCoord types.RootCoordComponent) error {
	log.Debug("mini cluster AddRootCoord start")
	cluster.mu.Lock()
	defer cluster.mu.Unlock()
	var err error
664
	if cluster.RootCoord != nil {
W
wayblink 已提交
665 666 667 668 669 670 671 672 673 674
		return errors.New("rootCoord already exist, maybe you need to remove it first")
	}
	if rootCoord == nil {
		rootCoord, err = cluster.CreateDefaultRootCoord()
		if err != nil {
			return err
		}
	}

	// link
675 676
	rootCoord.SetDataCoord(cluster.DataCoord)
	rootCoord.SetQueryCoord(cluster.QueryCoord)
W
wayblink 已提交
677
	//rootCoord.SetIndexCoord(cluster.indexCoord)
678 679
	cluster.DataCoord.SetRootCoord(rootCoord)
	cluster.QueryCoord.SetRootCoord(rootCoord)
W
wayblink 已提交
680
	//cluster.indexCoord.SetRootCoord(rootCoord)
681 682
	cluster.Proxy.SetRootCoordClient(rootCoord)
	for _, dataNode := range cluster.DataNodes {
W
wayblink 已提交
683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702
		err = dataNode.SetRootCoord(rootCoord)
		if err != nil {
			return err
		}
	}

	// start
	err = rootCoord.Init()
	if err != nil {
		return err
	}
	err = rootCoord.Start()
	if err != nil {
		return err
	}
	err = rootCoord.Register()
	if err != nil {
		return err
	}

703
	cluster.RootCoord = rootCoord
W
wayblink 已提交
704 705 706 707 708 709 710 711 712 713
	log.Debug("mini cluster AddRootCoord succeed")
	return nil
}

// RemoveRootCoord from the cluster
func (cluster *MiniCluster) RemoveRootCoord(rootCoord types.RootCoordComponent) error {
	log.Debug("mini cluster RemoveRootCoord start")
	cluster.mu.Lock()
	defer cluster.mu.Unlock()

714
	if cluster.RootCoord == nil {
W
wayblink 已提交
715 716 717 718
		log.Info("mini cluster has no rootCoord, no need to remove")
		return nil
	}

719 720
	cluster.RootCoord.Stop()
	cluster.RootCoord = nil
W
wayblink 已提交
721 722 723 724 725 726 727 728 729 730 731
	log.Debug("mini cluster RemoveRootCoord succeed")
	return nil
}

// AddDataCoord to the cluster, you can use your own node for some specific purpose or
// pass nil to create a default one with cluster's setting.
func (cluster *MiniCluster) AddDataCoord(dataCoord types.DataCoordComponent) error {
	log.Debug("mini cluster AddDataCoord start")
	cluster.mu.Lock()
	defer cluster.mu.Unlock()
	var err error
732
	if cluster.DataCoord != nil {
W
wayblink 已提交
733 734 735 736 737 738 739 740 741 742 743
		return errors.New("dataCoord already exist, maybe you need to remove it first")
	}
	if dataCoord == nil {
		dataCoord, err = cluster.CreateDefaultDataCoord()
		if err != nil {
			return err
		}
	}

	// link
	//dataCoord.SetIndexCoord(cluster.indexCoord)
744 745
	dataCoord.SetRootCoord(cluster.RootCoord)
	err = cluster.RootCoord.SetDataCoord(cluster.DataCoord)
W
wayblink 已提交
746 747 748
	if err != nil {
		return err
	}
749
	err = cluster.QueryCoord.SetDataCoord(cluster.DataCoord)
W
wayblink 已提交
750 751 752 753 754 755 756
	if err != nil {
		return err
	}
	//err = cluster.indexCoord.SetDataCoord(cluster.dataCoord)
	//if err != nil {
	//	return err
	//}
757 758
	cluster.Proxy.SetDataCoordClient(dataCoord)
	for _, dataNode := range cluster.DataNodes {
W
wayblink 已提交
759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778
		err = dataNode.SetDataCoord(dataCoord)
		if err != nil {
			return err
		}
	}

	// start
	err = dataCoord.Init()
	if err != nil {
		return err
	}
	err = dataCoord.Start()
	if err != nil {
		return err
	}
	err = dataCoord.Register()
	if err != nil {
		return err
	}

779
	cluster.DataCoord = dataCoord
W
wayblink 已提交
780 781 782 783 784 785 786 787 788 789
	log.Debug("mini cluster AddDataCoord succeed")
	return nil
}

// RemoveDataCoord from the cluster
func (cluster *MiniCluster) RemoveDataCoord(dataCoord types.DataCoordComponent) error {
	log.Debug("mini cluster RemoveDataCoord start")
	cluster.mu.Lock()
	defer cluster.mu.Unlock()

790
	if cluster.DataCoord == nil {
W
wayblink 已提交
791 792 793 794
		log.Info("mini cluster has no dataCoord, no need to remove")
		return nil
	}

795 796
	cluster.DataCoord.Stop()
	cluster.DataCoord = nil
W
wayblink 已提交
797 798 799 800 801 802 803 804 805 806 807
	log.Debug("mini cluster RemoveDataCoord succeed")
	return nil
}

// AddQueryCoord to the cluster, you can use your own node for some specific purpose or
// pass nil to create a default one with cluster's setting.
func (cluster *MiniCluster) AddQueryCoord(queryCoord types.QueryCoordComponent) error {
	log.Debug("mini cluster AddQueryCoord start")
	cluster.mu.Lock()
	defer cluster.mu.Unlock()
	var err error
808
	if cluster.QueryCoord != nil {
W
wayblink 已提交
809 810 811 812 813 814 815 816 817 818
		return errors.New("queryCoord already exist, maybe you need to remove it first")
	}
	if queryCoord == nil {
		queryCoord, err = cluster.CreateDefaultQueryCoord()
		if err != nil {
			return err
		}
	}

	// link
819 820
	queryCoord.SetRootCoord(cluster.RootCoord)
	queryCoord.SetDataCoord(cluster.DataCoord)
W
wayblink 已提交
821
	//queryCoord.SetIndexCoord(cluster.indexCoord)
822 823
	cluster.RootCoord.SetQueryCoord(queryCoord)
	cluster.Proxy.SetQueryCoordClient(queryCoord)
W
wayblink 已提交
824 825 826 827 828 829 830 831 832 833 834 835 836 837 838

	// start
	err = queryCoord.Init()
	if err != nil {
		return err
	}
	err = queryCoord.Start()
	if err != nil {
		return err
	}
	err = queryCoord.Register()
	if err != nil {
		return err
	}

839
	cluster.QueryCoord = queryCoord
W
wayblink 已提交
840 841 842 843 844 845 846 847 848 849
	log.Debug("mini cluster AddQueryCoord succeed")
	return nil
}

// RemoveQueryCoord from the cluster
func (cluster *MiniCluster) RemoveQueryCoord(queryCoord types.QueryCoordComponent) error {
	log.Debug("mini cluster RemoveQueryCoord start")
	cluster.mu.Lock()
	defer cluster.mu.Unlock()

850
	if cluster.QueryCoord == nil {
W
wayblink 已提交
851 852 853 854
		log.Info("mini cluster has no queryCoord, no need to remove")
		return nil
	}

855 856
	cluster.QueryCoord.Stop()
	cluster.QueryCoord = nil
W
wayblink 已提交
857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934
	log.Debug("mini cluster RemoveQueryCoord succeed")
	return nil
}

// AddIndexCoord to the cluster, you can use your own node for some specific purpose or
// pass nil to create a default one with cluster's setting.
//func (cluster *MiniCluster) AddIndexCoord(indexCoord types.IndexCoordComponent) error {
//	log.Debug("mini cluster AddIndexCoord start")
//	cluster.mu.Lock()
//	defer cluster.mu.Unlock()
//	var err error
//	if cluster.indexCoord != nil {
//		return errors.New("indexCoord already exist, maybe you need to remove it first")
//	}
//	if indexCoord == nil {
//		indexCoord, err = cluster.CreateDefaultIndexCoord()
//		if err != nil {
//			return err
//		}
//	}
//
//	// link
//	indexCoord.SetDataCoord(cluster.dataCoord)
//	indexCoord.SetRootCoord(cluster.rootCoord)
//	//cluster.dataCoord.SetIndexCoord(indexCoord)
//	cluster.queryCoord.SetIndexCoord(indexCoord)
//	//cluster.rootCoord.SetIndexCoord(indexCoord)
//	//cluster.proxy.SetIndexCoordClient(indexCoord)
//
//	// start
//	err = indexCoord.Init()
//	if err != nil {
//		return err
//	}
//	err = indexCoord.Start()
//	if err != nil {
//		return err
//	}
//	err = indexCoord.Register()
//	if err != nil {
//		return err
//	}
//
//	cluster.indexCoord = indexCoord
//	log.Debug("mini cluster AddIndexCoord succeed")
//	return nil
//}

// RemoveIndexCoord from the cluster
//func (cluster *MiniCluster) RemoveIndexCoord(indexCoord types.IndexCoordComponent) error {
//	log.Debug("mini cluster RemoveIndexCoord start")
//	cluster.mu.Lock()
//	defer cluster.mu.Unlock()
//
//	if cluster.indexCoord == nil {
//		log.Info("mini cluster has no indexCoord, no need to remove")
//		return nil
//	}
//
//	cluster.indexCoord.Stop()
//	cluster.indexCoord = nil
//	log.Debug("mini cluster RemoveIndexCoord succeed")
//	return nil
//}

// AddDataNode to the cluster, you can use your own node for some specific purpose or
// pass nil to create a default one with cluster's setting.
func (cluster *MiniCluster) AddDataNode(dataNode types.DataNodeComponent) error {
	log.Debug("mini cluster AddDataNode start")
	cluster.mu.Lock()
	defer cluster.mu.Unlock()
	var err error
	if dataNode == nil {
		dataNode, err = cluster.CreateDefaultDataNode()
		if err != nil {
			return err
		}
	}
935
	err = dataNode.SetDataCoord(cluster.DataCoord)
W
wayblink 已提交
936 937 938
	if err != nil {
		return err
	}
939
	err = dataNode.SetRootCoord(cluster.RootCoord)
W
wayblink 已提交
940 941 942 943 944 945 946 947 948 949 950 951 952 953 954
	if err != nil {
		return err
	}
	err = dataNode.Init()
	if err != nil {
		return err
	}
	err = dataNode.Start()
	if err != nil {
		return err
	}
	err = dataNode.Register()
	if err != nil {
		return err
	}
955
	cluster.DataNodes = append(cluster.DataNodes, dataNode)
W
wayblink 已提交
956 957 958 959 960 961 962 963 964 965 966 967 968
	cluster.clusterConfig.DataNodeNum = cluster.clusterConfig.DataNodeNum + 1
	log.Debug("mini cluster AddDataNode succeed")
	return nil
}

// RemoveDataNode from the cluster, if pass nil, remove a node randomly
func (cluster *MiniCluster) RemoveDataNode(dataNode types.DataNodeComponent) error {
	log.Debug("mini cluster RemoveDataNode start")
	cluster.mu.Lock()
	defer cluster.mu.Unlock()

	if dataNode == nil {
		// choose a node randomly
969 970 971
		if len(cluster.DataNodes) > 0 {
			randIndex := rand.Intn(len(cluster.DataNodes))
			dataNode = cluster.DataNodes[randIndex]
W
wayblink 已提交
972 973 974 975 976 977 978 979 980 981 982 983
		} else {
			log.Debug("mini cluster has no dataNodes")
			return nil
		}
	}

	err := dataNode.Stop()
	if err != nil {
		return err
	}

	newDataNodes := make([]types.DataNodeComponent, 0)
984
	for _, dn := range cluster.DataNodes {
W
wayblink 已提交
985 986 987 988 989
		if dn == dataNode {
			continue
		}
		newDataNodes = append(newDataNodes, dn)
	}
990
	cluster.DataNodes = newDataNodes
W
wayblink 已提交
991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020
	cluster.clusterConfig.DataNodeNum = cluster.clusterConfig.DataNodeNum - 1
	log.Debug("mini cluster RemoveDataNode succeed")
	return nil
}

// AddQueryNode to the cluster, you can use your own node for some specific purpose or
// pass nil to create a default one with cluster's setting.
func (cluster *MiniCluster) AddQueryNode(queryNode types.QueryNodeComponent) error {
	log.Debug("mini cluster AddQueryNode start")
	cluster.mu.Lock()
	defer cluster.mu.Unlock()
	var err error
	if queryNode == nil {
		queryNode, err = cluster.CreateDefaultQueryNode()
		if err != nil {
			return err
		}
	}
	err = queryNode.Init()
	if err != nil {
		return err
	}
	err = queryNode.Start()
	if err != nil {
		return err
	}
	err = queryNode.Register()
	if err != nil {
		return err
	}
1021
	cluster.QueryNodes = append(cluster.QueryNodes, queryNode)
W
wayblink 已提交
1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034
	cluster.clusterConfig.QueryNodeNum = cluster.clusterConfig.QueryNodeNum + 1
	log.Debug("mini cluster AddQueryNode succeed")
	return nil
}

// RemoveQueryNode from the cluster, if pass nil, remove a node randomly
func (cluster *MiniCluster) RemoveQueryNode(queryNode types.QueryNodeComponent) error {
	log.Debug("mini cluster RemoveQueryNode start")
	cluster.mu.Lock()
	defer cluster.mu.Unlock()

	if queryNode == nil {
		// choose a node randomly
1035 1036 1037
		if len(cluster.QueryNodes) > 0 {
			randIndex := rand.Intn(len(cluster.QueryNodes))
			queryNode = cluster.QueryNodes[randIndex]
W
wayblink 已提交
1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049
		} else {
			log.Debug("mini cluster has no queryNodes")
			return nil
		}
	}

	err := queryNode.Stop()
	if err != nil {
		return err
	}

	newQueryNodes := make([]types.QueryNodeComponent, 0)
1050
	for _, qn := range cluster.QueryNodes {
W
wayblink 已提交
1051 1052 1053 1054 1055
		if qn == queryNode {
			continue
		}
		newQueryNodes = append(newQueryNodes, qn)
	}
1056
	cluster.QueryNodes = newQueryNodes
W
wayblink 已提交
1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086
	cluster.clusterConfig.QueryNodeNum = cluster.clusterConfig.QueryNodeNum - 1
	log.Debug("mini cluster RemoveQueryNode succeed")
	return nil
}

// AddIndexNode to the cluster, you can use your own node for some specific purpose or
// pass nil to create a default one with cluster's setting.
func (cluster *MiniCluster) AddIndexNode(indexNode types.IndexNodeComponent) error {
	log.Debug("mini cluster AddIndexNode start")
	cluster.mu.Lock()
	defer cluster.mu.Unlock()
	var err error
	if indexNode == nil {
		indexNode, err = cluster.CreateDefaultIndexNode()
		if err != nil {
			return err
		}
	}
	err = indexNode.Init()
	if err != nil {
		return err
	}
	err = indexNode.Start()
	if err != nil {
		return err
	}
	err = indexNode.Register()
	if err != nil {
		return err
	}
1087
	cluster.IndexNodes = append(cluster.IndexNodes, indexNode)
W
wayblink 已提交
1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100
	cluster.clusterConfig.IndexNodeNum = cluster.clusterConfig.IndexNodeNum + 1
	log.Debug("mini cluster AddIndexNode succeed")
	return nil
}

// RemoveIndexNode from the cluster, if pass nil, remove a node randomly
func (cluster *MiniCluster) RemoveIndexNode(indexNode types.IndexNodeComponent) error {
	log.Debug("mini cluster RemoveIndexNode start")
	cluster.mu.Lock()
	defer cluster.mu.Unlock()

	if indexNode == nil {
		// choose a node randomly
1101 1102 1103
		if len(cluster.IndexNodes) > 0 {
			randIndex := rand.Intn(len(cluster.IndexNodes))
			indexNode = cluster.IndexNodes[randIndex]
W
wayblink 已提交
1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115
		} else {
			log.Debug("mini cluster has no queryNodes")
			return nil
		}
	}

	err := indexNode.Stop()
	if err != nil {
		return err
	}

	newIndexNodes := make([]types.IndexNodeComponent, 0)
1116
	for _, in := range cluster.IndexNodes {
W
wayblink 已提交
1117 1118 1119 1120 1121
		if in == indexNode {
			continue
		}
		newIndexNodes = append(newIndexNodes, in)
	}
1122
	cluster.IndexNodes = newIndexNodes
W
wayblink 已提交
1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135
	cluster.clusterConfig.IndexNodeNum = cluster.clusterConfig.IndexNodeNum - 1
	log.Debug("mini cluster RemoveIndexNode succeed")
	return nil
}

func (cluster *MiniCluster) UpdateClusterSize(clusterConfig ClusterConfig) error {
	log.Debug("mini cluster UpdateClusterSize start")
	if clusterConfig.DataNodeNum < 0 || clusterConfig.QueryNodeNum < 0 || clusterConfig.IndexNodeNum < 0 {
		return errors.New("Illegal cluster size config")
	}
	// todo concurrent concerns
	//cluster.mu.Lock()
	//defer cluster.mu.Unlock()
1136 1137
	if clusterConfig.DataNodeNum > len(cluster.DataNodes) {
		needAdd := clusterConfig.DataNodeNum - len(cluster.DataNodes)
W
wayblink 已提交
1138 1139 1140
		for i := 0; i < needAdd; i++ {
			cluster.AddDataNode(nil)
		}
1141 1142
	} else if clusterConfig.DataNodeNum < len(cluster.DataNodes) {
		needRemove := len(cluster.DataNodes) - clusterConfig.DataNodeNum
W
wayblink 已提交
1143 1144 1145 1146 1147
		for i := 0; i < needRemove; i++ {
			cluster.RemoveDataNode(nil)
		}
	}

1148 1149
	if clusterConfig.QueryNodeNum > len(cluster.QueryNodes) {
		needAdd := clusterConfig.QueryNodeNum - len(cluster.QueryNodes)
W
wayblink 已提交
1150 1151 1152
		for i := 0; i < needAdd; i++ {
			cluster.AddQueryNode(nil)
		}
1153 1154
	} else if clusterConfig.QueryNodeNum < len(cluster.QueryNodes) {
		needRemove := len(cluster.QueryNodes) - clusterConfig.QueryNodeNum
W
wayblink 已提交
1155 1156 1157 1158 1159
		for i := 0; i < needRemove; i++ {
			cluster.RemoveQueryNode(nil)
		}
	}

1160 1161
	if clusterConfig.IndexNodeNum > len(cluster.IndexNodes) {
		needAdd := clusterConfig.IndexNodeNum - len(cluster.IndexNodes)
W
wayblink 已提交
1162 1163 1164
		for i := 0; i < needAdd; i++ {
			cluster.AddIndexNode(nil)
		}
1165 1166
	} else if clusterConfig.IndexNodeNum < len(cluster.IndexNodes) {
		needRemove := len(cluster.IndexNodes) - clusterConfig.IndexNodeNum
W
wayblink 已提交
1167 1168 1169 1170 1171 1172
		for i := 0; i < needRemove; i++ {
			cluster.RemoveIndexNode(nil)
		}
	}

	// validate
1173 1174 1175
	if clusterConfig.DataNodeNum != len(cluster.DataNodes) ||
		clusterConfig.QueryNodeNum != len(cluster.QueryNodes) ||
		clusterConfig.IndexNodeNum != len(cluster.IndexNodes) {
W
wayblink 已提交
1176 1177 1178 1179 1180 1181 1182
		return errors.New("Fail to update cluster size to target size")
	}

	log.Debug("mini cluster UpdateClusterSize succeed")
	return nil
}

1183
func (cluster *MiniCluster) GetProxy(ctx context.Context, addr string, nodeID int64) (types.Proxy, error) {
1184 1185
	cluster.mu.RLock()
	defer cluster.mu.RUnlock()
1186 1187
	if cluster.Proxy.GetAddress() == addr {
		return cluster.Proxy, nil
W
wayblink 已提交
1188
	}
1189
	return nil, nil
W
wayblink 已提交
1190 1191
}

1192
func (cluster *MiniCluster) GetQueryNode(ctx context.Context, addr string, nodeID int64) (types.QueryNode, error) {
1193 1194
	cluster.mu.RLock()
	defer cluster.mu.RUnlock()
1195
	for _, queryNode := range cluster.QueryNodes {
W
wayblink 已提交
1196 1197 1198 1199 1200 1201 1202
		if queryNode.GetAddress() == addr {
			return queryNode, nil
		}
	}
	return nil, errors.New("no related queryNode found")
}

1203
func (cluster *MiniCluster) GetDataNode(ctx context.Context, addr string, nodeID int64) (types.DataNode, error) {
1204 1205
	cluster.mu.RLock()
	defer cluster.mu.RUnlock()
1206
	for _, dataNode := range cluster.DataNodes {
W
wayblink 已提交
1207 1208 1209 1210 1211 1212 1213
		if dataNode.GetAddress() == addr {
			return dataNode, nil
		}
	}
	return nil, errors.New("no related dataNode found")
}

1214
func (cluster *MiniCluster) GetIndexNode(ctx context.Context, addr string, nodeID int64) (types.IndexNode, error) {
1215 1216
	cluster.mu.RLock()
	defer cluster.mu.RUnlock()
1217
	for _, indexNode := range cluster.IndexNodes {
W
wayblink 已提交
1218 1219 1220 1221 1222 1223 1224 1225
		if indexNode.GetAddress() == addr {
			return indexNode, nil
		}
	}
	return nil, errors.New("no related indexNode found")
}

func (cluster *MiniCluster) GetMetaWatcher() MetaWatcher {
1226
	return cluster.MetaWatcher
W
wayblink 已提交
1227
}