// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package indexcoord

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"os"
	"strconv"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"go.etcd.io/etcd/api/v3/mvccpb"
	v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/api/commonpb"
	"github.com/milvus-io/milvus/api/milvuspb"
	"github.com/milvus-io/milvus/internal/common"
	"github.com/milvus-io/milvus/internal/kv"
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/metastore/model"
	"github.com/milvus-io/milvus/internal/metrics"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/indexpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util"
	"github.com/milvus-io/milvus/internal/util/dependency"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/paramtable"
	"github.com/milvus-io/milvus/internal/util/retry"
	"github.com/milvus-io/milvus/internal/util/sessionutil"
	"github.com/milvus-io/milvus/internal/util/typeutil"
)

// make sure IndexCoord implements types.IndexCoord
var _ types.IndexCoord = (*IndexCoord)(nil)

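// Params is the component parameter table of IndexCoord; it is initialized once in Init().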
var Params paramtable.ComponentParam

// IndexCoord is a component responsible for scheduling index construction tasks and maintaining index status.
// IndexCoord accepts requests from rootcoord to build indexes, delete indexes, and query index information.
// IndexCoord assigns an IndexBuildID to each index-building request, forwards the request to IndexNode for
// execution, and records the status of the index and the index files.
type IndexCoord struct {
	stateCode atomic.Value

	loopCtx    context.Context
	loopCancel func()
	loopWg     sync.WaitGroup

	sched    *TaskScheduler
	session  *sessionutil.Session
	serverID UniqueID

	eventChan <-chan *sessionutil.SessionEvent

	factory      dependency.Factory
	etcdCli      *clientv3.Client
	etcdKV       kv.MetaKv
	chunkManager storage.ChunkManager

	metaTable             *metaTable
	nodeManager           *NodeManager
	indexBuilder          *indexBuilder
	garbageCollector      *garbageCollector
	flushedSegmentWatcher *flushedSegmentWatcher
	handoff               *handoff

	metricsCacheManager *metricsinfo.MetricsCacheManager

	nodeLock sync.RWMutex

	initOnce  sync.Once
	startOnce sync.Once

	reqTimeoutInterval time.Duration

	dataCoordClient types.DataCoord
	rootCoordClient types.RootCoord

	enableActiveStandBy bool
	activateFunc        func()

	// callback functions registered at different stages
	startCallbacks []func()
	closeCallbacks []func()
}

// UniqueID is an alias of int64, used as a unique identifier for requests.
type UniqueID = typeutil.UniqueID

// NewIndexCoord creates a new IndexCoord component.
func NewIndexCoord(ctx context.Context, factory dependency.Factory) (*IndexCoord, error) {
	rand.Seed(time.Now().UnixNano())
	ctx1, cancel := context.WithCancel(ctx)
	i := &IndexCoord{
		loopCtx:             ctx1,
		loopCancel:          cancel,
		reqTimeoutInterval:  time.Second * 10,
		factory:             factory,
		enableActiveStandBy: Params.IndexCoordCfg.EnableActiveStandby,
	}
	i.UpdateStateCode(commonpb.StateCode_Abnormal)
	return i, nil
}

// Register registers the IndexCoord role at etcd.
func (i *IndexCoord) Register() error {
	i.session.Register()
	if i.enableActiveStandBy {
		i.session.ProcessActiveStandBy(i.activateFunc)
	}
	go i.session.LivenessCheck(i.loopCtx, func() {
		log.Error("Index Coord disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID))
		if err := i.Stop(); err != nil {
			log.Fatal("failed to stop server", zap.Error(err))
		}
		// manually send signal to starter goroutine
		if i.session.TriggerKill {
			if p, err := os.FindProcess(os.Getpid()); err == nil {
				p.Signal(syscall.SIGINT)
			}
		}
	})
	return nil
}

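// initSession creates the IndexCoord session and registers the server info in etcd.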
func (i *IndexCoord) initSession() error {
	i.session = sessionutil.NewSession(i.loopCtx, Params.EtcdCfg.MetaRootPath, i.etcdCli)
	if i.session == nil {
		return errors.New("failed to initialize session")
	}
	i.session.Init(typeutil.IndexCoordRole, Params.IndexCoordCfg.Address, true, true)
	i.session.SetEnableActiveStandBy(i.enableActiveStandBy)
	Params.SetLogger(i.session.ServerID)
	i.serverID = i.session.ServerID
	return nil
}

// Init initializes the IndexCoord component.
func (i *IndexCoord) Init() error {
	var initErr error
	Params.InitOnce()
	i.initOnce.Do(func() {
		i.UpdateStateCode(commonpb.StateCode_Initializing)
		log.Debug("IndexCoord init", zap.Any("stateCode", i.stateCode.Load().(commonpb.StateCode)))

		i.factory.Init(&Params)

		err := i.initSession()
		if err != nil {
			log.Error(err.Error())
			initErr = err
			return
		}

		connectEtcdFn := func() error {
			i.etcdKV = etcdkv.NewEtcdKV(i.etcdCli, Params.EtcdCfg.MetaRootPath)
			i.metaTable, err = NewMetaTable(i.etcdKV)
			return err
		}
		log.Debug("IndexCoord try to connect etcd")
		err = retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(100))
		if err != nil {
			log.Error("IndexCoord try to connect etcd failed", zap.Error(err))
			initErr = err
			return
		}

		log.Debug("IndexCoord try to connect etcd success")
		i.nodeManager = NewNodeManager(i.loopCtx)

		sessions, revision, err := i.session.GetSessions(typeutil.IndexNodeRole)
		log.Debug("IndexCoord", zap.Int("session number", len(sessions)), zap.Int64("revision", revision))
		if err != nil {
			log.Error("IndexCoord Get IndexNode Sessions error", zap.Error(err))
			initErr = err
			return
		}
		aliveNodeID := make([]UniqueID, 0)
		if Params.IndexCoordCfg.BindIndexNodeMode {
			if err = i.nodeManager.AddNode(Params.IndexCoordCfg.IndexNodeID, Params.IndexCoordCfg.IndexNodeAddress); err != nil {
				log.Error("IndexCoord add node fail", zap.Int64("ServerID", Params.IndexCoordCfg.IndexNodeID),
					zap.String("address", Params.IndexCoordCfg.IndexNodeAddress), zap.Error(err))
				initErr = err
				return
			}
			log.Debug("IndexCoord add node success", zap.String("IndexNode address", Params.IndexCoordCfg.IndexNodeAddress),
				zap.Int64("nodeID", Params.IndexCoordCfg.IndexNodeID))
			aliveNodeID = append(aliveNodeID, Params.IndexCoordCfg.IndexNodeID)
			metrics.IndexCoordIndexNodeNum.WithLabelValues().Inc()
		} else {
			for _, session := range sessions {
				session := session
				if err := i.nodeManager.AddNode(session.ServerID, session.Address); err != nil {
					log.Error("IndexCoord", zap.Int64("ServerID", session.ServerID),
						zap.Error(err))
					continue
				}
				aliveNodeID = append(aliveNodeID, session.ServerID)
			}
		}
		log.Debug("IndexCoord", zap.Int("IndexNode number", len(i.nodeManager.GetAllClients())))
		i.indexBuilder = newIndexBuilder(i.loopCtx, i, i.metaTable, aliveNodeID)

		// TODO silverxia add Rewatch logic
		i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1, nil)

		chunkManager, err := i.factory.NewPersistentStorageChunkManager(i.loopCtx)
		if err != nil {
			log.Error("IndexCoord new minio chunkManager failed", zap.Error(err))
			initErr = err
			return
		}
		log.Debug("IndexCoord new minio chunkManager success")
		i.chunkManager = chunkManager

		i.garbageCollector = newGarbageCollector(i.loopCtx, i.metaTable, i.chunkManager, i)
		i.handoff = newHandoff(i.loopCtx, i.metaTable, i.etcdKV, i)
		i.flushedSegmentWatcher, err = newFlushSegmentWatcher(i.loopCtx, i.etcdKV, i.metaTable, i.indexBuilder, i.handoff, i)
		if err != nil {
			initErr = err
			return
		}

		i.sched, err = NewTaskScheduler(i.loopCtx, i.rootCoordClient, i.chunkManager, i.metaTable)
		if err != nil {
			log.Error("IndexCoord new task scheduler failed", zap.Error(err))
			initErr = err
			return
		}
		log.Debug("IndexCoord new task scheduler success")

		i.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
	})

	log.Debug("IndexCoord init finished", zap.Error(initErr))

	return initErr
}

// Start starts the IndexCoord component.
func (i *IndexCoord) Start() error {
	var startErr error
	i.startOnce.Do(func() {
		i.loopWg.Add(1)
		go i.watchNodeLoop()

		i.loopWg.Add(1)
		go i.watchFlushedSegmentLoop()

		startErr = i.sched.Start()

		i.indexBuilder.Start()
		i.garbageCollector.Start()
		i.handoff.Start()
		i.flushedSegmentWatcher.Start()

		i.UpdateStateCode(commonpb.StateCode_Healthy)
	})
	// Start callbacks
	for _, cb := range i.startCallbacks {
		cb()
	}

	Params.IndexCoordCfg.CreatedTime = time.Now()
	Params.IndexCoordCfg.UpdatedTime = time.Now()

	if i.enableActiveStandBy {
		i.activateFunc = func() {
			log.Info("IndexCoord switch from standby to active, reload the KV")
			i.metaTable.reloadFromKV()
			i.UpdateStateCode(commonpb.StateCode_Healthy)
		}
		i.UpdateStateCode(commonpb.StateCode_StandBy)
		log.Info("IndexCoord start successfully", zap.Any("state", i.stateCode.Load()))
	} else {
		i.UpdateStateCode(commonpb.StateCode_Healthy)
		log.Info("IndexCoord start successfully", zap.Any("state", i.stateCode.Load()))
	}

	return startErr
}

// Stop stops the IndexCoord component.
func (i *IndexCoord) Stop() error {
	// https://github.com/milvus-io/milvus/issues/12282
	i.UpdateStateCode(commonpb.StateCode_Abnormal)

	if i.loopCancel != nil {
		i.loopCancel()
		log.Info("cancel the loop of IndexCoord")
	}

	if i.sched != nil {
		i.sched.Close()
		log.Info("close the task scheduler of IndexCoord")
	}
	i.loopWg.Wait()

	if i.indexBuilder != nil {
		i.indexBuilder.Stop()
		log.Info("stop the index builder of IndexCoord")
	}
	if i.garbageCollector != nil {
		i.garbageCollector.Stop()
		log.Info("stop the garbage collector of IndexCoord")
	}
	if i.flushedSegmentWatcher != nil {
		i.flushedSegmentWatcher.Stop()
		log.Info("stop the flushed segment watcher")
	}

	for _, cb := range i.closeCallbacks {
		cb()
	}
	i.session.Revoke(time.Second)

	return nil
}

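// SetEtcdClient sets the etcd client used by IndexCoord.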
func (i *IndexCoord) SetEtcdClient(etcdClient *clientv3.Client) {
	i.etcdCli = etcdClient
}

// SetDataCoord sets data coordinator's client
func (i *IndexCoord) SetDataCoord(dataCoord types.DataCoord) error {
	if dataCoord == nil {
		return errors.New("null DataCoord interface")
	}

	i.dataCoordClient = dataCoord
	return nil
}

// SetRootCoord sets root coordinator's client
func (i *IndexCoord) SetRootCoord(rootCoord types.RootCoord) error {
	if rootCoord == nil {
		return errors.New("null RootCoord interface")
	}

	i.rootCoordClient = rootCoord
	return nil
}

// UpdateStateCode updates the component state of IndexCoord.
func (i *IndexCoord) UpdateStateCode(code commonpb.StateCode) {
	i.stateCode.Store(code)
}

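// isHealthy returns true if the current state code of IndexCoord is Healthy.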
func (i *IndexCoord) isHealthy() bool {
	code := i.stateCode.Load().(commonpb.StateCode)
	return code == commonpb.StateCode_Healthy
}

// GetComponentStates gets the component states of IndexCoord.
func (i *IndexCoord) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
	log.Debug("get IndexCoord component states ...")

	nodeID := common.NotRegisteredID
	if i.session != nil && i.session.Registered() {
		nodeID = i.session.ServerID
	}

	stateInfo := &milvuspb.ComponentInfo{
		NodeID:    nodeID,
		Role:      "IndexCoord",
		StateCode: i.stateCode.Load().(commonpb.StateCode),
	}

	ret := &milvuspb.ComponentStates{
		State:              stateInfo,
		SubcomponentStates: nil, // todo add subcomponents states
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
	log.Debug("IndexCoord GetComponentStates", zap.Any("IndexCoord component state", stateInfo))
	return ret, nil
}

// GetStatisticsChannel gets the statistics channel of IndexCoord.
func (i *IndexCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	log.Debug("get IndexCoord statistics channel ...")
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: "",
	}, nil
}

// CreateIndex creates an index on a collection.
// Index building is asynchronous: when an index-building request arrives, an IndexID is assigned to the task, all
// flushed segments are fetched from DataCoord, and tasks are recorded for these segments. The background process
// indexBuilder picks up these tasks and assigns them to IndexNodes for execution.
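//
// A minimal caller sketch (the ID values and parameter slices below are illustrative placeholders,
// not taken from a real deployment):
//
//	status, err := indexCoord.CreateIndex(ctx, &indexpb.CreateIndexRequest{
//		CollectionID: collectionID,
//		FieldID:      fieldID,
//		IndexName:    "vec_index",
//		TypeParams:   typeParams,
//		IndexParams:  indexParams,
//	})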
func (i *IndexCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
	if !i.isHealthy() {
		log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    msgIndexCoordIsUnhealthy(i.serverID),
		}, nil
	}
	log.Debug("IndexCoord receive create index request", zap.Int64("CollectionID", req.CollectionID),
		zap.String("IndexName", req.IndexName), zap.Int64("fieldID", req.FieldID),
		zap.Any("TypeParams", req.TypeParams),
		zap.Any("IndexParams", req.IndexParams))

	ret := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	}

	if !i.metaTable.CanCreateIndex(req) {
		ret.Reason = "CreateIndex failed: index already exists, but parameters are inconsistent"
		return ret, nil
	}

	t := &CreateIndexTask{
		BaseTask: BaseTask{
			ctx:   ctx,
			done:  make(chan error),
			table: i.metaTable,
		},
		dataCoordClient:  i.dataCoordClient,
		rootCoordClient:  i.rootCoordClient,
		indexCoordClient: i,
		req:              req,
	}

	err := i.sched.IndexAddQueue.Enqueue(t)
	if err != nil {
		ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
		ret.Reason = err.Error()
		return ret, nil
	}
	log.Debug("IndexCoord create index enqueue successfully", zap.Int64("IndexID", t.indexID))

	err = t.WaitToFinish()
	if err != nil {
		log.Error("IndexCoord scheduler creating index task fail", zap.Int64("collectionID", req.CollectionID),
			zap.Int64("fieldID", req.FieldID), zap.String("indexName", req.IndexName), zap.Error(err))
		ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
		ret.Reason = err.Error()
		return ret, nil
	}

	ret.ErrorCode = commonpb.ErrorCode_Success
	return ret, nil
}

// GetIndexState gets the index state of the index name in the request from Proxy.
func (i *IndexCoord) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest) (*indexpb.GetIndexStateResponse, error) {
	log.Info("IndexCoord get index state", zap.Int64("collectionID", req.CollectionID),
		zap.String("indexName", req.IndexName))

	if !i.isHealthy() {
		log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
		return &indexpb.GetIndexStateResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    msgIndexCoordIsUnhealthy(i.serverID),
			},
		}, nil
	}

	indexID2CreateTs := i.metaTable.GetIndexIDByName(req.CollectionID, req.IndexName)
	if len(indexID2CreateTs) == 0 {
		errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName)
		log.Error("IndexCoord get index state fail", zap.Int64("collectionID", req.CollectionID),
			zap.String("indexName", req.IndexName), zap.String("fail reason", errMsg))
		return &indexpb.GetIndexStateResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    errMsg,
			},
		}, nil
	}
	ret := &indexpb.GetIndexStateResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		State: commonpb.IndexState_Finished,
	}

	for indexID, createTs := range indexID2CreateTs {
		indexStates, _ := i.metaTable.GetIndexStates(indexID, createTs)
		for _, state := range indexStates {
			if state.state != commonpb.IndexState_Finished {
				ret.State = state.state
				ret.FailReason = state.failReason
				log.Info("IndexCoord get index state success", zap.Int64("collectionID", req.CollectionID),
					zap.String("indexName", req.IndexName), zap.String("state", ret.State.String()))
				return ret, nil
			}
		}
	}

	log.Info("IndexCoord get index state success", zap.Int64("collectionID", req.CollectionID),
		zap.String("indexName", req.IndexName), zap.String("state", ret.State.String()))
	return ret, nil
}

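// GetSegmentIndexState gets the index state of every segment in the request, looked up in the
// meta table under the given index name.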
func (i *IndexCoord) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegmentIndexStateRequest) (*indexpb.GetSegmentIndexStateResponse, error) {
	log.Info("IndexCoord get segment index state", zap.Int64("collectionID", req.CollectionID),
		zap.String("indexName", req.IndexName))

	if !i.isHealthy() {
		log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
		return &indexpb.GetSegmentIndexStateResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    msgIndexCoordIsUnhealthy(i.serverID),
			},
		}, nil
	}

	ret := &indexpb.GetSegmentIndexStateResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		States: make([]*indexpb.SegmentIndexState, 0),
	}
	indexID2CreateTs := i.metaTable.GetIndexIDByName(req.CollectionID, req.IndexName)
	if len(indexID2CreateTs) == 0 {
		errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName)
		log.Error("IndexCoord get segment index state fail", zap.Int64("collectionID", req.CollectionID),
			zap.String("indexName", req.IndexName), zap.String("fail reason", errMsg))
		return &indexpb.GetSegmentIndexStateResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    errMsg,
			},
		}, nil
	}
	for _, segID := range req.SegmentIDs {
		state := i.metaTable.GetSegmentIndexState(segID)
		ret.States = append(ret.States, &indexpb.SegmentIndexState{
			SegmentID:  segID,
			State:      state.state,
			FailReason: state.failReason,
		})
	}
	return ret, nil
}

// completeIndexInfo gets the index building progress and the index state.
func (i *IndexCoord) completeIndexInfo(ctx context.Context, indexInfo *indexpb.IndexInfo) error {
	collectionID := indexInfo.CollectionID
	indexName := indexInfo.IndexName
	log.Info("IndexCoord completeIndexInfo", zap.Int64("collID", collectionID),
		zap.String("indexName", indexName))

	calculateTotalRow := func() (int64, error) {
		totalRows := int64(0)
		flushSegments, err := i.dataCoordClient.GetFlushedSegments(ctx, &datapb.GetFlushedSegmentsRequest{
			CollectionID: collectionID,
			PartitionID:  -1,
		})
		if err != nil {
			return totalRows, err
		}

		resp, err := i.dataCoordClient.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
			SegmentIDs: flushSegments.Segments,
		})
		if err != nil {
			return totalRows, err
		}

		for _, seg := range resp.Infos {
			if seg.State == commonpb.SegmentState_Flushed {
				totalRows += seg.NumOfRows
			}
		}
		return totalRows, nil
	}

	indexID2CreateTs := i.metaTable.GetIndexIDByName(collectionID, indexName)
	if len(indexID2CreateTs) < 1 {
		log.Error("there is no index on collection", zap.Int64("collectionID", collectionID), zap.String("indexName", indexName))
		return nil
	}

	var indexID int64
	var createTs uint64
	// the size of `indexID2CreateTs` map is one
	// and we need to get key and value through the `for` statement
	for k, v := range indexID2CreateTs {
		indexID = k
		createTs = v
		break
	}

	totalRow, err := calculateTotalRow()
	if err != nil {
		return err
	}
	indexInfo.TotalRows = totalRow

	indexStates, indexStateCnt := i.metaTable.GetIndexStates(indexID, createTs)
	allCnt := len(indexStates)
	switch {
	case indexStateCnt.Failed > 0:
		indexInfo.State = commonpb.IndexState_Failed
		indexInfo.IndexStateFailReason = indexStateCnt.FailReason
	case indexStateCnt.Finished == allCnt:
		indexInfo.State = commonpb.IndexState_Finished
		indexInfo.IndexedRows = totalRow
	default:
		indexInfo.State = commonpb.IndexState_InProgress
		indexInfo.IndexedRows = i.metaTable.GetIndexBuildProgress(indexID, createTs)
	}

	log.Debug("IndexCoord completeIndexInfo success", zap.Int64("collID", collectionID),
		zap.Int64("totalRows", indexInfo.TotalRows), zap.Int64("indexRows", indexInfo.IndexedRows),
		zap.Any("state", indexInfo.State), zap.String("failReason", indexInfo.IndexStateFailReason))
	return nil
}

// GetIndexBuildProgress gets the index building progress by the number of rows.
func (i *IndexCoord) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetIndexBuildProgressRequest) (*indexpb.GetIndexBuildProgressResponse, error) {
	log.Info("IndexCoord receive GetIndexBuildProgress request", zap.Int64("collID", req.CollectionID),
		zap.String("indexName", req.IndexName))
	if !i.isHealthy() {
		log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
		return &indexpb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    msgIndexCoordIsUnhealthy(i.serverID),
			},
		}, nil
	}

	flushSegments, err := i.dataCoordClient.GetFlushedSegments(ctx, &datapb.GetFlushedSegmentsRequest{
		CollectionID: req.CollectionID,
		PartitionID:  -1,
	})
	if err != nil {
		return &indexpb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
			},
		}, err
	}

	resp, err := i.dataCoordClient.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
		SegmentIDs: flushSegments.Segments,
	})
	if err != nil {
		return &indexpb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
			},
		}, err
	}
	totalRows, indexRows := int64(0), int64(0)

	for _, seg := range resp.Infos {
		totalRows += seg.NumOfRows
	}

	indexID2CreateTs := i.metaTable.GetIndexIDByName(req.CollectionID, req.IndexName)
	if len(indexID2CreateTs) < 1 {
		errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName)
		log.Error("IndexCoord get index build progress fail", zap.Int64("collectionID", req.CollectionID),
			zap.String("indexName", req.IndexName), zap.String("fail reason", errMsg))
		return &indexpb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    errMsg,
			},
		}, nil
	}

	for indexID, createTs := range indexID2CreateTs {
		indexRows = i.metaTable.GetIndexBuildProgress(indexID, createTs)
		break
	}

	log.Debug("IndexCoord get index build progress success", zap.Int64("collID", req.CollectionID),
		zap.Int64("totalRows", totalRows), zap.Int64("indexRows", indexRows), zap.Int("seg num", len(resp.Infos)))
	return &indexpb.GetIndexBuildProgressResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		IndexedRows: indexRows,
		TotalRows:   totalRows,
	}, nil
}

// DropIndex deletes indexes based on IndexName. One IndexName corresponds to the index of an entire column. A column is
// divided into many segments, and each segment corresponds to an IndexBuildID. IndexCoord uses IndexBuildID to record
// index tasks.
func (i *IndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
	log.Info("IndexCoord DropIndex", zap.Int64("collectionID", req.CollectionID),
		zap.Int64s("partitionIDs", req.PartitionIDs), zap.String("indexName", req.IndexName))
	if !i.isHealthy() {
		log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    msgIndexCoordIsUnhealthy(i.serverID),
		}, nil
	}

	ret := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}

	indexID2CreateTs := i.metaTable.GetIndexIDByName(req.CollectionID, req.IndexName)
	if len(indexID2CreateTs) == 0 {
		log.Warn(fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName))
		return ret, nil
	}
	indexIDs := make([]UniqueID, 0)
	for indexID := range indexID2CreateTs {
		indexIDs = append(indexIDs, indexID)
	}
	if len(req.GetPartitionIDs()) == 0 {
		// drop collection index
		err := i.metaTable.MarkIndexAsDeleted(req.CollectionID, indexIDs)
		if err != nil {
			log.Error("IndexCoord drop index fail", zap.Int64("collectionID", req.CollectionID),
				zap.String("indexName", req.IndexName), zap.Error(err))
			ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
			ret.Reason = err.Error()
			return ret, nil
		}
	} else {
		err := i.metaTable.MarkSegmentsIndexAsDeleted(func(segIndex *model.SegmentIndex) bool {
			for _, partitionID := range req.PartitionIDs {
				if segIndex.CollectionID == req.CollectionID && segIndex.PartitionID == partitionID {
					return true
				}
			}
			return false
		})
		if err != nil {
			log.Error("IndexCoord drop index fail", zap.Int64("collectionID", req.CollectionID),
				zap.Int64s("partitionIDs", req.PartitionIDs), zap.String("indexName", req.IndexName), zap.Error(err))
			ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
			ret.Reason = err.Error()
			return ret, nil
		}
	}

	log.Info("IndexCoord DropIndex success", zap.Int64("collID", req.CollectionID),
		zap.Int64s("partitionIDs", req.PartitionIDs), zap.String("indexName", req.IndexName),
		zap.Int64s("indexIDs", indexIDs))
	return ret, nil
}

// TODO @xiaocai2333: drop index on the segments when drop partition. (need?)

// GetIndexInfos gets the index file paths from IndexCoord.
func (i *IndexCoord) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoRequest) (*indexpb.GetIndexInfoResponse, error) {
	if !i.isHealthy() {
		log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
		return &indexpb.GetIndexInfoResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    msgIndexCoordIsUnhealthy(i.serverID),
			},
			SegmentInfo: nil,
		}, nil
	}
	ret := &indexpb.GetIndexInfoResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		SegmentInfo: map[int64]*indexpb.SegmentInfo{},
	}

	for _, segID := range req.SegmentIDs {
		segIdxes := i.metaTable.GetSegmentIndexes(segID)
		ret.SegmentInfo[segID] = &indexpb.SegmentInfo{
			CollectionID: req.CollectionID,
			SegmentID:    segID,
			EnableIndex:  false,
			IndexInfos:   make([]*indexpb.IndexFilePathInfo, 0),
		}
		if len(segIdxes) != 0 {
			ret.SegmentInfo[segID].EnableIndex = true
			for _, segIdx := range segIdxes {
				ret.SegmentInfo[segID].IndexInfos = append(ret.SegmentInfo[segID].IndexInfos,
					&indexpb.IndexFilePathInfo{
						SegmentID:      segID,
						FieldID:        i.metaTable.GetFieldIDByIndexID(segIdx.CollectionID, segIdx.IndexID),
						IndexID:        segIdx.IndexID,
						BuildID:        segIdx.BuildID,
						IndexName:      i.metaTable.GetIndexNameByID(segIdx.CollectionID, segIdx.IndexID),
						IndexParams:    i.metaTable.GetIndexParams(segIdx.CollectionID, segIdx.IndexID),
						IndexFilePaths: segIdx.IndexFilePaths,
						SerializedSize: segIdx.IndexSize,
						IndexVersion:   segIdx.IndexVersion,
					})
			}
		}
	}

	return ret, nil
}

// DescribeIndex describes the index info of the collection.
func (i *IndexCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) {
	if !i.isHealthy() {
		log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
		return &indexpb.DescribeIndexResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    msgIndexCoordIsUnhealthy(i.serverID),
			},
			IndexInfos: nil,
		}, nil
	}

	indexes := i.metaTable.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
	if len(indexes) == 0 {
		return &indexpb.DescribeIndexResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_IndexNotExist,
				Reason:    "index not exist",
			},
		}, nil
	}
	indexInfos := make([]*indexpb.IndexInfo, 0)
	for _, index := range indexes {
		indexInfo := &indexpb.IndexInfo{
			CollectionID:    index.CollectionID,
			FieldID:         index.FieldID,
			IndexName:       index.IndexName,
			TypeParams:      index.TypeParams,
			IndexParams:     index.IndexParams,
			IsAutoIndex:     index.IsAutoIndex,
			UserIndexParams: index.UserIndexParams,
		}
		if err := i.completeIndexInfo(ctx, indexInfo); err != nil {
			log.Error("IndexCoord describe index fail", zap.Int64("collectionID", req.CollectionID),
				zap.String("indexName", req.IndexName), zap.Error(err))
			return &indexpb.DescribeIndexResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}
		indexInfos = append(indexInfos, indexInfo)
	}

	return &indexpb.DescribeIndexResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		IndexInfos: indexInfos,
	}, nil
}

// ShowConfigurations returns the configurations of indexCoord matching req.Pattern
func (i *IndexCoord) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
	log.Debug("IndexCoord.ShowConfigurations", zap.String("pattern", req.Pattern))
	if !i.isHealthy() {
		log.Warn("IndexCoord.ShowConfigurations failed",
			zap.Int64("nodeId", i.serverID),
			zap.String("req", req.Pattern),
			zap.Error(errIndexCoordIsUnhealthy(i.serverID)))

		return &internalpb.ShowConfigurationsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    msgIndexCoordIsUnhealthy(i.serverID),
			},
			Configuations: nil,
		}, nil
	}

	return getComponentConfigurations(ctx, req), nil
}

// GetMetrics gets the metrics info of IndexCoord.
func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	log.Debug("IndexCoord.GetMetrics", zap.Int64("node id", i.serverID), zap.String("req", req.Request))

	if !i.isHealthy() {
		log.Warn(msgIndexCoordIsUnhealthy(i.serverID))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    msgIndexCoordIsUnhealthy(i.serverID),
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Error("IndexCoord.GetMetrics failed to parse metric type",
			zap.Int64("node id", i.session.ServerID),
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

	log.Debug("IndexCoord.GetMetrics",
		zap.String("metric type", metricType))

	if metricType == metricsinfo.SystemInfoMetrics {
		ret, err := i.metricsCacheManager.GetSystemInfoMetrics()
		if err == nil && ret != nil {
			return ret, nil
		}
		log.Error("failed to get system info metrics from cache, recompute instead",
			zap.Error(err))

		metrics, err := getSystemInfoMetrics(ctx, req, i)

		log.Debug("IndexCoord.GetMetrics",
			zap.Int64("node id", i.session.ServerID),
			zap.String("req", req.Request),
			zap.String("metric type", metricType),
			zap.String("metrics", metrics.Response), // TODO(dragondriver): necessary? may be very large
			zap.Error(err))

		i.metricsCacheManager.UpdateSystemInfoMetrics(metrics)

		return metrics, nil
	}

	log.Debug("IndexCoord.GetMetrics failed, request metric type is not implemented yet",
		zap.Int64("node id", i.session.ServerID),
		zap.String("req", req.Request),
		zap.String("metric type", metricType))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}

// watchNodeLoop is used to monitor IndexNode going online and offline.
// fix datarace in unittest
// startWatchService will only be invoked at start procedure
// otherwise, remove the annotation and add atomic protection
//
//go:norace
func (i *IndexCoord) watchNodeLoop() {
	ctx, cancel := context.WithCancel(i.loopCtx)

	defer cancel()
	defer i.loopWg.Done()
	log.Debug("IndexCoord watchNodeLoop start")

	for {
		select {
		case <-ctx.Done():
			return
		case event, ok := <-i.eventChan:
			if !ok {
				// ErrCompacted is handled inside SessionWatcher
				log.Error("Session Watcher channel closed", zap.Int64("server id", i.session.ServerID))
				go i.Stop()
				if i.session.TriggerKill {
					if p, err := os.FindProcess(os.Getpid()); err == nil {
						p.Signal(syscall.SIGINT)
					}
				}
				return
			}
			if Params.IndexCoordCfg.BindIndexNodeMode {
				continue
			}
			log.Debug("IndexCoord watchNodeLoop event updated")
			switch event.EventType {
			case sessionutil.SessionAddEvent:
				serverID := event.Session.ServerID
				log.Debug("IndexCoord watchNodeLoop SessionAddEvent", zap.Int64("serverID", serverID),
					zap.String("address", event.Session.Address))
				go func() {
					err := i.nodeManager.AddNode(serverID, event.Session.Address)
					if err != nil {
						log.Error("IndexCoord", zap.Any("Add IndexNode err", err))
					}
				}()
				i.metricsCacheManager.InvalidateSystemInfoMetrics()
			case sessionutil.SessionDelEvent:
				serverID := event.Session.ServerID
				log.Debug("IndexCoord watchNodeLoop SessionDelEvent", zap.Int64("serverID", serverID))
				i.nodeManager.RemoveNode(serverID)
				// remove tasks on nodeID
				i.indexBuilder.nodeDown(serverID)
				i.metricsCacheManager.InvalidateSystemInfoMetrics()
			}
		}
	}
}

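// tryAcquireSegmentReferLock asks DataCoord to acquire a reference lock on the given segments,
// using buildID as the lock's task ID.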
func (i *IndexCoord) tryAcquireSegmentReferLock(ctx context.Context, buildID UniqueID, nodeID UniqueID, segIDs []UniqueID) error {
	// IndexCoord uses buildID instead of taskID.
	log.Info("try to acquire segment reference lock", zap.Int64("buildID", buildID),
		zap.Int64("nodeID", nodeID), zap.Int64s("segIDs", segIDs))
	status, err := i.dataCoordClient.AcquireSegmentLock(ctx, &datapb.AcquireSegmentLockRequest{
		TaskID:     buildID,
		NodeID:     nodeID,
		SegmentIDs: segIDs,
	})
	if err != nil {
		log.Error("IndexCoord try to acquire segment reference lock failed", zap.Int64("buildID", buildID),
			zap.Int64("nodeID", nodeID), zap.Int64s("segIDs", segIDs), zap.Error(err))
		return err
	}
	if status.ErrorCode != commonpb.ErrorCode_Success {
		log.Error("IndexCoord try to acquire segment reference lock failed", zap.Int64("buildID", buildID),
			zap.Int64("nodeID", nodeID), zap.Int64s("segIDs", segIDs), zap.Error(errors.New(status.Reason)))
		return errors.New(status.Reason)
	}
	log.Info("try to acquire segment reference lock success", zap.Int64("buildID", buildID),
		zap.Int64("nodeID", nodeID), zap.Int64s("segIDs", segIDs))
	return nil
}

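// tryReleaseSegmentReferLock asks DataCoord to release the segment reference lock held under
// buildID, retrying up to 100 times.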
func (i *IndexCoord) tryReleaseSegmentReferLock(ctx context.Context, buildID UniqueID, nodeID UniqueID) error {
	releaseLock := func() error {
		status, err := i.dataCoordClient.ReleaseSegmentLock(ctx, &datapb.ReleaseSegmentLockRequest{
			TaskID: buildID,
			NodeID: nodeID,
		})
		if err != nil {
			return err
		}
		if status.ErrorCode != commonpb.ErrorCode_Success {
			return errors.New(status.Reason)
		}
		return nil
	}
	err := retry.Do(ctx, releaseLock, retry.Attempts(100))
	if err != nil {
		log.Error("IndexCoord try to release segment reference lock failed", zap.Int64("buildID", buildID),
			zap.Int64("nodeID", nodeID), zap.Error(err))
		return err
	}
	return nil
}

// assignTask sends the index task to the IndexNode, it has a timeout interval, if the IndexNode doesn't respond within
// the interval, it is considered that the task sending failed.
func (i *IndexCoord) assignTask(builderClient types.IndexNode, req *indexpb.CreateJobRequest) error {
	ctx, cancel := context.WithTimeout(i.loopCtx, i.reqTimeoutInterval)
	defer cancel()
	resp, err := builderClient.CreateJob(ctx, req)
	if err != nil {
		log.Error("IndexCoord assignTask builderClient.CreateJob failed", zap.Error(err))
		return err
	}

	if resp.ErrorCode != commonpb.ErrorCode_Success {
		log.Error("IndexCoord assignTask builderClient.CreateJob failed", zap.String("Reason", resp.Reason))
		return errors.New(resp.Reason)
	}
	return nil
}

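// createIndexForSegment enqueues an IndexAddTask for a flushed segment. It returns whether an
// index with the same parameters already exists, the index build ID, and any error.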
func (i *IndexCoord) createIndexForSegment(segIdx *model.SegmentIndex) (bool, UniqueID, error) {
	log.Info("create index for flushed segment", zap.Int64("collID", segIdx.CollectionID),
		zap.Int64("segID", segIdx.SegmentID), zap.Int64("numRows", segIdx.NumRows))
	//if segIdx.NumRows < Params.IndexCoordCfg.MinSegmentNumRowsToEnableIndex {
	//	log.Debug("no need to build index", zap.Int64("collID", segIdx.CollectionID),
	//		zap.Int64("segID", segIdx.SegmentID), zap.Int64("numRows", segIdx.NumRows))
	//	return false, 0, nil
	//}

	hasIndex, indexBuildID := i.metaTable.HasSameIndex(segIdx.SegmentID, segIdx.IndexID)
	if hasIndex {
		log.Debug("IndexCoord has same index", zap.Int64("buildID", indexBuildID), zap.Int64("segmentID", segIdx.SegmentID))
		return true, indexBuildID, nil
	}

	t := &IndexAddTask{
		BaseTask: BaseTask{
			ctx:   i.loopCtx,
			done:  make(chan error),
			table: i.metaTable,
		},
		segmentIndex:    segIdx,
		rootcoordClient: i.rootCoordClient,
	}

	metrics.IndexCoordIndexRequestCounter.WithLabelValues(metrics.TotalLabel).Inc()

	err := i.sched.IndexAddQueue.Enqueue(t)
	if err != nil {
		metrics.IndexCoordIndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
		log.Error("IndexCoord createIndex enqueue failed", zap.Int64("collID", segIdx.CollectionID),
			zap.Int64("segID", segIdx.SegmentID), zap.Error(err))
		return false, 0, err
	}
	log.Debug("IndexCoord createIndex Enqueue successfully", zap.Int64("collID", segIdx.CollectionID),
		zap.Int64("segID", segIdx.SegmentID), zap.Int64("IndexBuildID", t.segmentIndex.BuildID))

	err = t.WaitToFinish()
	if err != nil {
		log.Error("IndexCoord scheduler index task failed", zap.Int64("buildID", t.segmentIndex.BuildID))
		metrics.IndexCoordIndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
		return false, 0, err
	}
	metrics.IndexCoordIndexRequestCounter.WithLabelValues(metrics.SuccessLabel).Inc()

	return false, t.segmentIndex.BuildID, nil
}

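// watchFlushedSegmentLoop watches the etcd prefix for flushed segments and enqueues each new
// segment into the flushedSegmentWatcher; on an etcd compaction error it reloads from KV and
// restarts the loop.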
func (i *IndexCoord) watchFlushedSegmentLoop() {
	log.Info("IndexCoord start watching flushed segments...")
	defer i.loopWg.Done()

	watchChan := i.etcdKV.WatchWithRevision(util.FlushedSegmentPrefix, i.flushedSegmentWatcher.etcdRevision+1)
	for {
		select {
		case <-i.loopCtx.Done():
			log.Warn("IndexCoord context done, exit...")
			return
		case resp, ok := <-watchChan:
			if !ok {
				log.Warn("IndexCoord watch flush segments loop failed because watch channel closed")
				return
			}
			if err := resp.Err(); err != nil {
				log.Warn("IndexCoord watchFlushedSegmentLoo receive etcd compacted error")
				if err == v3rpc.ErrCompacted {
					err = i.flushedSegmentWatcher.reloadFromKV()
					if err != nil {
						log.Error("Constructing flushed segment watcher fails when etcd has a compaction error",
							zap.String("etcd error", err.Error()), zap.Error(err))
						panic("failed to handle etcd request, exit..")
					}
					i.loopWg.Add(1)
					go i.watchFlushedSegmentLoop()
					return
				}
				log.Error("received error event from flushed segment watcher",
					zap.String("prefix", util.FlushedSegmentPrefix), zap.Error(err))
				panic("failed to handle etcd request, exit..")
			}
			events := resp.Events
			for _, event := range events {
				switch event.Type {
				case mvccpb.PUT:
					segmentID, err := strconv.ParseInt(string(event.Kv.Value), 10, 64)
					if err != nil {
						log.Error("IndexCoord watch flushed segment, but parse segmentID fail",
							zap.String("event.Value", string(event.Kv.Value)), zap.Error(err))
						continue
					}
					log.Debug("watchFlushedSegmentLoop watch event", zap.Int64("segID", segmentID))
					i.flushedSegmentWatcher.enqueueInternalTask(segmentID)
				case mvccpb.DELETE:
					log.Debug("the segment info has been deleted", zap.String("key", string(event.Kv.Key)))
				}
			}
		}
	}
}

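// pullSegmentInfo fetches the segment info of segmentID from DataCoord, returning
// errSegmentNotFound if the segment does not exist.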
func (i *IndexCoord) pullSegmentInfo(ctx context.Context, segmentID UniqueID) (*datapb.SegmentInfo, error) {
	resp, err := i.dataCoordClient.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
		SegmentIDs:       []int64{segmentID},
		IncludeUnHealthy: false,
	})
	if err != nil {
		log.Error("IndexCoord get segment info fail", zap.Int64("segID", segmentID), zap.Error(err))
		return nil, err
	}
	if resp.Status.GetErrorCode() != commonpb.ErrorCode_Success {
		log.Error("IndexCoord get segment info fail", zap.Int64("segID", segmentID),
			zap.String("fail reason", resp.Status.GetReason()))
		if resp.Status.GetReason() == msgSegmentNotFound(segmentID) {
			return nil, errSegmentNotFound(segmentID)
		}
		return nil, errors.New(resp.Status.GetReason())
	}
	for _, info := range resp.Infos {
		if info.ID == segmentID {
			return info, nil
		}
	}
	errMsg := msgSegmentNotFound(segmentID)
	log.Error(errMsg)
	return nil, errSegmentNotFound(segmentID)
}