root_coord.go 96.9 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16

17
package rootcoord
18 19 20

import (
	"context"
G
godchen 已提交
21
	"encoding/json"
22
	"fmt"
23
	"math/rand"
J
Ji Bin 已提交
24
	"os"
25
	"strconv"
X
Xiaofan 已提交
26
	"strings"
27 28
	"sync"
	"sync/atomic"
29
	"syscall"
30 31
	"time"

32 33 34
	"github.com/milvus-io/milvus/internal/util"
	"github.com/milvus-io/milvus/internal/util/crypto"

G
godchen 已提交
35 36
	"github.com/milvus-io/milvus/internal/util/dependency"

37
	"github.com/golang/protobuf/proto"
G
godchen 已提交
38 39 40
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"

X
Xiangyu Wang 已提交
41
	"github.com/milvus-io/milvus/internal/allocator"
42
	"github.com/milvus-io/milvus/internal/common"
43
	"github.com/milvus-io/milvus/internal/kv"
X
Xiangyu Wang 已提交
44 45
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
C
Cai Yudong 已提交
46
	"github.com/milvus-io/milvus/internal/metrics"
J
jaime 已提交
47
	ms "github.com/milvus-io/milvus/internal/mq/msgstream"
X
Xiangyu Wang 已提交
48 49
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
50
	"github.com/milvus-io/milvus/internal/proto/etcdpb"
X
Xiangyu Wang 已提交
51 52 53 54 55
	"github.com/milvus-io/milvus/internal/proto/indexpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
56
	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
57
	"github.com/milvus-io/milvus/internal/proto/schemapb"
X
Xiangyu Wang 已提交
58 59
	"github.com/milvus-io/milvus/internal/tso"
	"github.com/milvus-io/milvus/internal/types"
60
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
61
	"github.com/milvus-io/milvus/internal/util/paramtable"
X
Xiangyu Wang 已提交
62
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
63
	"github.com/milvus-io/milvus/internal/util/sessionutil"
64
	"github.com/milvus-io/milvus/internal/util/timerecord"
G
godchen 已提交
65
	"github.com/milvus-io/milvus/internal/util/trace"
X
Xiangyu Wang 已提交
66 67
	"github.com/milvus-io/milvus/internal/util/tsoutil"
	"github.com/milvus-io/milvus/internal/util/typeutil"
68 69
)

70 71 72
// UniqueID is an alias of typeutil.UniqueID.
type UniqueID = typeutil.UniqueID

73 74
// ------------------ struct -----------------------

75
// DdOperation is the record used to save a ddMsg into etcd.
// It is JSON-serializable through the struct tags below.
type DdOperation struct {
	// Body holds the raw message bytes; encoded under the "body" JSON key.
	Body []byte `json:"body"`
	// Type is encoded under the "type" JSON key.
	Type string `json:"type"`
}

C
Cai Yudong 已提交
81 82 83 84 85 86 87 88
const (
	// MetricRequestsTotal is the string used when counting the total number of requests.
	MetricRequestsTotal = "total"

	// MetricRequestsSuccess is the string used when counting the number of successful requests.
	MetricRequestsSuccess = "success"
)

89
// metricProxy renders the numeric id v as the string "client_<v>".
func metricProxy(v int64) string {
	const prefix = "client_"
	// fmt.Sprint inserts no space here because one operand is a string.
	return fmt.Sprint(prefix, v)
}

93
// Params is the package-level component parameter table. Code in this file
// reads Params.ProxyCfg.TimeTickInterval and Params.CommonCfg.RootCoordSubName
// from it.
var Params paramtable.ComponentParam
94

C
Cai Yudong 已提交
95
// Core is the root coordinator. Besides its own state it holds a set of
// function-valued fields through which it reaches other services; those fields
// are installed by setMsgStreams, SetDataCoord, SetIndexCoord, SetQueryCoord
// and SetNewProxyClient, and most of them are verified to be non-nil by
// checkInit.
type Core struct {
	MetaTable *MetaTable
	// ID allocator hooks.
	// NOTE(review): the two returned UniqueIDs presumably delimit the allocated
	// range — confirm against the allocator implementation.
	IDAllocator       func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error)
	IDAllocatorUpdate func() error

	// TSO (timestamp) allocator hooks. tsLoop calls TSOAllocatorUpdate and
	// TSOGetLastSavedTime periodically.
	TSOAllocator        func(count uint32) (typeutil.Timestamp, error)
	TSOAllocatorUpdate  func() error
	TSOGetLastSavedTime func() time.Time

	// Internal members.
	// ctx/cancel are created in NewCore; the background loops exit when ctx is done
	// and signal completion through wg.
	ctx       context.Context
	cancel    context.CancelFunc
	wg        sync.WaitGroup
	etcdCli   *clientv3.Client
	kvBase    kv.TxnKV //*etcdkv.EtcdKV
	impTaskKv kv.MetaKv

	// DDL lock. startTimeTickLoop holds it while allocating a timestamp and
	// sending the time tick.
	ddlLock sync.Mutex

	// Factory function producing a kv.TxnKV for the given root.
	kvBaseCreate func(root string) (kv.TxnKV, error)

	// Factory function producing a kv.MetaKv for the given root.
	metaKVCreate func(root string) (kv.MetaKv, error)

	// Set by setMsgStreams: sends a time tick into the dd channel and the time tick channel.
	SendTimeTick func(t typeutil.Timestamp, reason string) error

	// Set by setMsgStreams: sends a create-collection message into the dd channel.
	// Returns the corresponding message id for each channel.
	SendDdCreateCollectionReq func(ctx context.Context, req *internalpb.CreateCollectionRequest, channelNames []string) (map[string][]byte, error)

	// Set by setMsgStreams: sends a drop-collection message into the dd channel, and notifies the proxy to delete this collection.
	SendDdDropCollectionReq func(ctx context.Context, req *internalpb.DropCollectionRequest, channelNames []string) error

	// Set by setMsgStreams: sends a create-partition message into the dd channel.
	SendDdCreatePartitionReq func(ctx context.Context, req *internalpb.CreatePartitionRequest, channelNames []string) error

	// Set by setMsgStreams: sends a drop-partition message into the dd channel.
	SendDdDropPartitionReq func(ctx context.Context, req *internalpb.DropPartitionRequest, channelNames []string) error

	// Data coordinator calls (installed by SetDataCoord): binlog file paths,
	// row count and flushed segment ids.
	CallGetBinlogFilePathsService func(ctx context.Context, segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error)
	CallGetNumRowsService         func(ctx context.Context, segID typeutil.UniqueID, isFromFlushedChan bool) (int64, error)
	CallGetFlushedSegmentsService func(ctx context.Context, collID, partID typeutil.UniqueID) ([]typeutil.UniqueID, error)

	// Index coordinator calls (installed by SetIndexCoord): build an index and
	// return its build id, drop an index, or fetch index states.
	CallBuildIndexService     func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (typeutil.UniqueID, error)
	CallDropIndexService      func(ctx context.Context, indexID typeutil.UniqueID) error
	CallGetIndexStatesService func(ctx context.Context, IndexBuildIDs []int64) ([]*indexpb.IndexInfo, error)

	// Constructor for a proxy client from a session; installed by SetNewProxyClient.
	NewProxyClient func(sess *sessionutil.Session) (types.Proxy, error)

	// Query coordinator calls: notify it to release a collection or partitions.
	CallReleaseCollectionService func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID) error
	CallReleasePartitionService  func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID, partitionIDs []typeutil.UniqueID) error

	// Asks the data coordinator to watch the given channels of a collection.
	CallWatchChannels func(ctx context.Context, collectionID int64, channelNames []string) error

	// Update segment state.
	CallUpdateSegmentStateService func(ctx context.Context, segID typeutil.UniqueID, ss commonpb.SegmentState) error

	// Assigns an import task to the data coordinator.
	CallImportService func(ctx context.Context, req *datapb.ImportTaskRequest) *datapb.ImportTaskResponse

	// Proxy manager.
	proxyManager *proxyManager

	// Proxy clients.
	proxyClientManager *proxyClientManager

	// Metrics cache manager.
	metricsCacheManager *metricsinfo.MetricsCacheManager

	// Channel time tick synchronizer; used by the closures installed in setMsgStreams.
	chanTimeTick *timetickSync

	// Time tick loop state.
	lastTimeTick typeutil.Timestamp

	// State code; written by UpdateStateCode and read by checkHealthy.
	stateCode atomic.Value

	// One-shot guards.
	initOnce  sync.Once
	startOnce sync.Once
	//isInit    atomic.Value

	// session supplies ServerID, which is used as SourceID in outgoing requests.
	session *sessionutil.Session

	factory dependency.Factory

	// Import manager.
	importManager *importManager
}

// --------------------- function --------------------------

195
// NewCore creates a new rootcoord core
G
godchen 已提交
196
func NewCore(c context.Context, factory dependency.Factory) (*Core, error) {
197 198 199
	ctx, cancel := context.WithCancel(c)
	rand.Seed(time.Now().UnixNano())
	core := &Core{
G
godchen 已提交
200 201 202 203
		ctx:     ctx,
		cancel:  cancel,
		ddlLock: sync.Mutex{},
		factory: factory,
204
	}
G
godchen 已提交
205
	core.UpdateStateCode(internalpb.StateCode_Abnormal)
206 207 208
	return core, nil
}

209
// UpdateStateCode atomically stores code as the current state code of the
// Core; checkHealthy reads it back.
func (c *Core) UpdateStateCode(code internalpb.StateCode) {
	c.stateCode.Store(code)
}

C
Cai Yudong 已提交
214
func (c *Core) checkHealthy() (internalpb.StateCode, bool) {
215
	code := c.stateCode.Load().(internalpb.StateCode)
C
Cai Yudong 已提交
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
	ok := code == internalpb.StateCode_Healthy
	return code, ok
}

// failStatus allocates a Status carrying the given error code and reason.
func failStatus(code commonpb.ErrorCode, reason string) *commonpb.Status {
	st := new(commonpb.Status)
	st.ErrorCode = code
	st.Reason = reason
	return st
}

func succStatus() *commonpb.Status {
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}
232 233
}

234 235
func (c *Core) checkInit() error {
	if c.MetaTable == nil {
236
		return fmt.Errorf("metaTable is nil")
237
	}
N
neza2017 已提交
238
	if c.IDAllocator == nil {
N
neza2017 已提交
239
		return fmt.Errorf("idAllocator is nil")
240
	}
N
neza2017 已提交
241
	if c.IDAllocatorUpdate == nil {
N
neza2017 已提交
242 243
		return fmt.Errorf("idAllocatorUpdate is nil")
	}
N
neza2017 已提交
244
	if c.TSOAllocator == nil {
N
neza2017 已提交
245
		return fmt.Errorf("tsoAllocator is nil")
246
	}
N
neza2017 已提交
247
	if c.TSOAllocatorUpdate == nil {
N
neza2017 已提交
248 249
		return fmt.Errorf("tsoAllocatorUpdate is nil")
	}
250
	if c.etcdCli == nil {
N
neza2017 已提交
251
		return fmt.Errorf("etcdCli is nil")
252 253
	}
	if c.kvBase == nil {
N
neza2017 已提交
254
		return fmt.Errorf("kvBase is nil")
255
	}
256 257 258
	if c.impTaskKv == nil {
		return fmt.Errorf("impTaskKv is nil")
	}
259
	if c.SendDdCreateCollectionReq == nil {
260
		return fmt.Errorf("sendDdCreateCollectionReq is nil")
261
	}
262
	if c.SendDdDropCollectionReq == nil {
263
		return fmt.Errorf("sendDdDropCollectionReq is nil")
264
	}
265
	if c.SendDdCreatePartitionReq == nil {
266
		return fmt.Errorf("sendDdCreatePartitionReq is nil")
267
	}
268
	if c.SendDdDropPartitionReq == nil {
269
		return fmt.Errorf("sendDdDropPartitionReq is nil")
270
	}
271
	if c.CallGetBinlogFilePathsService == nil {
272
		return fmt.Errorf("callGetBinlogFilePathsService is nil")
N
neza2017 已提交
273
	}
274
	if c.CallGetNumRowsService == nil {
275
		return fmt.Errorf("callGetNumRowsService is nil")
276
	}
277
	if c.CallBuildIndexService == nil {
278
		return fmt.Errorf("callBuildIndexService is nil")
N
neza2017 已提交
279
	}
280
	if c.CallDropIndexService == nil {
281
		return fmt.Errorf("callDropIndexService is nil")
N
neza2017 已提交
282
	}
283
	if c.CallGetFlushedSegmentsService == nil {
284
		return fmt.Errorf("callGetFlushedSegmentsService is nil")
285
	}
286 287 288
	if c.CallUpdateSegmentStateService == nil {
		return fmt.Errorf("CallUpdateSegmentStateService is nil")
	}
G
godchen 已提交
289
	if c.CallWatchChannels == nil {
290
		return fmt.Errorf("callWatchChannels is nil")
G
godchen 已提交
291
	}
N
neza2017 已提交
292
	if c.NewProxyClient == nil {
293
		return fmt.Errorf("newProxyClient is nil")
294 295
	}
	if c.CallReleaseCollectionService == nil {
296
		return fmt.Errorf("callReleaseCollectionService is nil")
297
	}
N
neza2017 已提交
298
	if c.CallReleasePartitionService == nil {
299
		return fmt.Errorf("callReleasePartitionService is nil")
N
neza2017 已提交
300
	}
G
groot 已提交
301 302 303
	if c.CallImportService == nil {
		return fmt.Errorf("callImportService is nil")
	}
N
neza2017 已提交
304

305 306 307 308
	return nil
}

func (c *Core) startTimeTickLoop() {
309
	defer c.wg.Done()
310
	ticker := time.NewTicker(Params.ProxyCfg.TimeTickInterval)
311 312 313
	for {
		select {
		case <-c.ctx.Done():
C
Cai Yudong 已提交
314
			log.Debug("rootcoord context closed", zap.Error(c.ctx.Err()))
315 316
			return
		case <-ticker.C:
317
			c.ddlLock.Lock()
N
neza2017 已提交
318
			if ts, err := c.TSOAllocator(1); err == nil {
X
Xiaofan 已提交
319 320 321 322
				err := c.SendTimeTick(ts, "timetick loop")
				if err != nil {
					log.Warn("Failed to send timetick", zap.Error(err))
				}
323
			}
324
			c.ddlLock.Unlock()
325 326 327 328
		}
	}
}

329
func (c *Core) tsLoop() {
330
	defer c.wg.Done()
331
	tsoTicker := time.NewTicker(tso.UpdateTimestampStep)
332 333 334 335 336 337
	defer tsoTicker.Stop()
	ctx, cancel := context.WithCancel(c.ctx)
	defer cancel()
	for {
		select {
		case <-tsoTicker.C:
N
neza2017 已提交
338
			if err := c.TSOAllocatorUpdate(); err != nil {
N
neza2017 已提交
339 340
				log.Warn("failed to update timestamp: ", zap.Error(err))
				continue
341
			}
342 343
			ts := c.TSOGetLastSavedTime()
			metrics.RootCoordETCDTimestampAllocCounter.Set(float64(ts.Unix()))
N
neza2017 已提交
344
			if err := c.IDAllocatorUpdate(); err != nil {
N
neza2017 已提交
345 346
				log.Warn("failed to update id: ", zap.Error(err))
				continue
347 348 349
			}
		case <-ctx.Done():
			// Server is closed and it should return nil.
B
bigsheeper 已提交
350
			log.Debug("tsLoop is closed")
351 352 353 354
			return
		}
	}
}
355

356
func (c *Core) checkFlushedSegmentsLoop() {
357
	defer c.wg.Done()
358 359 360 361
	ticker := time.NewTicker(10 * time.Minute)
	for {
		select {
		case <-c.ctx.Done():
362
			log.Debug("RootCoord context done, exit check FlushedSegmentsLoop")
363 364 365
			return
		case <-ticker.C:
			log.Debug("check flushed segments")
C
congqixia 已提交
366 367 368 369 370 371 372 373 374 375 376 377 378 379 380
			c.checkFlushedSegments(c.ctx)
		}
	}
}

func (c *Core) checkFlushedSegments(ctx context.Context) {
	collID2Meta, segID2IndexMeta, indexID2Meta := c.MetaTable.dupMeta()
	for _, collMeta := range collID2Meta {
		if len(collMeta.FieldIndexes) == 0 {
			continue
		}
		for _, partID := range collMeta.PartitionIDs {
			ctx2, cancel2 := context.WithTimeout(ctx, 3*time.Minute)
			segIDs, err := c.CallGetFlushedSegmentsService(ctx2, collMeta.ID, partID)
			if err != nil {
381
				log.Debug("failed to get flushed segments from data coord",
C
congqixia 已提交
382 383 384
					zap.Int64("collection id", collMeta.ID),
					zap.Int64("partition id", partID),
					zap.Error(err))
385 386 387 388 389 390 391 392 393 394 395 396
				cancel2()
				continue
			}
			for _, segID := range segIDs {
				indexInfos := []*etcdpb.FieldIndexInfo{}
				indexMeta, ok := segID2IndexMeta[segID]
				if !ok {
					indexInfos = append(indexInfos, collMeta.FieldIndexes...)
				} else {
					for _, idx := range collMeta.FieldIndexes {
						if _, ok := indexMeta[idx.IndexID]; !ok {
							indexInfos = append(indexInfos, idx)
397 398
						}
					}
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428
				}
				for _, idxInfo := range indexInfos {
					/* #nosec G601 */
					field, err := GetFieldSchemaByID(&collMeta, idxInfo.FiledID)
					if err != nil {
						log.Debug("GetFieldSchemaByID",
							zap.Any("collection_meta", collMeta),
							zap.Int64("field id", idxInfo.FiledID))
						continue
					}
					indexMeta, ok := indexID2Meta[idxInfo.IndexID]
					if !ok {
						log.Debug("index meta does not exist", zap.Int64("index_id", idxInfo.IndexID))
						continue
					}
					info := etcdpb.SegmentIndexInfo{
						CollectionID: collMeta.ID,
						PartitionID:  partID,
						SegmentID:    segID,
						FieldID:      idxInfo.FiledID,
						IndexID:      idxInfo.IndexID,
						EnableIndex:  false,
					}
					log.Debug("building index by background checker",
						zap.Int64("segment_id", segID),
						zap.Int64("index_id", indexMeta.IndexID),
						zap.Int64("collection_id", collMeta.ID))
					info.BuildID, err = c.BuildIndex(ctx2, segID, field, &indexMeta, false)
					if err != nil {
						log.Debug("build index failed",
C
congqixia 已提交
429
							zap.Int64("segment_id", segID),
430 431 432 433 434 435 436 437 438 439 440 441 442
							zap.Int64("field_id", field.FieldID),
							zap.Int64("index_id", indexMeta.IndexID))
						continue
					}
					if info.BuildID != 0 {
						info.EnableIndex = true
					}
					if err := c.MetaTable.AddIndex(&info); err != nil {
						log.Debug("Add index into meta table failed",
							zap.Int64("collection_id", collMeta.ID),
							zap.Int64("index_id", info.IndexID),
							zap.Int64("build_id", info.BuildID),
							zap.Error(err))
C
congqixia 已提交
443
					}
444 445
				}
			}
C
congqixia 已提交
446
			cancel2()
447 448
		}
	}
N
neza2017 已提交
449 450
}

451 452 453 454 455 456 457 458 459 460 461 462
func (c *Core) getSegments(ctx context.Context, collID typeutil.UniqueID) (map[typeutil.UniqueID]typeutil.UniqueID, error) {
	collMeta, err := c.MetaTable.GetCollectionByID(collID, 0)
	if err != nil {
		return nil, err
	}
	segID2PartID := map[typeutil.UniqueID]typeutil.UniqueID{}
	for _, partID := range collMeta.PartitionIDs {
		if seg, err := c.CallGetFlushedSegmentsService(ctx, collID, partID); err == nil {
			for _, s := range seg {
				segID2PartID[s] = partID
			}
		} else {
463
			log.Debug("failed to get flushed segments from data coord", zap.Int64("collection_id", collID), zap.Int64("partition_id", partID), zap.Error(err))
464 465 466 467
			return nil, err
		}
	}

468
	metrics.RootCoordNumOfSegments.WithLabelValues(strconv.FormatInt(collID, 10)).Set(float64(len(segID2PartID)))
469 470 471
	return segID2PartID, nil
}

Z
zhenshan.cao 已提交
472
func (c *Core) setMsgStreams() error {
473
	if Params.CommonCfg.RootCoordSubName == "" {
474
		return fmt.Errorf("RootCoordSubName is empty")
N
neza2017 已提交
475 476
	}

477
	c.SendTimeTick = func(t typeutil.Timestamp, reason string) error {
478
		pc := c.chanTimeTick.listDmlChannels()
479 480 481 482 483 484 485 486 487 488 489
		pt := make([]uint64, len(pc))
		for i := 0; i < len(pt); i++ {
			pt[i] = t
		}
		ttMsg := internalpb.ChannelTimeTickMsg{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_TimeTick,
				MsgID:     0, //TODO
				Timestamp: t,
				SourceID:  c.session.ServerID,
			},
N
neza2017 已提交
490 491 492
			ChannelNames:     pc,
			Timestamps:       pt,
			DefaultTimestamp: t,
493
		}
494
		return c.chanTimeTick.updateTimeTick(&ttMsg, reason)
Z
zhenshan.cao 已提交
495 496
	}

497
	c.SendDdCreateCollectionReq = func(ctx context.Context, req *internalpb.CreateCollectionRequest, channelNames []string) (map[string][]byte, error) {
Z
zhenshan.cao 已提交
498 499
		msgPack := ms.MsgPack{}
		baseMsg := ms.BaseMsg{
500
			Ctx:            ctx,
Z
zhenshan.cao 已提交
501 502 503 504
			BeginTimestamp: req.Base.Timestamp,
			EndTimestamp:   req.Base.Timestamp,
			HashValues:     []uint32{0},
		}
505
		msg := &ms.CreateCollectionMsg{
Z
zhenshan.cao 已提交
506 507 508
			BaseMsg:                 baseMsg,
			CreateCollectionRequest: *req,
		}
509
		msgPack.Msgs = append(msgPack.Msgs, msg)
510
		return c.chanTimeTick.broadcastMarkDmlChannels(channelNames, &msgPack)
Z
zhenshan.cao 已提交
511 512
	}

513
	c.SendDdDropCollectionReq = func(ctx context.Context, req *internalpb.DropCollectionRequest, channelNames []string) error {
Z
zhenshan.cao 已提交
514 515
		msgPack := ms.MsgPack{}
		baseMsg := ms.BaseMsg{
516
			Ctx:            ctx,
Z
zhenshan.cao 已提交
517 518 519 520
			BeginTimestamp: req.Base.Timestamp,
			EndTimestamp:   req.Base.Timestamp,
			HashValues:     []uint32{0},
		}
521
		msg := &ms.DropCollectionMsg{
Z
zhenshan.cao 已提交
522 523 524
			BaseMsg:               baseMsg,
			DropCollectionRequest: *req,
		}
525
		msgPack.Msgs = append(msgPack.Msgs, msg)
526
		return c.chanTimeTick.broadcastDmlChannels(channelNames, &msgPack)
Z
zhenshan.cao 已提交
527 528
	}

529
	c.SendDdCreatePartitionReq = func(ctx context.Context, req *internalpb.CreatePartitionRequest, channelNames []string) error {
Z
zhenshan.cao 已提交
530 531
		msgPack := ms.MsgPack{}
		baseMsg := ms.BaseMsg{
532
			Ctx:            ctx,
Z
zhenshan.cao 已提交
533 534 535 536
			BeginTimestamp: req.Base.Timestamp,
			EndTimestamp:   req.Base.Timestamp,
			HashValues:     []uint32{0},
		}
537
		msg := &ms.CreatePartitionMsg{
Z
zhenshan.cao 已提交
538 539 540
			BaseMsg:                baseMsg,
			CreatePartitionRequest: *req,
		}
541
		msgPack.Msgs = append(msgPack.Msgs, msg)
542
		return c.chanTimeTick.broadcastDmlChannels(channelNames, &msgPack)
Z
zhenshan.cao 已提交
543 544
	}

545
	c.SendDdDropPartitionReq = func(ctx context.Context, req *internalpb.DropPartitionRequest, channelNames []string) error {
Z
zhenshan.cao 已提交
546 547
		msgPack := ms.MsgPack{}
		baseMsg := ms.BaseMsg{
548
			Ctx:            ctx,
Z
zhenshan.cao 已提交
549 550 551 552
			BeginTimestamp: req.Base.Timestamp,
			EndTimestamp:   req.Base.Timestamp,
			HashValues:     []uint32{0},
		}
553
		msg := &ms.DropPartitionMsg{
Z
zhenshan.cao 已提交
554 555 556
			BaseMsg:              baseMsg,
			DropPartitionRequest: *req,
		}
557
		msgPack.Msgs = append(msgPack.Msgs, msg)
558
		return c.chanTimeTick.broadcastDmlChannels(channelNames, &msgPack)
Z
zhenshan.cao 已提交
559 560 561 562 563
	}

	return nil
}

564
// SetNewProxyClient set client to create proxy
565
func (c *Core) SetNewProxyClient(f func(sess *sessionutil.Session) (types.Proxy, error)) {
N
neza2017 已提交
566 567 568
	if c.NewProxyClient == nil {
		c.NewProxyClient = f
	} else {
C
Cai Yudong 已提交
569
		log.Debug("NewProxyClient has already set")
N
neza2017 已提交
570 571 572
	}
}

573
// SetDataCoord set dataCoord.
574
func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error {
575 576 577 578 579 580
	initCh := make(chan struct{})
	go func() {
		for {
			if err := s.Init(); err == nil {
				if err := s.Start(); err == nil {
					close(initCh)
581
					log.Debug("RootCoord connected to DataCoord")
582 583 584
					return
				}
			}
585
			log.Debug("Retrying RootCoord connection to DataCoord")
586 587
		}
	}()
G
godchen 已提交
588
	c.CallGetBinlogFilePathsService = func(ctx context.Context, segID typeutil.UniqueID, fieldID typeutil.UniqueID) (retFiles []string, retErr error) {
N
neza2017 已提交
589 590 591 592 593
		defer func() {
			if err := recover(); err != nil {
				retErr = fmt.Errorf("get bin log file paths panic, msg = %v", err)
			}
		}()
594
		<-initCh //wait connect to DataCoord
N
neza2017 已提交
595
		ts, err := c.TSOAllocator(1)
N
neza2017 已提交
596
		if err != nil {
C
Cai Yudong 已提交
597
			return nil, err
N
neza2017 已提交
598
		}
G
godchen 已提交
599
		binlog, err := s.GetInsertBinlogPaths(ctx, &datapb.GetInsertBinlogPathsRequest{
N
neza2017 已提交
600
			Base: &commonpb.MsgBase{
601
				MsgType:   0, //TODO, msg type
N
neza2017 已提交
602 603
				MsgID:     0,
				Timestamp: ts,
604
				SourceID:  c.session.ServerID,
N
neza2017 已提交
605 606 607 608
			},
			SegmentID: segID,
		})
		if err != nil {
C
Cai Yudong 已提交
609
			return nil, err
N
neza2017 已提交
610
		}
611
		if binlog.Status.ErrorCode != commonpb.ErrorCode_Success {
612
			return nil, fmt.Errorf("getInsertBinlogPaths from data service failed, error = %s", binlog.Status.Reason)
N
neza2017 已提交
613
		}
614
		binlogPaths := make([]string, 0)
N
neza2017 已提交
615 616
		for i := range binlog.FieldIDs {
			if binlog.FieldIDs[i] == fieldID {
617
				binlogPaths = append(binlogPaths, binlog.Paths[i].Values...)
N
neza2017 已提交
618 619
			}
		}
620 621 622 623
		if len(binlogPaths) == 0 {
			return nil, fmt.Errorf("binlog file does not exist, segment id = %d, field id = %d", segID, fieldID)
		}
		return binlogPaths, nil
N
neza2017 已提交
624
	}
625

G
godchen 已提交
626
	c.CallGetNumRowsService = func(ctx context.Context, segID typeutil.UniqueID, isFromFlushedChan bool) (retRows int64, retErr error) {
N
neza2017 已提交
627 628 629 630 631
		defer func() {
			if err := recover(); err != nil {
				retErr = fmt.Errorf("get num rows panic, msg = %v", err)
			}
		}()
632
		<-initCh
N
neza2017 已提交
633
		ts, err := c.TSOAllocator(1)
634
		if err != nil {
C
Cai Yudong 已提交
635
			return retRows, err
636
		}
G
godchen 已提交
637
		segInfo, err := s.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
638 639 640 641
			Base: &commonpb.MsgBase{
				MsgType:   0, //TODO, msg type
				MsgID:     0,
				Timestamp: ts,
642
				SourceID:  c.session.ServerID,
643 644 645 646
			},
			SegmentIDs: []typeutil.UniqueID{segID},
		})
		if err != nil {
C
Cai Yudong 已提交
647
			return retRows, err
648
		}
649
		if segInfo.Status.ErrorCode != commonpb.ErrorCode_Success {
650
			return retRows, fmt.Errorf("getSegmentInfo from data service failed, error = %s", segInfo.Status.Reason)
651 652 653
		}
		if len(segInfo.Infos) != 1 {
			log.Debug("get segment info empty")
C
Cai Yudong 已提交
654
			return retRows, nil
655
		}
N
neza2017 已提交
656
		if !isFromFlushedChan && segInfo.Infos[0].State != commonpb.SegmentState_Flushed {
657
			log.Debug("segment id not flushed", zap.Int64("segment id", segID))
C
Cai Yudong 已提交
658
			return retRows, nil
659
		}
C
Cai Yudong 已提交
660
		return segInfo.Infos[0].NumOfRows, nil
661
	}
662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681

	c.CallGetFlushedSegmentsService = func(ctx context.Context, collID, partID typeutil.UniqueID) (retSegIDs []typeutil.UniqueID, retErr error) {
		defer func() {
			if err := recover(); err != nil {
				retErr = fmt.Errorf("get flushed segments from data coord panic, msg = %v", err)
			}
		}()
		<-initCh
		req := &datapb.GetFlushedSegmentsRequest{
			Base: &commonpb.MsgBase{
				MsgType:   0, //TODO,msg type
				MsgID:     0,
				Timestamp: 0,
				SourceID:  c.session.ServerID,
			},
			CollectionID: collID,
			PartitionID:  partID,
		}
		rsp, err := s.GetFlushedSegments(ctx, req)
		if err != nil {
682
			return nil, err
683 684
		}
		if rsp.Status.ErrorCode != commonpb.ErrorCode_Success {
685
			return nil, fmt.Errorf("get flushed segments from data coord failed, reason = %s", rsp.Status.Reason)
686
		}
C
Cai Yudong 已提交
687
		return rsp.Segments, nil
688 689
	}

690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712
	c.CallUpdateSegmentStateService = func(ctx context.Context, segID typeutil.UniqueID, ss commonpb.SegmentState) (retErr error) {
		defer func() {
			if err := recover(); err != nil {
				retErr = fmt.Errorf("update segment state from data coord panic, msg = %v", err)
			}
		}()
		<-initCh
		req := &datapb.SetSegmentStateRequest{
			SegmentId: segID,
			NewState:  ss,
		}
		resp, err := s.SetSegmentState(ctx, req)
		if err != nil || resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
			log.Error("failed to update segment state",
				zap.Any("request", req), zap.Any("response", resp), zap.Error(err))
			return err
		}
		log.Info("successfully set segment state",
			zap.Int64("segment ID", req.GetSegmentId()),
			zap.String("new segment state", req.GetNewState().String()))
		return nil
	}

G
godchen 已提交
713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732
	c.CallWatchChannels = func(ctx context.Context, collectionID int64, channelNames []string) (retErr error) {
		defer func() {
			if err := recover(); err != nil {
				retErr = fmt.Errorf("watch channels panic, msg = %v", err)
			}
		}()
		<-initCh
		req := &datapb.WatchChannelsRequest{
			CollectionID: collectionID,
			ChannelNames: channelNames,
		}
		rsp, err := s.WatchChannels(ctx, req)
		if err != nil {
			return err
		}
		if rsp.Status.ErrorCode != commonpb.ErrorCode_Success {
			return fmt.Errorf("data coord watch channels failed, reason = %s", rsp.Status.Reason)
		}
		return nil
	}
733
	c.CallImportService = func(ctx context.Context, req *datapb.ImportTaskRequest) *datapb.ImportTaskResponse {
G
groot 已提交
734 735 736 737
		resp := &datapb.ImportTaskResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_Success,
			},
G
groot 已提交
738
		}
G
groot 已提交
739

G
groot 已提交
740 741
		defer func() {
			if err := recover(); err != nil {
G
groot 已提交
742 743
				resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
				resp.Status.Reason = "assign import task to data coord panic"
G
groot 已提交
744 745 746
			}
		}()

G
groot 已提交
747 748 749
		resp, _ = s.Import(ctx, req)
		if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
			return resp
G
groot 已提交
750 751
		}

G
groot 已提交
752
		return resp
G
groot 已提交
753 754
	}

N
neza2017 已提交
755 756 757
	return nil
}

758
// SetIndexCoord set indexcoord
759
func (c *Core) SetIndexCoord(s types.IndexCoord) error {
760 761 762 763 764 765
	initCh := make(chan struct{})
	go func() {
		for {
			if err := s.Init(); err == nil {
				if err := s.Start(); err == nil {
					close(initCh)
766
					log.Debug("RootCoord connected to IndexCoord")
767 768 769
					return
				}
			}
770
			log.Debug("Retrying RootCoord connection to IndexCoord")
771 772 773
		}
	}()

774
	c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, numRows int64) (retID typeutil.UniqueID, retErr error) {
N
neza2017 已提交
775 776 777 778 779
		defer func() {
			if err := recover(); err != nil {
				retErr = fmt.Errorf("build index panic, msg = %v", err)
			}
		}()
780
		<-initCh
G
godchen 已提交
781
		rsp, err := s.BuildIndex(ctx, &indexpb.BuildIndexRequest{
N
neza2017 已提交
782
			DataPaths:   binlog,
783 784 785 786
			TypeParams:  field.TypeParams,
			IndexParams: idxInfo.IndexParams,
			IndexID:     idxInfo.IndexID,
			IndexName:   idxInfo.IndexName,
787 788
			NumRows:     numRows,
			FieldSchema: field,
N
neza2017 已提交
789 790
		})
		if err != nil {
C
Cai Yudong 已提交
791
			return retID, err
N
neza2017 已提交
792
		}
793
		if rsp.Status.ErrorCode != commonpb.ErrorCode_Success {
794
			return retID, fmt.Errorf("buildIndex from index service failed, error = %s", rsp.Status.Reason)
N
neza2017 已提交
795
		}
C
Cai Yudong 已提交
796
		return rsp.IndexBuildID, nil
N
neza2017 已提交
797
	}
N
neza2017 已提交
798

N
neza2017 已提交
799 800 801 802 803 804
	c.CallDropIndexService = func(ctx context.Context, indexID typeutil.UniqueID) (retErr error) {
		defer func() {
			if err := recover(); err != nil {
				retErr = fmt.Errorf("drop index from index service panic, msg = %v", err)
			}
		}()
805
		<-initCh
G
godchen 已提交
806
		rsp, err := s.DropIndex(ctx, &indexpb.DropIndexRequest{
N
neza2017 已提交
807 808 809
			IndexID: indexID,
		})
		if err != nil {
C
Cai Yudong 已提交
810
			return err
N
neza2017 已提交
811
		}
812
		if rsp.ErrorCode != commonpb.ErrorCode_Success {
C
Cai Yudong 已提交
813
			return fmt.Errorf(rsp.Reason)
N
neza2017 已提交
814
		}
C
Cai Yudong 已提交
815
		return nil
N
neza2017 已提交
816 817
	}

818 819 820 821 822 823 824 825 826 827 828 829 830 831
	c.CallGetIndexStatesService = func(ctx context.Context, IndexBuildIDs []int64) (idxInfo []*indexpb.IndexInfo, retErr error) {
		defer func() {
			if err := recover(); err != nil {
				retErr = fmt.Errorf("get index state from index service panic, msg = %v", err)
			}
		}()
		<-initCh
		res, err := s.GetIndexStates(ctx, &indexpb.GetIndexStatesRequest{
			IndexBuildIDs: IndexBuildIDs,
		})
		if err != nil {
			log.Error("RootCoord failed to get index states from IndexCoord.", zap.Error(err))
			return nil, err
		}
832
		log.Debug("got index states", zap.String("get index state result", res.String()))
833 834 835 836 837 838 839 840
		if res.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
			log.Error("Get index states failed.",
				zap.String("error_code", res.GetStatus().GetErrorCode().String()),
				zap.String("reason", res.GetStatus().GetReason()))
			return nil, fmt.Errorf(res.GetStatus().GetErrorCode().String())
		}
		return res.GetStates(), nil
	}
N
neza2017 已提交
841 842 843
	return nil
}

844
// SetQueryCoord set querycoord
845
func (c *Core) SetQueryCoord(s types.QueryCoord) error {
846 847 848 849 850 851
	initCh := make(chan struct{})
	go func() {
		for {
			if err := s.Init(); err == nil {
				if err := s.Start(); err == nil {
					close(initCh)
852
					log.Debug("RootCoord connected to QueryCoord")
853 854 855
					return
				}
			}
856
			log.Debug("Retrying RootCoord connection to QueryCoord")
857 858
		}
	}()
N
neza2017 已提交
859 860 861 862 863 864
	c.CallReleaseCollectionService = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) (retErr error) {
		defer func() {
			if err := recover(); err != nil {
				retErr = fmt.Errorf("release collection from query service panic, msg = %v", err)
			}
		}()
865
		<-initCh
866 867
		req := &querypb.ReleaseCollectionRequest{
			Base: &commonpb.MsgBase{
868
				MsgType:   commonpb.MsgType_ReleaseCollection,
869 870
				MsgID:     0, //TODO, msg ID
				Timestamp: ts,
871
				SourceID:  c.session.ServerID,
872 873 874 875
			},
			DbID:         dbID,
			CollectionID: collectionID,
		}
G
godchen 已提交
876
		rsp, err := s.ReleaseCollection(ctx, req)
877
		if err != nil {
C
Cai Yudong 已提交
878
			return err
879
		}
880
		if rsp.ErrorCode != commonpb.ErrorCode_Success {
881
			return fmt.Errorf("releaseCollection from query service failed, error = %s", rsp.Reason)
882
		}
C
Cai Yudong 已提交
883
		return nil
884
	}
N
neza2017 已提交
885 886 887 888 889 890
	c.CallReleasePartitionService = func(ctx context.Context, ts typeutil.Timestamp, dbID, collectionID typeutil.UniqueID, partitionIDs []typeutil.UniqueID) (retErr error) {
		defer func() {
			if err := recover(); err != nil {
				retErr = fmt.Errorf("release partition from query service panic, msg = %v", err)
			}
		}()
891
		<-initCh
N
neza2017 已提交
892 893 894 895 896 897 898 899 900 901 902 903 904
		req := &querypb.ReleasePartitionsRequest{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_ReleasePartitions,
				MsgID:     0, //TODO, msg ID
				Timestamp: ts,
				SourceID:  c.session.ServerID,
			},
			DbID:         dbID,
			CollectionID: collectionID,
			PartitionIDs: partitionIDs,
		}
		rsp, err := s.ReleasePartitions(ctx, req)
		if err != nil {
C
Cai Yudong 已提交
905
			return err
N
neza2017 已提交
906 907
		}
		if rsp.ErrorCode != commonpb.ErrorCode_Success {
908
			return fmt.Errorf("releasePartitions from query service failed, error = %s", rsp.Reason)
N
neza2017 已提交
909
		}
C
Cai Yudong 已提交
910
		return nil
N
neza2017 已提交
911
	}
912 913 914
	return nil
}

915
// BuildIndex will check row num and call build index service
G
godchen 已提交
916
func (c *Core) BuildIndex(ctx context.Context, segID typeutil.UniqueID, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo, isFlush bool) (typeutil.UniqueID, error) {
J
jaime 已提交
917 918
	log.Debug("start build index", zap.String("index name", idxInfo.IndexName),
		zap.String("field name", field.Name), zap.Int64("segment id", segID))
G
godchen 已提交
919 920
	sp, ctx := trace.StartSpanFromContext(ctx)
	defer sp.Finish()
921
	if c.MetaTable.IsSegmentIndexed(segID, field, idxInfo.IndexParams) {
922
		return 0, nil
923
	}
G
godchen 已提交
924
	rows, err := c.CallGetNumRowsService(ctx, segID, isFlush)
925
	if err != nil {
926
		return 0, err
927 928
	}
	var bldID typeutil.UniqueID
929
	if rows < Params.RootCoordCfg.MinSegmentSizeToEnableIndex {
930 931
		log.Debug("num of rows is less than MinSegmentSizeToEnableIndex", zap.Int64("num rows", rows))
	} else {
G
godchen 已提交
932
		binlogs, err := c.CallGetBinlogFilePathsService(ctx, segID, field.FieldID)
933
		if err != nil {
934
			return 0, err
935
		}
936
		bldID, err = c.CallBuildIndexService(ctx, binlogs, field, idxInfo, rows)
937
		if err != nil {
938
			return 0, err
939
		}
J
jaime 已提交
940 941 942

		log.Debug("CallBuildIndex finished", zap.String("index name", idxInfo.IndexName),
			zap.String("field name", field.Name), zap.Int64("segment id", segID), zap.Int64("num rows", rows))
943
	}
944
	return bldID, nil
945 946
}

947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979
// RemoveIndex looks up every index registered under indexName on collName and
// asks the index service to drop each of them. It stops at, and returns, the
// first lookup or drop error.
func (c *Core) RemoveIndex(ctx context.Context, collName string, indexName string) error {
	_, indexInfos, lookupErr := c.MetaTable.GetIndexByName(collName, indexName)
	if lookupErr != nil {
		log.Error("GetIndexByName failed,", zap.String("collection name", collName),
			zap.String("index name", indexName), zap.Error(lookupErr))
		return lookupErr
	}

	for _, info := range indexInfos {
		dropErr := c.CallDropIndexService(ctx, info.IndexID)
		if dropErr == nil {
			continue
		}
		log.Error("CallDropIndexService failed,", zap.String("collection name", collName), zap.Error(dropErr))
		return dropErr
	}
	return nil
}

// ExpireMetaCache sends one InvalidateCollMetaCacheRequest per collection name
// through the proxy client manager, stamping each request with ts and this
// server's ID.
func (c *Core) ExpireMetaCache(ctx context.Context, collNames []string, ts typeutil.Timestamp) {
	for i := range collNames {
		// A fresh MsgBase is built for every request; nothing is shared
		// between iterations.
		req := &proxypb.InvalidateCollMetaCacheRequest{
			Base: &commonpb.MsgBase{
				MsgType:   0, //TODO, msg type
				MsgID:     0, //TODO, msg id
				Timestamp: ts,
				SourceID:  c.session.ServerID,
			},
			CollectionName: collNames[i],
		}
		c.proxyClientManager.InvalidateCollectionMetaCache(ctx, req)
	}
}

C
Cai Yudong 已提交
980
// Register register rootcoord at etcd
981
func (c *Core) Register() error {
982 983 984 985 986 987 988
	c.session.Register()
	go c.session.LivenessCheck(c.ctx, func() {
		log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID))
		if err := c.Stop(); err != nil {
			log.Fatal("failed to stop server", zap.Error(err))
		}
		// manually send signal to starter goroutine
X
Xiaofan 已提交
989
		if c.session.TriggerKill {
J
Ji Bin 已提交
990 991 992
			if p, err := os.FindProcess(os.Getpid()); err == nil {
				p.Signal(syscall.SIGINT)
			}
X
Xiaofan 已提交
993
		}
994
	})
995 996 997

	c.UpdateStateCode(internalpb.StateCode_Healthy)
	log.Debug("RootCoord start successfully ", zap.String("State Code", internalpb.StateCode_Healthy.String()))
998 999 1000
	return nil
}

1001
// SetEtcdClient sets the etcdCli of Core
X
Xiaofan 已提交
1002 1003 1004 1005
func (c *Core) SetEtcdClient(etcdClient *clientv3.Client) {
	c.etcdCli = etcdClient
}

1006
func (c *Core) initSession() error {
1007
	c.session = sessionutil.NewSession(c.ctx, Params.EtcdCfg.MetaRootPath, c.etcdCli)
G
godchen 已提交
1008
	if c.session == nil {
1009
		return fmt.Errorf("session is nil, the etcd client connection may have failed")
G
godchen 已提交
1010
	}
X
Xiaofan 已提交
1011
	c.session.Init(typeutil.RootCoordRole, Params.RootCoordCfg.Address, true, true)
1012
	Params.SetLogger(c.session.ServerID)
1013 1014 1015
	return nil
}

1016
// Init initialize routine
N
neza2017 已提交
1017
func (c *Core) Init() error {
1018
	var initError error
1019 1020
	if c.kvBaseCreate == nil {
		c.kvBaseCreate = func(root string) (kv.TxnKV, error) {
X
Xiaofan 已提交
1021
			return etcdkv.NewEtcdKV(c.etcdCli, root), nil
1022 1023
		}
	}
1024 1025 1026 1027 1028
	if c.metaKVCreate == nil {
		c.metaKVCreate = func(root string) (kv.MetaKv, error) {
			return etcdkv.NewEtcdKV(c.etcdCli, root), nil
		}
	}
1029
	c.initOnce.Do(func() {
1030 1031 1032 1033 1034
		if err := c.initSession(); err != nil {
			initError = err
			log.Error("RootCoord init session failed", zap.Error(err))
			return
		}
1035
		connectEtcdFn := func() error {
1036
			if c.kvBase, initError = c.kvBaseCreate(Params.EtcdCfg.KvRootPath); initError != nil {
1037 1038 1039 1040 1041
				log.Error("RootCoord failed to new EtcdKV for kvBase", zap.Any("reason", initError))
				return initError
			}
			if c.impTaskKv, initError = c.metaKVCreate(Params.EtcdCfg.KvRootPath); initError != nil {
				log.Error("RootCoord failed to new EtcdKV for MetaKV", zap.Any("reason", initError))
1042 1043 1044
				return initError
			}
			var metaKV kv.TxnKV
1045
			metaKV, initError = c.kvBaseCreate(Params.EtcdCfg.MetaRootPath)
1046
			if initError != nil {
C
Cai Yudong 已提交
1047
				log.Error("RootCoord failed to new EtcdKV", zap.Any("reason", initError))
1048 1049
				return initError
			}
1050
			var ss *suffixSnapshot
1051
			if ss, initError = newSuffixSnapshot(metaKV, "_ts", Params.EtcdCfg.MetaRootPath, "snapshots"); initError != nil {
C
Cai Yudong 已提交
1052
				log.Error("RootCoord failed to new suffixSnapshot", zap.Error(initError))
1053 1054
				return initError
			}
1055
			if c.MetaTable, initError = NewMetaTable(metaKV, ss); initError != nil {
C
Cai Yudong 已提交
1056
				log.Error("RootCoord failed to new MetaTable", zap.Any("reason", initError))
X
XuanYang-cn 已提交
1057 1058 1059
				return initError
			}

1060
			return nil
1061
		}
1062
		log.Debug("RootCoord, Connecting to Etcd", zap.String("kv root", Params.EtcdCfg.KvRootPath), zap.String("meta root", Params.EtcdCfg.MetaRootPath))
G
godchen 已提交
1063
		err := retry.Do(c.ctx, connectEtcdFn, retry.Attempts(300))
1064
		if err != nil {
1065 1066 1067
			return
		}

1068
		log.Debug("RootCoord, Setting TSO and ID Allocator")
1069
		kv := tsoutil.NewTSOKVBase(c.etcdCli, Params.EtcdCfg.KvRootPath, "gid")
X
XuanYang-cn 已提交
1070
		idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", kv)
N
neza2017 已提交
1071
		if initError = idAllocator.Initialize(); initError != nil {
1072 1073
			return
		}
N
neza2017 已提交
1074
		c.IDAllocator = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) {
N
neza2017 已提交
1075 1076
			return idAllocator.Alloc(count)
		}
N
neza2017 已提交
1077
		c.IDAllocatorUpdate = func() error {
N
neza2017 已提交
1078 1079 1080
			return idAllocator.UpdateID()
		}

1081
		kv = tsoutil.NewTSOKVBase(c.etcdCli, Params.EtcdCfg.KvRootPath, "tso")
X
XuanYang-cn 已提交
1082
		tsoAllocator := tso.NewGlobalTSOAllocator("timestamp", kv)
N
neza2017 已提交
1083
		if initError = tsoAllocator.Initialize(); initError != nil {
1084 1085
			return
		}
N
neza2017 已提交
1086
		c.TSOAllocator = func(count uint32) (typeutil.Timestamp, error) {
N
neza2017 已提交
1087 1088
			return tsoAllocator.Alloc(count)
		}
N
neza2017 已提交
1089
		c.TSOAllocatorUpdate = func() error {
N
neza2017 已提交
1090 1091
			return tsoAllocator.UpdateTSO()
		}
1092 1093 1094
		c.TSOGetLastSavedTime = func() time.Time {
			return tsoAllocator.GetLastSavedTime()
		}
N
neza2017 已提交
1095

G
godchen 已提交
1096
		c.factory.Init(&Params)
1097

1098
		chanMap := c.MetaTable.ListCollectionPhysicalChannels()
G
godchen 已提交
1099
		c.chanTimeTick = newTimeTickSync(c.ctx, c.session.ServerID, c.factory, chanMap)
1100
		c.chanTimeTick.addSession(c.session)
N
neza2017 已提交
1101 1102
		c.proxyClientManager = newProxyClientManager(c)

N
neza2017 已提交
1103
		log.Debug("RootCoord, set proxy manager")
X
Xiaofan 已提交
1104
		c.proxyManager = newProxyManager(
N
neza2017 已提交
1105
			c.ctx,
X
Xiaofan 已提交
1106
			c.etcdCli,
X
Xiaofan 已提交
1107
			c.chanTimeTick.initSessions,
N
neza2017 已提交
1108 1109
			c.proxyClientManager.GetProxyClients,
		)
X
Xiaofan 已提交
1110 1111
		c.proxyManager.AddSessionFunc(c.chanTimeTick.addSession, c.proxyClientManager.AddProxyClient)
		c.proxyManager.DelSessionFunc(c.chanTimeTick.delSession, c.proxyClientManager.DelProxyClient)
1112

1113 1114
		c.metricsCacheManager = metricsinfo.NewMetricsCacheManager()

Z
zhenshan.cao 已提交
1115
		initError = c.setMsgStreams()
X
XuanYang-cn 已提交
1116 1117 1118
		if initError != nil {
			return
		}
G
groot 已提交
1119 1120 1121

		c.importManager = newImportManager(
			c.ctx,
1122
			c.impTaskKv,
G
groot 已提交
1123 1124
			c.CallImportService,
		)
1125 1126 1127 1128 1129 1130 1131
		// init data
		encryptedRootPassword, _ := crypto.PasswordEncrypt(util.DefaultRootPassword)
		initError = c.MetaTable.AddCredential(&internalpb.CredentialInfo{Username: util.UserRoot, EncryptedPassword: encryptedRootPassword})
		if initError != nil {
			return
		}
		log.Debug("RootCoord init user root done")
1132
	})
C
Cai Yudong 已提交
1133
	if initError != nil {
N
neza2017 已提交
1134
		log.Debug("RootCoord init error", zap.Error(initError))
N
neza2017 已提交
1135
	}
C
Cai Yudong 已提交
1136
	log.Debug("RootCoord init done")
1137 1138 1139
	return initError
}

C
Cai Yudong 已提交
1140 1141
func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
	if !force {
1142
		flag, err := c.MetaTable.txn.Load(DDMsgSendPrefix)
X
Xiaofan 已提交
1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157
		if err != nil {
			// TODO, this is super ugly hack but our kv interface does not support loadWithExist
			// leave it for later
			if strings.Contains(err.Error(), "there is no value on key") {
				log.Debug("skip reSendDdMsg with no dd-msg-send key")
				return nil
			}
			return err
		}
		value, err := strconv.ParseBool(flag)
		if err != nil {
			return err
		}
		if value {
			log.Debug("skip reSendDdMsg with dd-msg-send set to true")
C
Cai Yudong 已提交
1158 1159
			return nil
		}
1160 1161
	}

1162
	ddOpStr, err := c.MetaTable.txn.Load(DDOperationPrefix)
1163 1164 1165 1166 1167 1168 1169 1170 1171
	if err != nil {
		log.Debug("DdOperation key does not exist")
		return nil
	}
	var ddOp DdOperation
	if err = json.Unmarshal([]byte(ddOpStr), &ddOp); err != nil {
		return err
	}

C
Cai Yudong 已提交
1172
	invalidateCache := false
C
Cai Yudong 已提交
1173
	var ts typeutil.Timestamp
1174
	var collName string
C
Cai Yudong 已提交
1175

1176
	switch ddOp.Type {
1177 1178
	// TODO remove create collection resend
	// since create collection needs a start position to succeed
1179
	case CreateCollectionDDType:
1180
		var ddReq = internalpb.CreateCollectionRequest{}
1181
		if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil {
1182 1183
			return err
		}
C
Cai Yudong 已提交
1184 1185 1186 1187 1188 1189 1190
		if _, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0); err != nil {
			if _, err = c.SendDdCreateCollectionReq(ctx, &ddReq, ddReq.PhysicalChannelNames); err != nil {
				return err
			}
		} else {
			log.Debug("collection has been created, skip re-send CreateCollection",
				zap.String("collection name", collName))
1191 1192 1193
		}
	case DropCollectionDDType:
		var ddReq = internalpb.DropCollectionRequest{}
1194
		if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil {
1195 1196
			return err
		}
C
Cai Yudong 已提交
1197
		ts = ddReq.Base.Timestamp
1198
		collName = ddReq.CollectionName
C
Cai Yudong 已提交
1199 1200 1201 1202 1203 1204 1205 1206
		if collInfo, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0); err == nil {
			if err = c.SendDdDropCollectionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
				return err
			}
			invalidateCache = true
		} else {
			log.Debug("collection has been removed, skip re-send DropCollection",
				zap.String("collection name", collName))
1207 1208 1209
		}
	case CreatePartitionDDType:
		var ddReq = internalpb.CreatePartitionRequest{}
1210
		if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil {
1211 1212
			return err
		}
C
Cai Yudong 已提交
1213
		ts = ddReq.Base.Timestamp
1214
		collName = ddReq.CollectionName
1215 1216 1217 1218
		collInfo, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0)
		if err != nil {
			return err
		}
C
Cai Yudong 已提交
1219 1220 1221 1222 1223 1224 1225 1226
		if _, err = c.MetaTable.GetPartitionByName(collInfo.ID, ddReq.PartitionName, 0); err != nil {
			if err = c.SendDdCreatePartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
				return err
			}
			invalidateCache = true
		} else {
			log.Debug("partition has been created, skip re-send CreatePartition",
				zap.String("collection name", collName), zap.String("partition name", ddReq.PartitionName))
1227 1228 1229
		}
	case DropPartitionDDType:
		var ddReq = internalpb.DropPartitionRequest{}
1230
		if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil {
1231 1232
			return err
		}
C
Cai Yudong 已提交
1233
		ts = ddReq.Base.Timestamp
1234
		collName = ddReq.CollectionName
1235 1236 1237 1238
		collInfo, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0)
		if err != nil {
			return err
		}
C
Cai Yudong 已提交
1239 1240 1241 1242 1243 1244 1245 1246
		if _, err = c.MetaTable.GetPartitionByName(collInfo.ID, ddReq.PartitionName, 0); err == nil {
			if err = c.SendDdDropPartitionReq(ctx, &ddReq, collInfo.PhysicalChannelNames); err != nil {
				return err
			}
			invalidateCache = true
		} else {
			log.Debug("partition has been removed, skip re-send DropPartition",
				zap.String("collection name", collName), zap.String("partition name", ddReq.PartitionName))
1247
		}
C
Cai Yudong 已提交
1248
	default:
1249
		return fmt.Errorf("invalid DdOperation %s", ddOp.Type)
C
Cai Yudong 已提交
1250 1251 1252
	}

	if invalidateCache {
1253
		c.ExpireMetaCache(ctx, []string{collName}, ts)
1254 1255 1256
	}

	// Update DDOperation in etcd
X
Xiaofan 已提交
1257
	return c.MetaTable.txn.Save(DDMsgSendPrefix, strconv.FormatBool(true))
1258 1259
}

1260
// Start starts RootCoord.
1261 1262
func (c *Core) Start() error {
	if err := c.checkInit(); err != nil {
C
Cai Yudong 已提交
1263
		log.Debug("RootCoord Start checkInit failed", zap.Error(err))
1264 1265
		return err
	}
N
neza2017 已提交
1266

C
Cai Yudong 已提交
1267
	log.Debug(typeutil.RootCoordRole, zap.Int64("node id", c.session.ServerID))
N
neza2017 已提交
1268

1269
	c.startOnce.Do(func() {
1270
		if err := c.proxyManager.WatchProxy(); err != nil {
X
Xiaofan 已提交
1271 1272 1273
			log.Fatal("RootCoord Start WatchProxy failed", zap.Error(err))
			// you can not just stuck here,
			panic(err)
N
neza2017 已提交
1274
		}
C
Cai Yudong 已提交
1275
		if err := c.reSendDdMsg(c.ctx, false); err != nil {
X
Xiaofan 已提交
1276 1277
			log.Fatal("RootCoord Start reSendDdMsg failed", zap.Error(err))
			panic(err)
1278
		}
1279
		c.wg.Add(5)
1280
		go c.startTimeTickLoop()
1281
		go c.tsLoop()
1282
		go c.chanTimeTick.startWatch(&c.wg)
1283
		go c.checkFlushedSegmentsLoop()
1284
		go c.importManager.expireOldTasksLoop(&c.wg)
1285 1286
		Params.RootCoordCfg.CreatedTime = time.Now()
		Params.RootCoordCfg.UpdatedTime = time.Now()
1287
	})
1288

1289 1290 1291
	return nil
}

1292
// Stop stops rootCoord.
1293
func (c *Core) Stop() error {
1294 1295
	c.UpdateStateCode(internalpb.StateCode_Abnormal)

1296
	c.cancel()
1297
	c.wg.Wait()
C
congqixia 已提交
1298 1299
	// wait at most one second to revoke
	c.session.Revoke(time.Second)
1300 1301 1302
	return nil
}

1303
// GetComponentStates get states of components
G
godchen 已提交
1304 1305 1306
func (c *Core) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
	code := c.stateCode.Load().(internalpb.StateCode)
	log.Debug("GetComponentStates", zap.String("State Code", internalpb.StateCode_name[int32(code)]))
N
neza2017 已提交
1307

1308 1309 1310 1311 1312
	nodeID := common.NotRegisteredID
	if c.session != nil && c.session.Registered() {
		nodeID = c.session.ServerID
	}

G
godchen 已提交
1313 1314
	return &internalpb.ComponentStates{
		State: &internalpb.ComponentInfo{
1315 1316
			// NodeID:    c.session.ServerID, // will race with Core.Register()
			NodeID:    nodeID,
C
Cai Yudong 已提交
1317
			Role:      typeutil.RootCoordRole,
1318 1319
			StateCode: code,
			ExtraInfo: nil,
1320
		},
N
neza2017 已提交
1321
		Status: &commonpb.Status{
1322
			ErrorCode: commonpb.ErrorCode_Success,
N
neza2017 已提交
1323 1324
			Reason:    "",
		},
G
godchen 已提交
1325
		SubcomponentStates: []*internalpb.ComponentInfo{
N
neza2017 已提交
1326
			{
1327
				NodeID:    nodeID,
C
Cai Yudong 已提交
1328
				Role:      typeutil.RootCoordRole,
N
neza2017 已提交
1329 1330 1331 1332
				StateCode: code,
				ExtraInfo: nil,
			},
		},
1333 1334 1335
	}, nil
}

1336
// GetTimeTickChannel get timetick channel name
G
godchen 已提交
1337 1338 1339
func (c *Core) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
1340
			ErrorCode: commonpb.ErrorCode_Success,
G
godchen 已提交
1341 1342
			Reason:    "",
		},
1343
		Value: Params.CommonCfg.RootCoordTimeTick,
G
godchen 已提交
1344
	}, nil
1345 1346
}

1347
// GetStatisticsChannel get statistics channel name
G
godchen 已提交
1348 1349 1350
func (c *Core) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
1351
			ErrorCode: commonpb.ErrorCode_Success,
G
godchen 已提交
1352 1353
			Reason:    "",
		},
1354
		Value: Params.CommonCfg.RootCoordStatistics,
G
godchen 已提交
1355
	}, nil
1356 1357
}

1358
// CreateCollection create collection
G
godchen 已提交
1359
func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
1360
	metrics.RootCoordCreateCollectionCounter.WithLabelValues(metrics.TotalLabel).Inc()
C
Cai Yudong 已提交
1361 1362
	if code, ok := c.checkHealthy(); !ok {
		return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
1363
	}
C
Cai Yudong 已提交
1364

1365 1366
	tr := timerecord.NewTimeRecorder("CreateCollection")

1367
	log.Debug("CreateCollection", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1368
		zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
1369 1370
	t := &CreateCollectionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1371
			ctx:  ctx,
1372 1373 1374 1375
			core: c,
		},
		Req: in,
	}
N
neza2017 已提交
1376
	err := executeTask(t)
1377
	if err != nil {
1378
		log.Error("CreateCollection failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1379 1380
			zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
		return failStatus(commonpb.ErrorCode_UnexpectedError, "CreateCollection failed: "+err.Error()), nil
1381
	}
1382
	log.Debug("CreateCollection success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1383
		zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
C
Cai Yudong 已提交
1384

1385
	metrics.RootCoordCreateCollectionCounter.WithLabelValues(metrics.SuccessLabel).Inc()
1386
	metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("CreateCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
1387
	metrics.RootCoordNumOfCollections.Inc()
C
Cai Yudong 已提交
1388
	return succStatus(), nil
1389 1390
}

1391
// DropCollection drop collection
G
godchen 已提交
1392
func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
1393
	metrics.RootCoordDropCollectionCounter.WithLabelValues(metrics.TotalLabel).Inc()
C
Cai Yudong 已提交
1394 1395
	if code, ok := c.checkHealthy(); !ok {
		return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
1396
	}
1397
	tr := timerecord.NewTimeRecorder("DropCollection")
1398
	log.Debug("DropCollection", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1399
		zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
1400 1401
	t := &DropCollectionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1402
			ctx:  ctx,
1403 1404 1405 1406
			core: c,
		},
		Req: in,
	}
N
neza2017 已提交
1407
	err := executeTask(t)
1408
	if err != nil {
1409
		log.Error("DropCollection failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1410 1411
			zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
		return failStatus(commonpb.ErrorCode_UnexpectedError, "DropCollection failed: "+err.Error()), nil
1412
	}
1413
	log.Debug("DropCollection success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1414
		zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
C
Cai Yudong 已提交
1415

1416
	metrics.RootCoordDropCollectionCounter.WithLabelValues(metrics.SuccessLabel).Inc()
1417
	metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("DropCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
1418
	metrics.RootCoordNumOfCollections.Dec()
C
Cai Yudong 已提交
1419
	return succStatus(), nil
1420 1421
}

1422
// HasCollection check collection existence
G
godchen 已提交
1423
func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
1424
	metrics.RootCoordHasCollectionCounter.WithLabelValues(metrics.TotalLabel).Inc()
C
Cai Yudong 已提交
1425
	if code, ok := c.checkHealthy(); !ok {
1426
		return &milvuspb.BoolResponse{
C
Cai Yudong 已提交
1427 1428
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
			Value:  false,
1429 1430
		}, nil
	}
1431
	tr := timerecord.NewTimeRecorder("HasCollection")
C
Cai Yudong 已提交
1432

1433
	log.Debug("HasCollection", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1434
		zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
1435 1436
	t := &HasCollectionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1437
			ctx:  ctx,
1438 1439 1440 1441 1442
			core: c,
		},
		Req:           in,
		HasCollection: false,
	}
N
neza2017 已提交
1443
	err := executeTask(t)
1444
	if err != nil {
1445
		log.Error("HasCollection failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1446
			zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
1447
		return &milvuspb.BoolResponse{
C
Cai Yudong 已提交
1448
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "HasCollection failed: "+err.Error()),
C
Cai Yudong 已提交
1449
			Value:  false,
1450 1451
		}, nil
	}
1452
	log.Debug("HasCollection success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1453
		zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
C
Cai Yudong 已提交
1454

1455
	metrics.RootCoordHasCollectionCounter.WithLabelValues(metrics.SuccessLabel).Inc()
1456
	metrics.RootCoordDDLReadTypeLatency.WithLabelValues("HasCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
1457
	return &milvuspb.BoolResponse{
C
Cai Yudong 已提交
1458 1459
		Status: succStatus(),
		Value:  t.HasCollection,
1460 1461 1462
	}, nil
}

1463
// DescribeCollection return collection info
G
godchen 已提交
1464
func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
1465
	metrics.RootCoordDescribeCollectionCounter.WithLabelValues(metrics.TotalLabel).Inc()
C
Cai Yudong 已提交
1466
	if code, ok := c.checkHealthy(); !ok {
1467
		return &milvuspb.DescribeCollectionResponse{
C
Cai Yudong 已提交
1468
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode"+internalpb.StateCode_name[int32(code)]),
1469 1470
		}, nil
	}
1471
	tr := timerecord.NewTimeRecorder("DescribeCollection")
C
Cai Yudong 已提交
1472

1473
	log.Debug("DescribeCollection", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1474
		zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
1475 1476
	t := &DescribeCollectionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1477
			ctx:  ctx,
1478 1479 1480 1481 1482
			core: c,
		},
		Req: in,
		Rsp: &milvuspb.DescribeCollectionResponse{},
	}
N
neza2017 已提交
1483
	err := executeTask(t)
1484
	if err != nil {
1485
		log.Error("DescribeCollection failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1486
			zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
1487
		return &milvuspb.DescribeCollectionResponse{
C
Cai Yudong 已提交
1488
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "DescribeCollection failed: "+err.Error()),
1489 1490
		}, nil
	}
1491
	log.Debug("DescribeCollection success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1492
		zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
C
Cai Yudong 已提交
1493

1494
	metrics.RootCoordDescribeCollectionCounter.WithLabelValues(metrics.SuccessLabel).Inc()
1495
	metrics.RootCoordDDLReadTypeLatency.WithLabelValues("DescribeCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
C
Cai Yudong 已提交
1496
	t.Rsp.Status = succStatus()
1497 1498 1499
	return t.Rsp, nil
}

1500
// ShowCollections list all collection names
G
godchen 已提交
1501
func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
1502
	metrics.RootCoordShowCollectionsCounter.WithLabelValues(MetricRequestsTotal).Inc()
C
Cai Yudong 已提交
1503
	if code, ok := c.checkHealthy(); !ok {
G
godchen 已提交
1504
		return &milvuspb.ShowCollectionsResponse{
C
Cai Yudong 已提交
1505
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
1506 1507
		}, nil
	}
1508
	tr := timerecord.NewTimeRecorder("ShowCollections")
C
Cai Yudong 已提交
1509

1510
	log.Debug("ShowCollections", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1511
		zap.String("dbname", in.DbName), zap.Int64("msgID", in.Base.MsgID))
1512 1513
	t := &ShowCollectionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1514
			ctx:  ctx,
1515 1516 1517
			core: c,
		},
		Req: in,
C
Cai Yudong 已提交
1518
		Rsp: &milvuspb.ShowCollectionsResponse{},
1519
	}
N
neza2017 已提交
1520
	err := executeTask(t)
1521
	if err != nil {
1522
		log.Error("ShowCollections failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1523
			zap.String("dbname", in.DbName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
G
godchen 已提交
1524
		return &milvuspb.ShowCollectionsResponse{
C
Cai Yudong 已提交
1525
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowCollections failed: "+err.Error()),
1526 1527
		}, nil
	}
1528
	log.Debug("ShowCollections success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1529 1530
		zap.String("dbname", in.DbName), zap.Int("num of collections", len(t.Rsp.CollectionNames)),
		zap.Int64("msgID", in.Base.MsgID))
C
Cai Yudong 已提交
1531

1532
	metrics.RootCoordShowCollectionsCounter.WithLabelValues(MetricRequestsSuccess).Inc()
C
Cai Yudong 已提交
1533
	t.Rsp.Status = succStatus()
1534
	metrics.RootCoordDDLReadTypeLatency.WithLabelValues("ShowCollections").Observe(float64(tr.ElapseSpan().Milliseconds()))
1535 1536 1537
	return t.Rsp, nil
}

1538
// CreatePartition create partition
G
godchen 已提交
1539
func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
1540
	metrics.RootCoordCreatePartitionCounter.WithLabelValues(metrics.TotalLabel).Inc()
C
Cai Yudong 已提交
1541 1542
	if code, ok := c.checkHealthy(); !ok {
		return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
1543
	}
1544
	tr := timerecord.NewTimeRecorder("CreatePartition")
1545
	log.Debug("CreatePartition", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1546 1547
		zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName),
		zap.Int64("msgID", in.Base.MsgID))
1548 1549
	t := &CreatePartitionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1550
			ctx:  ctx,
1551 1552 1553 1554
			core: c,
		},
		Req: in,
	}
N
neza2017 已提交
1555
	err := executeTask(t)
1556
	if err != nil {
1557
		log.Error("CreatePartition failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1558 1559
			zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName),
			zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
C
Cai Yudong 已提交
1560
		return failStatus(commonpb.ErrorCode_UnexpectedError, "CreatePartition failed: "+err.Error()), nil
1561
	}
1562
	log.Debug("CreatePartition success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1563 1564
		zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName),
		zap.Int64("msgID", in.Base.MsgID))
C
Cai Yudong 已提交
1565

1566
	metrics.RootCoordCreatePartitionCounter.WithLabelValues(metrics.SuccessLabel).Inc()
1567
	metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("CreatePartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
1568
	metrics.RootCoordNumOfPartitions.WithLabelValues(in.CollectionName).Inc()
C
Cai Yudong 已提交
1569
	return succStatus(), nil
1570 1571
}

1572
// DropPartition drop partition
G
godchen 已提交
1573
func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
1574
	metrics.RootCoordDropPartitionCounter.WithLabelValues(metrics.TotalLabel).Inc()
C
Cai Yudong 已提交
1575 1576
	if code, ok := c.checkHealthy(); !ok {
		return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
1577
	}
1578
	tr := timerecord.NewTimeRecorder("DropPartition")
1579
	log.Debug("DropPartition", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1580 1581
		zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName),
		zap.Int64("msgID", in.Base.MsgID))
1582 1583
	t := &DropPartitionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1584
			ctx:  ctx,
1585 1586 1587 1588
			core: c,
		},
		Req: in,
	}
N
neza2017 已提交
1589
	err := executeTask(t)
1590
	if err != nil {
1591
		log.Error("DropPartition failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1592 1593
			zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName),
			zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
C
Cai Yudong 已提交
1594
		return failStatus(commonpb.ErrorCode_UnexpectedError, "DropPartition failed: "+err.Error()), nil
1595
	}
1596
	log.Debug("DropPartition success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1597 1598
		zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName),
		zap.Int64("msgID", in.Base.MsgID))
C
Cai Yudong 已提交
1599

1600
	metrics.RootCoordDropPartitionCounter.WithLabelValues(metrics.SuccessLabel).Inc()
1601
	metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("DropPartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
1602
	metrics.RootCoordNumOfPartitions.WithLabelValues(in.CollectionName).Dec()
C
Cai Yudong 已提交
1603
	return succStatus(), nil
1604 1605
}

1606
// HasPartition check partition existence
G
godchen 已提交
1607
func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
1608
	metrics.RootCoordHasPartitionCounter.WithLabelValues(metrics.TotalLabel).Inc()
C
Cai Yudong 已提交
1609
	if code, ok := c.checkHealthy(); !ok {
1610
		return &milvuspb.BoolResponse{
C
Cai Yudong 已提交
1611 1612
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
			Value:  false,
1613 1614
		}, nil
	}
1615
	tr := timerecord.NewTimeRecorder("HasPartition")
C
Cai Yudong 已提交
1616

1617
	log.Debug("HasPartition", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1618 1619
		zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName),
		zap.Int64("msgID", in.Base.MsgID))
1620 1621
	t := &HasPartitionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1622
			ctx:  ctx,
1623 1624 1625 1626 1627
			core: c,
		},
		Req:          in,
		HasPartition: false,
	}
N
neza2017 已提交
1628
	err := executeTask(t)
1629
	if err != nil {
1630
		log.Error("HasPartition failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1631 1632
			zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName),
			zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
1633
		return &milvuspb.BoolResponse{
C
Cai Yudong 已提交
1634 1635
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "HasPartition failed: "+err.Error()),
			Value:  false,
1636 1637
		}, nil
	}
1638
	log.Debug("HasPartition success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1639 1640
		zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName),
		zap.Int64("msgID", in.Base.MsgID))
C
Cai Yudong 已提交
1641

1642
	metrics.RootCoordHasPartitionCounter.WithLabelValues(metrics.SuccessLabel).Inc()
1643
	metrics.RootCoordDDLReadTypeLatency.WithLabelValues("HasPartition").Observe(float64(tr.ElapseSpan().Milliseconds()))
1644
	return &milvuspb.BoolResponse{
C
Cai Yudong 已提交
1645 1646
		Status: succStatus(),
		Value:  t.HasPartition,
1647 1648 1649
	}, nil
}

1650
// ShowPartitions list all partition names
G
godchen 已提交
1651
func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
1652
	metrics.RootCoordShowPartitionsCounter.WithLabelValues(metrics.TotalLabel).Inc()
C
Cai Yudong 已提交
1653
	if code, ok := c.checkHealthy(); !ok {
G
godchen 已提交
1654
		return &milvuspb.ShowPartitionsResponse{
C
Cai Yudong 已提交
1655
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
1656 1657
		}, nil
	}
C
Cai Yudong 已提交
1658

1659
	tr := timerecord.NewTimeRecorder("ShowPartitions")
1660
	log.Debug("ShowPartitions", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1661
		zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
1662 1663
	t := &ShowPartitionReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1664
			ctx:  ctx,
1665 1666 1667
			core: c,
		},
		Req: in,
C
Cai Yudong 已提交
1668
		Rsp: &milvuspb.ShowPartitionsResponse{},
1669
	}
N
neza2017 已提交
1670
	err := executeTask(t)
1671
	if err != nil {
1672
		log.Error("ShowPartitions failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1673
			zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
G
godchen 已提交
1674
		return &milvuspb.ShowPartitionsResponse{
C
Cai Yudong 已提交
1675
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowPartitions failed: "+err.Error()),
1676 1677
		}, nil
	}
1678
	log.Debug("ShowPartitions success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1679 1680
		zap.String("collection name", in.CollectionName), zap.Int("num of partitions", len(t.Rsp.PartitionNames)),
		zap.Int64("msgID", t.Req.Base.MsgID))
C
Cai Yudong 已提交
1681

1682
	metrics.RootCoordShowPartitionsCounter.WithLabelValues(metrics.SuccessLabel).Inc()
C
Cai Yudong 已提交
1683
	t.Rsp.Status = succStatus()
1684
	metrics.RootCoordDDLReadTypeLatency.WithLabelValues("ShowPartitions").Observe(float64(tr.ElapseSpan().Milliseconds()))
1685 1686 1687
	return t.Rsp, nil
}

1688
// CreateIndex create index
G
godchen 已提交
1689
func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
1690
	metrics.RootCoordCreateIndexCounter.WithLabelValues(metrics.TotalLabel).Inc()
C
Cai Yudong 已提交
1691 1692
	if code, ok := c.checkHealthy(); !ok {
		return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
1693
	}
1694
	tr := timerecord.NewTimeRecorder("CreateIndex")
1695
	log.Debug("CreateIndex", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1696 1697
		zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName),
		zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1698 1699
	t := &CreateIndexReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1700
			ctx:  ctx,
N
neza2017 已提交
1701 1702 1703 1704
			core: c,
		},
		Req: in,
	}
N
neza2017 已提交
1705
	err := executeTask(t)
N
neza2017 已提交
1706
	if err != nil {
1707
		log.Error("CreateIndex failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1708 1709 1710
			zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName),
			zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
		return failStatus(commonpb.ErrorCode_UnexpectedError, "CreateIndex failed: "+err.Error()), nil
N
neza2017 已提交
1711
	}
1712
	log.Debug("CreateIndex success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1713 1714
		zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName),
		zap.Int64("msgID", in.Base.MsgID))
C
Cai Yudong 已提交
1715

1716
	metrics.RootCoordCreateIndexCounter.WithLabelValues(metrics.SuccessLabel).Inc()
1717
	metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("CreateIndex").Observe(float64(tr.ElapseSpan().Milliseconds()))
C
Cai Yudong 已提交
1718
	return succStatus(), nil
1719 1720
}

1721
// DescribeIndex return index info
G
godchen 已提交
1722
func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
1723
	metrics.RootCoordDescribeIndexCounter.WithLabelValues(metrics.TotalLabel).Inc()
C
Cai Yudong 已提交
1724
	if code, ok := c.checkHealthy(); !ok {
1725
		return &milvuspb.DescribeIndexResponse{
C
Cai Yudong 已提交
1726
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
1727 1728
		}, nil
	}
1729
	tr := timerecord.NewTimeRecorder("DescribeIndex")
1730
	log.Debug("DescribeIndex", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1731 1732
		zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName),
		zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1733 1734
	t := &DescribeIndexReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1735
			ctx:  ctx,
N
neza2017 已提交
1736 1737 1738
			core: c,
		},
		Req: in,
C
Cai Yudong 已提交
1739
		Rsp: &milvuspb.DescribeIndexResponse{},
N
neza2017 已提交
1740
	}
N
neza2017 已提交
1741
	err := executeTask(t)
N
neza2017 已提交
1742
	if err != nil {
1743
		log.Error("DescribeIndex failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1744 1745
			zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName),
			zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
N
neza2017 已提交
1746
		return &milvuspb.DescribeIndexResponse{
C
Cai Yudong 已提交
1747
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "DescribeIndex failed: "+err.Error()),
N
neza2017 已提交
1748 1749
		}, nil
	}
N
neza2017 已提交
1750 1751 1752 1753
	idxNames := make([]string, 0, len(t.Rsp.IndexDescriptions))
	for _, i := range t.Rsp.IndexDescriptions {
		idxNames = append(idxNames, i.IndexName)
	}
1754
	log.Debug("DescribeIndex success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1755 1756
		zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName),
		zap.Strings("index names", idxNames), zap.Int64("msgID", in.Base.MsgID))
C
Cai Yudong 已提交
1757

1758
	metrics.RootCoordDescribeIndexCounter.WithLabelValues(metrics.SuccessLabel).Inc()
N
neza2017 已提交
1759
	if len(t.Rsp.IndexDescriptions) == 0 {
C
Cai Yudong 已提交
1760
		t.Rsp.Status = failStatus(commonpb.ErrorCode_IndexNotExist, "index not exist")
N
neza2017 已提交
1761
	} else {
C
Cai Yudong 已提交
1762
		t.Rsp.Status = succStatus()
N
neza2017 已提交
1763
	}
1764
	metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("DescribeIndex").Observe(float64(tr.ElapseSpan().Milliseconds()))
N
neza2017 已提交
1765
	return t.Rsp, nil
1766 1767
}

1768
// DropIndex drop index
G
godchen 已提交
1769
func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
1770
	metrics.RootCoordDropIndexCounter.WithLabelValues(metrics.TotalLabel).Inc()
C
Cai Yudong 已提交
1771 1772
	if code, ok := c.checkHealthy(); !ok {
		return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
N
neza2017 已提交
1773
	}
1774
	tr := timerecord.NewTimeRecorder("DropIndex")
1775
	log.Debug("DropIndex", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1776 1777
		zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName),
		zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1778 1779
	t := &DropIndexReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1780
			ctx:  ctx,
N
neza2017 已提交
1781 1782 1783 1784
			core: c,
		},
		Req: in,
	}
N
neza2017 已提交
1785
	err := executeTask(t)
N
neza2017 已提交
1786
	if err != nil {
1787
		log.Error("DropIndex failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1788 1789 1790
			zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName),
			zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
		return failStatus(commonpb.ErrorCode_UnexpectedError, "DropIndex failed: "+err.Error()), nil
N
neza2017 已提交
1791
	}
1792
	log.Debug("DropIndex success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1793 1794
		zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName),
		zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID))
C
Cai Yudong 已提交
1795

1796
	metrics.RootCoordDropIndexCounter.WithLabelValues(metrics.SuccessLabel).Inc()
1797
	metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("DropIndex").Observe(float64(tr.ElapseSpan().Milliseconds()))
C
Cai Yudong 已提交
1798
	return succStatus(), nil
N
neza2017 已提交
1799 1800
}

1801
// DescribeSegment return segment info
G
godchen 已提交
1802
func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) {
1803
	metrics.RootCoordDescribeSegmentCounter.WithLabelValues(metrics.TotalLabel).Inc()
C
Cai Yudong 已提交
1804
	if code, ok := c.checkHealthy(); !ok {
1805
		return &milvuspb.DescribeSegmentResponse{
C
Cai Yudong 已提交
1806
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
1807 1808
		}, nil
	}
1809
	tr := timerecord.NewTimeRecorder("DescribeSegment")
1810
	log.Debug("DescribeSegment", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1811 1812
		zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID),
		zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1813 1814
	t := &DescribeSegmentReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1815
			ctx:  ctx,
N
neza2017 已提交
1816 1817 1818
			core: c,
		},
		Req: in,
C
Cai Yudong 已提交
1819
		Rsp: &milvuspb.DescribeSegmentResponse{},
N
neza2017 已提交
1820
	}
N
neza2017 已提交
1821
	err := executeTask(t)
N
neza2017 已提交
1822
	if err != nil {
1823
		log.Error("DescribeSegment failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1824 1825
			zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID),
			zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
N
neza2017 已提交
1826
		return &milvuspb.DescribeSegmentResponse{
C
Cai Yudong 已提交
1827
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "DescribeSegment failed: "+err.Error()),
N
neza2017 已提交
1828 1829
		}, nil
	}
1830
	log.Debug("DescribeSegment success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1831 1832
		zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID),
		zap.Int64("msgID", in.Base.MsgID))
C
Cai Yudong 已提交
1833

1834
	metrics.RootCoordDescribeSegmentCounter.WithLabelValues(metrics.SuccessLabel).Inc()
1835
	metrics.RootCoordDDLReadTypeLatency.WithLabelValues("DescribeSegment").Observe(float64(tr.ElapseSpan().Milliseconds()))
C
Cai Yudong 已提交
1836
	t.Rsp.Status = succStatus()
N
neza2017 已提交
1837
	return t.Rsp, nil
1838 1839
}

1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897
// DescribeSegments returns the descriptions of the segments listed in the
// request.
//
// The response carries an UnexpectedError status when RootCoord is not healthy
// or when the DescribeSegmentsReqTask fails; the error result is always nil.
// On success the response filled in by the task is returned with a success
// status, and the success counter and DDL read latency are recorded.
func (c *Core) DescribeSegments(ctx context.Context, in *rootcoordpb.DescribeSegmentsRequest) (*rootcoordpb.DescribeSegmentsResponse, error) {
	metrics.RootCoordDescribeSegmentsCounter.WithLabelValues(metrics.TotalLabel).Inc()

	// Every log line of this handler carries the same request identification.
	reqFields := []zap.Field{
		zap.String("role", typeutil.RootCoordRole),
		zap.Int64("msgID", in.GetBase().GetMsgID()),
		zap.Int64("collection", in.GetCollectionID()),
		zap.Int64s("segments", in.GetSegmentIDs()),
	}

	state, healthy := c.checkHealthy()
	if !healthy {
		log.Error("failed to describe segments, rootcoord not healthy", reqFields...)

		return &rootcoordpb.DescribeSegmentsResponse{
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(state)]),
		}, nil
	}

	recorder := timerecord.NewTimeRecorder("DescribeSegments")

	log.Debug("received request to describe segments", reqFields...)

	task := &DescribeSegmentsReqTask{
		baseReqTask: baseReqTask{
			ctx:  ctx,
			core: c,
		},
		Req: in,
		Rsp: &rootcoordpb.DescribeSegmentsResponse{},
	}

	err := executeTask(task)
	if err != nil {
		// The error field is logged first, followed by the request fields.
		failFields := append([]zap.Field{zap.Error(err)}, reqFields...)
		log.Error("failed to describe segments", failFields...)

		return &rootcoordpb.DescribeSegmentsResponse{
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "DescribeSegments failed: "+err.Error()),
		}, nil
	}

	log.Debug("succeed to describe segments", reqFields...)

	metrics.RootCoordDescribeSegmentsCounter.WithLabelValues(metrics.SuccessLabel).Inc()
	elapsedMs := float64(recorder.ElapseSpan().Milliseconds())
	metrics.RootCoordDDLReadTypeLatency.WithLabelValues("DescribeSegments").Observe(elapsedMs)

	task.Rsp.Status = succStatus()
	return task.Rsp, nil
}

1898
// ShowSegments list all segments
G
godchen 已提交
1899
func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
1900
	metrics.RootCoordShowSegmentsCounter.WithLabelValues(metrics.TotalLabel).Inc()
C
Cai Yudong 已提交
1901
	if code, ok := c.checkHealthy(); !ok {
G
godchen 已提交
1902
		return &milvuspb.ShowSegmentsResponse{
C
Cai Yudong 已提交
1903
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
1904 1905
		}, nil
	}
1906
	tr := timerecord.NewTimeRecorder("ShowSegments")
C
Cai Yudong 已提交
1907

1908
	log.Debug("ShowSegments", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1909 1910
		zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID),
		zap.Int64("msgID", in.Base.MsgID))
N
neza2017 已提交
1911 1912
	t := &ShowSegmentReqTask{
		baseReqTask: baseReqTask{
G
groot 已提交
1913
			ctx:  ctx,
N
neza2017 已提交
1914 1915 1916
			core: c,
		},
		Req: in,
C
Cai Yudong 已提交
1917
		Rsp: &milvuspb.ShowSegmentsResponse{},
N
neza2017 已提交
1918
	}
N
neza2017 已提交
1919
	err := executeTask(t)
N
neza2017 已提交
1920
	if err != nil {
1921
		log.Debug("ShowSegments failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1922 1923
			zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID),
			zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
G
godchen 已提交
1924
		return &milvuspb.ShowSegmentsResponse{
C
Cai Yudong 已提交
1925
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowSegments failed: "+err.Error()),
N
neza2017 已提交
1926 1927
		}, nil
	}
1928
	log.Debug("ShowSegments success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1929 1930
		zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID),
		zap.Int64s("segments ids", t.Rsp.SegmentIDs),
C
Cai Yudong 已提交
1931 1932
		zap.Int64("msgID", in.Base.MsgID))

1933
	metrics.RootCoordShowSegmentsCounter.WithLabelValues(metrics.SuccessLabel).Inc()
1934
	metrics.RootCoordDDLReadTypeLatency.WithLabelValues("ShowSegments").Observe(float64(tr.ElapseSpan().Milliseconds()))
C
Cai Yudong 已提交
1935
	t.Rsp.Status = succStatus()
N
neza2017 已提交
1936
	return t.Rsp, nil
1937 1938
}

1939
// AllocTimestamp alloc timestamp
1940
func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
C
Cai Yudong 已提交
1941
	if code, ok := c.checkHealthy(); !ok {
1942
		return &rootcoordpb.AllocTimestampResponse{
C
Cai Yudong 已提交
1943
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
N
neza2017 已提交
1944 1945
		}, nil
	}
N
neza2017 已提交
1946
	ts, err := c.TSOAllocator(in.Count)
1947
	if err != nil {
1948
		log.Error("AllocTimestamp failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1949
			zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
1950
		return &rootcoordpb.AllocTimestampResponse{
C
Cai Yudong 已提交
1951
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "AllocTimestamp failed: "+err.Error()),
1952 1953
		}, nil
	}
N
neza2017 已提交
1954 1955 1956

	//return first available  time stamp
	ts = ts - uint64(in.Count) + 1
1957
	metrics.RootCoordTimestampAllocCounter.Set(float64(ts))
1958
	return &rootcoordpb.AllocTimestampResponse{
C
Cai Yudong 已提交
1959
		Status:    succStatus(),
1960 1961 1962 1963 1964
		Timestamp: ts,
		Count:     in.Count,
	}, nil
}

1965
// AllocID alloc ids
1966
func (c *Core) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) {
C
Cai Yudong 已提交
1967
	if code, ok := c.checkHealthy(); !ok {
1968
		return &rootcoordpb.AllocIDResponse{
C
Cai Yudong 已提交
1969
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
N
neza2017 已提交
1970 1971
		}, nil
	}
N
neza2017 已提交
1972
	start, _, err := c.IDAllocator(in.Count)
1973
	if err != nil {
1974
		log.Error("AllocID failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
1975
			zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
1976
		return &rootcoordpb.AllocIDResponse{
C
Cai Yudong 已提交
1977 1978
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "AllocID failed: "+err.Error()),
			Count:  in.Count,
1979 1980
		}, nil
	}
1981
	metrics.RootCoordIDAllocCounter.Add(float64(in.Count))
1982
	return &rootcoordpb.AllocIDResponse{
C
Cai Yudong 已提交
1983 1984 1985
		Status: succStatus(),
		ID:     start,
		Count:  in.Count,
1986 1987
	}, nil
}
1988 1989 1990

// UpdateChannelTimeTick used to handle ChannelTimeTickMsg
func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) {
C
Cai Yudong 已提交
1991
	if code, ok := c.checkHealthy(); !ok {
X
Xiaofan 已提交
1992
		log.Warn("failed to updateTimeTick because rootcoord is not healthy", zap.Any("state", code))
C
Cai Yudong 已提交
1993
		return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
1994 1995
	}
	if in.Base.MsgType != commonpb.MsgType_TimeTick {
X
Xiaofan 已提交
1996
		log.Warn("failed to updateTimeTick because base messasge is not timetick, state", zap.Any("base message type", in.Base.MsgType))
C
Cai Yudong 已提交
1997 1998
		msgTypeName := commonpb.MsgType_name[int32(in.Base.GetMsgType())]
		return failStatus(commonpb.ErrorCode_UnexpectedError, "invalid message type "+msgTypeName), nil
1999
	}
2000
	err := c.chanTimeTick.updateTimeTick(in, "gRPC")
2001
	if err != nil {
X
Xiaofan 已提交
2002
		log.Warn("failed to updateTimeTick", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2003
			zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
C
Cai Yudong 已提交
2004
		return failStatus(commonpb.ErrorCode_UnexpectedError, "UpdateTimeTick failed: "+err.Error()), nil
2005
	}
C
Cai Yudong 已提交
2006
	return succStatus(), nil
2007
}
N
neza2017 已提交
2008

2009
// ReleaseDQLMessageStream release DQL msgstream
N
neza2017 已提交
2010
func (c *Core) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
C
Cai Yudong 已提交
2011 2012
	if code, ok := c.checkHealthy(); !ok {
		return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
N
neza2017 已提交
2013 2014 2015
	}
	return c.proxyClientManager.ReleaseDQLMessageStream(ctx, in)
}
2016

2017
// SegmentFlushCompleted check whether segment flush has completed
2018
func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
C
Cai Yudong 已提交
2019 2020
	if code, ok := c.checkHealthy(); !ok {
		return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
2021 2022
	}
	if in.Base.MsgType != commonpb.MsgType_SegmentFlushDone {
C
Cai Yudong 已提交
2023
		return failStatus(commonpb.ErrorCode_UnexpectedError, "invalid msg type "+commonpb.MsgType_name[int32(in.Base.MsgType)]), nil
2024
	}
2025
	segID := in.Segment.GetID()
2026
	log.Debug("SegmentFlushCompleted", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2027 2028
		zap.Int64("collection id", in.Segment.CollectionID), zap.Int64("partition id", in.Segment.PartitionID),
		zap.Int64("segment id", segID), zap.Int64("msgID", in.Base.MsgID))
2029

2030
	coll, err := c.MetaTable.GetCollectionByID(in.Segment.CollectionID, 0)
2031
	if err != nil {
2032
		log.Error("GetCollectionByID failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2033
			zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
C
Cai Yudong 已提交
2034
		return failStatus(commonpb.ErrorCode_UnexpectedError, "GetCollectionByID failed: "+err.Error()), nil
2035 2036 2037
	}

	if len(coll.FieldIndexes) == 0 {
2038
		log.Debug("no index params on collection", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2039
			zap.String("collection_name", coll.Schema.Name), zap.Int64("msgID", in.Base.MsgID))
2040 2041 2042 2043 2044
	}

	for _, f := range coll.FieldIndexes {
		fieldSch, err := GetFieldSchemaByID(coll, f.FiledID)
		if err != nil {
2045
			log.Warn("field schema not found", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2046 2047
				zap.String("collection_name", coll.Schema.Name), zap.Int64("field id", f.FiledID),
				zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
2048 2049 2050 2051 2052
			continue
		}

		idxInfo, err := c.MetaTable.GetIndexByID(f.IndexID)
		if err != nil {
2053
			log.Warn("index not found", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2054 2055
				zap.String("collection_name", coll.Schema.Name), zap.Int64("field id", f.FiledID),
				zap.Int64("index id", f.IndexID), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
2056 2057 2058 2059
			continue
		}

		info := etcdpb.SegmentIndexInfo{
2060 2061 2062 2063 2064 2065
			CollectionID: in.Segment.CollectionID,
			PartitionID:  in.Segment.PartitionID,
			SegmentID:    segID,
			FieldID:      fieldSch.FieldID,
			IndexID:      idxInfo.IndexID,
			EnableIndex:  false,
2066 2067 2068 2069 2070
		}
		info.BuildID, err = c.BuildIndex(ctx, segID, fieldSch, idxInfo, true)
		if err == nil && info.BuildID != 0 {
			info.EnableIndex = true
		} else {
2071
			log.Error("BuildIndex failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2072 2073 2074
				zap.String("collection_name", coll.Schema.Name), zap.Int64("field id", f.FiledID),
				zap.Int64("index id", f.IndexID), zap.Int64("build id", info.BuildID),
				zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
2075
			continue
2076
		}
2077
		err = c.MetaTable.AddIndex(&info)
2078
		if err != nil {
2079
			log.Error("AddIndex failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2080 2081 2082
				zap.String("collection_name", coll.Schema.Name), zap.Int64("field id", f.FiledID),
				zap.Int64("index id", f.IndexID), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
			continue
2083 2084 2085
		}
	}

2086
	log.Debug("SegmentFlushCompleted success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2087 2088
		zap.Int64("collection id", in.Segment.CollectionID), zap.Int64("partition id", in.Segment.PartitionID),
		zap.Int64("segment id", segID), zap.Int64("msgID", in.Base.MsgID))
C
Cai Yudong 已提交
2089
	return succStatus(), nil
2090
}
2091

2092
// GetMetrics get metrics
C
Cai Yudong 已提交
2093 2094
func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	if code, ok := c.checkHealthy(); !ok {
2095
		return &milvuspb.GetMetricsResponse{
C
Cai Yudong 已提交
2096
			Status:   failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
2097 2098 2099 2100
			Response: "",
		}, nil
	}

C
Cai Yudong 已提交
2101
	metricType, err := metricsinfo.ParseMetricType(in.Request)
2102
	if err != nil {
2103
		log.Error("ParseMetricType failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2104
			zap.Int64("node_id", c.session.ServerID), zap.String("req", in.Request), zap.Error(err))
2105
		return &milvuspb.GetMetricsResponse{
C
Cai Yudong 已提交
2106
			Status:   failStatus(commonpb.ErrorCode_UnexpectedError, "ParseMetricType failed: "+err.Error()),
2107 2108 2109 2110
			Response: "",
		}, nil
	}

2111
	log.Debug("GetMetrics success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2112
		zap.String("metric_type", metricType), zap.Int64("msgID", in.Base.MsgID))
2113 2114

	if metricType == metricsinfo.SystemInfoMetrics {
2115 2116 2117 2118 2119
		ret, err := c.metricsCacheManager.GetSystemInfoMetrics()
		if err == nil && ret != nil {
			return ret, nil
		}

2120
		log.Warn("GetSystemInfoMetrics from cache failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2121
			zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
2122

C
Cai Yudong 已提交
2123 2124
		systemInfoMetrics, err := c.getSystemInfoMetrics(ctx, in)
		if err != nil {
2125
			log.Error("GetSystemInfoMetrics failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2126 2127 2128
				zap.String("metric_type", metricType), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
			return nil, err
		}
2129

2130
		c.metricsCacheManager.UpdateSystemInfoMetrics(systemInfoMetrics)
2131 2132 2133
		return systemInfoMetrics, err
	}

2134
	log.Error("GetMetrics failed, metric type not implemented", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2135
		zap.String("metric_type", metricType), zap.Int64("msgID", in.Base.MsgID))
2136 2137

	return &milvuspb.GetMetricsResponse{
C
Cai Yudong 已提交
2138
		Status:   failStatus(commonpb.ErrorCode_UnexpectedError, metricsinfo.MsgUnimplementedMetric),
2139 2140 2141
		Response: "",
	}, nil
}
Y
Yusup 已提交
2142

2143
// CreateAlias create collection alias
Y
Yusup 已提交
2144
func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
C
Cai Yudong 已提交
2145 2146
	if code, ok := c.checkHealthy(); !ok {
		return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
Y
Yusup 已提交
2147
	}
2148
	tr := timerecord.NewTimeRecorder("CreateAlias")
2149
	log.Debug("CreateAlias", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2150 2151
		zap.String("alias", in.Alias), zap.String("collection name", in.CollectionName),
		zap.Int64("msgID", in.Base.MsgID))
Y
Yusup 已提交
2152 2153 2154 2155 2156 2157 2158 2159 2160
	t := &CreateAliasReqTask{
		baseReqTask: baseReqTask{
			ctx:  ctx,
			core: c,
		},
		Req: in,
	}
	err := executeTask(t)
	if err != nil {
2161
		log.Error("CreateAlias failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2162 2163 2164
			zap.String("alias", in.Alias), zap.String("collection name", in.CollectionName),
			zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
		return failStatus(commonpb.ErrorCode_UnexpectedError, "CreateAlias failed: "+err.Error()), nil
Y
Yusup 已提交
2165
	}
2166
	log.Debug("CreateAlias success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2167 2168
		zap.String("alias", in.Alias), zap.String("collection name", in.CollectionName),
		zap.Int64("msgID", in.Base.MsgID))
C
Cai Yudong 已提交
2169

2170
	metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("CreateAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
C
Cai Yudong 已提交
2171
	return succStatus(), nil
Y
Yusup 已提交
2172 2173
}

2174
// DropAlias drop collection alias
Y
Yusup 已提交
2175
func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
C
Cai Yudong 已提交
2176 2177
	if code, ok := c.checkHealthy(); !ok {
		return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
Y
Yusup 已提交
2178
	}
2179
	tr := timerecord.NewTimeRecorder("DropAlias")
2180
	log.Debug("DropAlias", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2181
		zap.String("alias", in.Alias), zap.Int64("msgID", in.Base.MsgID))
Y
Yusup 已提交
2182 2183 2184 2185 2186 2187 2188 2189 2190
	t := &DropAliasReqTask{
		baseReqTask: baseReqTask{
			ctx:  ctx,
			core: c,
		},
		Req: in,
	}
	err := executeTask(t)
	if err != nil {
2191
		log.Error("DropAlias failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2192
			zap.String("alias", in.Alias), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
C
Cai Yudong 已提交
2193
		return failStatus(commonpb.ErrorCode_UnexpectedError, "DropAlias failed: "+err.Error()), nil
Y
Yusup 已提交
2194
	}
2195
	log.Debug("DropAlias success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2196
		zap.String("alias", in.Alias), zap.Int64("msgID", in.Base.MsgID))
C
Cai Yudong 已提交
2197

2198
	metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("DropAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
C
Cai Yudong 已提交
2199
	return succStatus(), nil
Y
Yusup 已提交
2200 2201
}

2202
// AlterAlias alter collection alias
Y
Yusup 已提交
2203
func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
C
Cai Yudong 已提交
2204 2205
	if code, ok := c.checkHealthy(); !ok {
		return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
Y
Yusup 已提交
2206
	}
2207
	tr := timerecord.NewTimeRecorder("AlterAlias")
2208
	log.Debug("AlterAlias", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2209 2210
		zap.String("alias", in.Alias), zap.String("collection name", in.CollectionName),
		zap.Int64("msgID", in.Base.MsgID))
Y
Yusup 已提交
2211 2212 2213 2214 2215 2216 2217 2218 2219
	t := &AlterAliasReqTask{
		baseReqTask: baseReqTask{
			ctx:  ctx,
			core: c,
		},
		Req: in,
	}
	err := executeTask(t)
	if err != nil {
2220
		log.Error("AlterAlias failed", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2221 2222
			zap.String("alias", in.Alias), zap.String("collection name", in.CollectionName),
			zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
C
Cai Yudong 已提交
2223
		return failStatus(commonpb.ErrorCode_UnexpectedError, "AlterAlias failed: "+err.Error()), nil
Y
Yusup 已提交
2224
	}
2225
	log.Debug("AlterAlias success", zap.String("role", typeutil.RootCoordRole),
C
Cai Yudong 已提交
2226 2227
		zap.String("alias", in.Alias), zap.String("collection name", in.CollectionName),
		zap.Int64("msgID", in.Base.MsgID))
C
Cai Yudong 已提交
2228

2229
	metrics.RootCoordDDLWriteTypeLatency.WithLabelValues("AlterAlias").Observe(float64(tr.ElapseSpan().Milliseconds()))
C
Cai Yudong 已提交
2230
	return succStatus(), nil
Y
Yusup 已提交
2231
}
G
groot 已提交
2232

2233
// Import imports large files (json, numpy, etc.) on MinIO/S3 storage into Milvus storage.
G
groot 已提交
2234
func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
G
groot 已提交
2235 2236 2237 2238
	if code, ok := c.checkHealthy(); !ok {
		return &milvuspb.ImportResponse{
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
		}, nil
G
groot 已提交
2239 2240
	}

2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254
	// Get collection/partition ID from collection/partition name.
	var cID int64
	var ok bool
	if cID, ok = c.MetaTable.collName2ID[req.GetCollectionName()]; !ok {
		log.Error("failed to find collection ID for collection name",
			zap.String("collection name", req.GetCollectionName()))
		return nil, fmt.Errorf("collection ID not found for collection name %s", req.GetCollectionName())
	}
	log.Info("receive import request",
		zap.String("collection name", req.GetCollectionName()),
		zap.Int64("collection ID", cID),
		zap.String("partition name", req.GetPartitionName()),
		zap.Int("# of files = ", len(req.GetFiles())),
	)
2255
	resp := c.importManager.importJob(ctx, req, cID)
G
groot 已提交
2256 2257 2258
	return resp, nil
}

2259
// GetImportState returns the current state of an import task.
G
groot 已提交
2260
func (c *Core) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
G
groot 已提交
2261 2262 2263 2264 2265
	if code, ok := c.checkHealthy(); !ok {
		return &milvuspb.GetImportStateResponse{
			Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]),
		}, nil
	}
2266
	return c.importManager.getTaskState(req.GetTask()), nil
G
groot 已提交
2267 2268
}

2269
// ReportImport reports import task state to RootCoord.
2270
func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (*commonpb.Status, error) {
2271 2272 2273
	log.Info("receive import state report",
		zap.Int64("task ID", ir.GetTaskId()),
		zap.Any("import state", ir.GetState()))
G
groot 已提交
2274 2275 2276
	if code, ok := c.checkHealthy(); !ok {
		return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
	}
2277
	// Upon receiving ReportImport request, update the related task's state in task store.
2278
	ti, err := c.importManager.updateTaskState(ir)
G
groot 已提交
2279 2280
	if err != nil {
		return &commonpb.Status{
2281
			ErrorCode: commonpb.ErrorCode_UpdateImportTaskFailure,
G
groot 已提交
2282 2283
			Reason:    err.Error(),
		}, nil
G
groot 已提交
2284
	}
2285 2286 2287 2288 2289 2290 2291 2292 2293 2294

	// That's all for reporting, if task hasn't reached persisted or completed status yet.
	if ti.GetState().GetStateCode() != commonpb.ImportState_ImportPersisted &&
		ti.GetState().GetStateCode() != commonpb.ImportState_ImportCompleted {
		log.Debug("transitional import state received, return immediately", zap.Any("import result", ir))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		}, nil
	}

2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309
	// Reverse look up collection name on collection ID.
	var colName string
	for k, v := range c.MetaTable.collName2ID {
		if v == ti.GetCollectionId() {
			colName = k
		}
	}
	if colName == "" {
		log.Error("Collection name not found for collection ID", zap.Int64("collection ID", ti.GetCollectionId()))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CollectionNameNotFound,
			Reason:    "Collection name not found for collection ID" + strconv.FormatInt(ti.GetCollectionId(), 10),
		}, nil
	}

2310
	// When DataNode has done its thing, remove it from the busy node list.
2311 2312 2313 2314 2315 2316 2317 2318
	func() {
		c.importManager.busyNodesLock.Lock()
		defer c.importManager.busyNodesLock.Unlock()
		delete(c.importManager.busyNodes, ir.GetDatanodeId())
		log.Info("dataNode is no longer busy",
			zap.Int64("dataNode ID", ir.GetDatanodeId()),
			zap.Int64("task ID", ir.GetTaskId()))
	}()
2319 2320 2321 2322

	// Start a loop to check segments' index states periodically.
	c.wg.Add(1)
	go c.checkCompleteIndexLoop(ctx, ti, colName, ir.Segments)
G
groot 已提交
2323

G
groot 已提交
2324 2325 2326
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
G
groot 已提交
2327
}
2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346

// CountCompleteIndex checks indexing status of the given segments, and returns the # of segments that has complete index.
func (c *Core) CountCompleteIndex(ctx context.Context, collectionName string, collectionID UniqueID,
	allSegmentIDs []UniqueID) (int, error) {
	// Note: Index name is always Params.CommonCfg.DefaultIndexName in current Milvus design as of today.
	indexName := Params.CommonCfg.DefaultIndexName

	// Retrieve index status and detailed index information.
	describeIndexReq := &milvuspb.DescribeIndexRequest{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_DescribeIndex,
		},
		CollectionName: collectionName,
		IndexName:      indexName,
	}
	indexDescriptionResp, err := c.DescribeIndex(ctx, describeIndexReq)
	if err != nil {
		return 0, err
	}
2347
	log.Debug("got index description", zap.String("index_description", indexDescriptionResp.String()))
2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361

	// Check if the target index name exists.
	matchIndexID := int64(-1)
	foundIndexID := false
	for _, desc := range indexDescriptionResp.IndexDescriptions {
		if desc.IndexName == indexName {
			matchIndexID = desc.IndexID
			foundIndexID = true
			break
		}
	}
	if !foundIndexID {
		return 0, fmt.Errorf("no index is created")
	}
2362
	log.Debug("found match index ID", zap.Int64("match index ID", matchIndexID))
2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378

	getIndexStatesRequest := &indexpb.GetIndexStatesRequest{
		IndexBuildIDs: make([]UniqueID, 0),
	}

	// Fetch index build IDs from segments.
	for _, segmentID := range allSegmentIDs {
		describeSegmentRequest := &milvuspb.DescribeSegmentRequest{
			Base: &commonpb.MsgBase{
				MsgType: commonpb.MsgType_DescribeSegment,
			},
			CollectionID: collectionID,
			SegmentID:    segmentID,
		}
		segmentDesc, err := c.DescribeSegment(ctx, describeSegmentRequest)
		if err != nil {
2379 2380 2381
			log.Error("Failed to describe segment",
				zap.Int64("collection ID", collectionID),
				zap.Int64("segment ID", segmentID))
2382 2383 2384 2385 2386 2387 2388 2389
			return 0, err
		}
		if segmentDesc.IndexID == matchIndexID {
			if segmentDesc.EnableIndex {
				getIndexStatesRequest.IndexBuildIDs = append(getIndexStatesRequest.IndexBuildIDs, segmentDesc.BuildID)
			}
		}
	}
2390
	log.Debug("proxy GetIndexState", zap.Int("# of IndexBuildIDs", len(getIndexStatesRequest.IndexBuildIDs)), zap.Error(err))
2391 2392

	if len(getIndexStatesRequest.IndexBuildIDs) == 0 {
2393 2394 2395
		log.Info("empty index build IDs returned",
			zap.String("collection name", collectionName),
			zap.Int64("collection ID", collectionID))
2396 2397 2398 2399
		return 0, nil
	}
	states, err := c.CallGetIndexStatesService(ctx, getIndexStatesRequest.IndexBuildIDs)
	if err != nil {
2400
		log.Error("failed to get index state in checkSegmentIndexStates", zap.Error(err))
2401 2402 2403 2404 2405 2406 2407 2408 2409 2410
		return 0, err
	}

	// Count the # of segments with finished index.
	ct := 0
	for _, s := range states {
		if s.State == commonpb.IndexState_Finished {
			ct++
		}
	}
2411
	log.Info("segment indexing state checked",
2412 2413 2414 2415 2416 2417 2418
		zap.Int("# of checked segment", len(states)),
		zap.Int("# of segments with complete index", ct),
		zap.String("collection name", collectionName),
		zap.Int64("collection ID", collectionID),
	)
	return ct, nil
}
2419

2420 2421
// checkCompleteIndexLoop checks index build states for an import task's segments and bring these segments online when
// the criteria are met. checkCompleteIndexLoop does the check every CheckCompleteIndexInterval and exits if:
2422
// (1) a certain percent of indices are built, (2) when context is done or (3) when `ImportIndexWaitLimit` has passed.
2423
func (c *Core) checkCompleteIndexLoop(ctx context.Context, ti *datapb.ImportTaskInfo, colName string, segIDs []UniqueID) {
2424
	defer c.wg.Done()
2425 2426 2427 2428
	ticker := time.NewTicker(time.Duration(Params.RootCoordCfg.ImportIndexCheckInterval*1000) * time.Millisecond)
	defer ticker.Stop()
	expireTicker := time.NewTicker(time.Duration(Params.RootCoordCfg.ImportIndexWaitLimit*1000) * time.Millisecond)
	defer expireTicker.Stop()
2429 2430 2431
	for {
		select {
		case <-c.ctx.Done():
2432
			log.Info("(in check complete index loop) context done, exiting checkCompleteIndexLoop",
2433
				zap.Int64("task ID", ti.GetId()))
2434 2435
			return
		case <-ticker.C:
2436
			log.Info("(in check complete index loop) check segments' index states", zap.Int64("task ID", ti.GetId()))
2437 2438
			if ct, err := c.CountCompleteIndex(ctx, colName, ti.GetCollectionId(), segIDs); err == nil &&
				segmentsOnlineReady(ct, len(segIDs)) {
2439
				log.Info("segment indices are ready",
2440 2441 2442
					zap.Int64("task ID", ti.GetId()),
					zap.Int("total # of segments", len(segIDs)),
					zap.Int("# of segments with index ready", ct))
2443
				c.bringSegmentsOnline(ctx, segIDs)
2444 2445 2446
				return
			}
		case <-expireTicker.C:
2447
			log.Info("(in check complete index loop) waited for sufficiently long time, bring segments online",
2448
				zap.Int64("task ID", ti.GetId()))
2449
			c.bringSegmentsOnline(ctx, segIDs)
2450 2451 2452 2453 2454
			return
		}
	}
}

2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466
// bringSegmentsOnline brings the given segments online so that the data in them
// becomes searchable. It does so by asking CallUpdateSegmentStateService to set
// each segment's state to `flushed`, one segment at a time. Whatever that call
// yields is not inspected here.
func (c *Core) bringSegmentsOnline(ctx context.Context, segIDs []UniqueID) {
	log.Info("bringing import task's segments online!", zap.Any("segment IDs", segIDs))
	// TODO: Make update on segment states atomic.
	for i := range segIDs {
		// Explicitly mark segment states `flushed`.
		c.CallUpdateSegmentStateService(ctx, segIDs[i], commonpb.SegmentState_Flushed)
	}
}

// segmentsOnlineReady returns true if segments are ready to go up online (a.k.a. become searchable).
2467 2468 2469 2470 2471 2472 2473 2474 2475
// segmentsOnlineReady reports whether segments are ready to go online (a.k.a.
// become searchable), given that idxBuilt out of segCount segments have their
// index built. Segments are considered ready when either:
// (1) all but up to 2 segments have indices ready, or
// (2) over 85% of segments have indices ready.
func segmentsOnlineReady(idxBuilt, segCount int) bool {
	return segCount-idxBuilt <= 2 || float64(idxBuilt)/float64(segCount) > 0.85
}
2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659

// ExpireCredCache asks the proxy client manager to invalidate the cached
// credential of the given username, and returns whatever error that call yields.
func (c *Core) ExpireCredCache(ctx context.Context, username string) error {
	base := &commonpb.MsgBase{
		MsgType:  0, //TODO, msg type
		MsgID:    0, //TODO, msg id
		SourceID: c.session.ServerID,
	}
	return c.proxyClientManager.InvalidateCredentialCache(ctx, &proxypb.InvalidateCredCacheRequest{
		Base:     base,
		Username: username,
	})
}

// UpdateCredCache asks the proxy client manager to update its cached credential
// with the username and encrypted password from credInfo, and returns whatever
// error that call yields.
func (c *Core) UpdateCredCache(ctx context.Context, credInfo *internalpb.CredentialInfo) error {
	base := &commonpb.MsgBase{
		MsgType:  0, //TODO, msg type
		MsgID:    0, //TODO, msg id
		SourceID: c.session.ServerID,
	}
	return c.proxyClientManager.UpdateCredentialCache(ctx, &proxypb.UpdateCredCacheRequest{
		Base:     base,
		Username: credInfo.Username,
		Password: credInfo.EncryptedPassword,
	})
}

// ClearCredUsersCache asks the proxy client manager to clear the cached list of
// credential usernames, and returns whatever error that call yields.
func (c *Core) ClearCredUsersCache(ctx context.Context) error {
	return c.proxyClientManager.ClearCredUsersCache(ctx, &internalpb.ClearCredUsersCacheRequest{})
}

// CreateCredential creates a new user from credInfo.
//
// Steps, in order:
// 	1. reject the request if a credential for the username already exists
// 	2. clear the proxies' cached username list
// 	3. store the credential through MetaTable.AddCredential
//
// Every failure is reported through the returned status with
// ErrorCode_CreateCredentialFailure and is counted under the failure label of
// RootCoordCreateCredentialCounter; the returned error is always nil.
func (c *Core) CreateCredential(ctx context.Context, credInfo *internalpb.CredentialInfo) (*commonpb.Status, error) {
	metrics.RootCoordCreateCredentialCounter.WithLabelValues(metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder("CreateCredential")
	log.Debug("CreateCredential", zap.String("role", typeutil.RootCoordRole),
		zap.String("username", credInfo.Username))

	// The lookup error is ignored on purpose of the existing contract: only a
	// non-nil credential is treated as "already exists".
	if cred, _ := c.MetaTable.getCredential(credInfo.Username); cred != nil {
		// Count the rejection so that total == success + fail holds for this counter.
		metrics.RootCoordCreateCredentialCounter.WithLabelValues(metrics.FailLabel).Inc()
		return failStatus(commonpb.ErrorCode_CreateCredentialFailure, "user already exists:"+credInfo.Username), nil
	}
	// update proxy's local cache
	err := c.ClearCredUsersCache(ctx)
	if err != nil {
		log.Error("CreateCredential clear credential username list cache failed", zap.String("role", typeutil.RootCoordRole),
			zap.String("username", credInfo.Username), zap.Error(err))
		metrics.RootCoordCreateCredentialCounter.WithLabelValues(metrics.FailLabel).Inc()
		return failStatus(commonpb.ErrorCode_CreateCredentialFailure, "CreateCredential failed: "+err.Error()), nil
	}
	// insert to db
	err = c.MetaTable.AddCredential(credInfo)
	if err != nil {
		log.Error("CreateCredential save credential failed", zap.String("role", typeutil.RootCoordRole),
			zap.String("username", credInfo.Username), zap.Error(err))
		metrics.RootCoordCreateCredentialCounter.WithLabelValues(metrics.FailLabel).Inc()
		return failStatus(commonpb.ErrorCode_CreateCredentialFailure, "CreateCredential failed: "+err.Error()), nil
	}
	log.Debug("CreateCredential success", zap.String("role", typeutil.RootCoordRole),
		zap.String("username", credInfo.Username))

	metrics.RootCoordCreateCredentialCounter.WithLabelValues(metrics.SuccessLabel).Inc()
	metrics.RootCoordCredentialWriteTypeLatency.WithLabelValues("CreateCredential").Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.RootCoordNumOfCredentials.Inc()
	return succStatus(), nil
}

// GetCredential looks up the credential stored for in.Username.
//
// On success the response carries the username and the encrypted password. On
// lookup failure the response carries an ErrorCode_GetCredentialFailure status
// and the lookup error is also returned as the second value.
func (c *Core) GetCredential(ctx context.Context, in *rootcoordpb.GetCredentialRequest) (*rootcoordpb.GetCredentialResponse, error) {
	metrics.RootCoordGetCredentialCounter.WithLabelValues(metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder("GetCredential")

	// The same set of fields is attached to every log line of this request.
	logFields := []zap.Field{
		zap.String("role", typeutil.RootCoordRole),
		zap.String("username", in.Username),
	}
	log.Debug("GetCredential", logFields...)

	credInfo, err := c.MetaTable.getCredential(in.Username)
	if err != nil {
		log.Error("GetCredential query credential failed", append(logFields, zap.Error(err))...)
		metrics.RootCoordGetCredentialCounter.WithLabelValues(metrics.FailLabel).Inc()
		failed := &rootcoordpb.GetCredentialResponse{
			Status: failStatus(commonpb.ErrorCode_GetCredentialFailure, "GetCredential failed: "+err.Error()),
		}
		return failed, err
	}
	log.Debug("GetCredential success", logFields...)

	metrics.RootCoordGetCredentialCounter.WithLabelValues(metrics.SuccessLabel).Inc()
	elapsedMs := float64(tr.ElapseSpan().Milliseconds())
	metrics.RootCoordCredentialReadTypeLatency.WithLabelValues("GetCredential", in.Username).Observe(elapsedMs)
	return &rootcoordpb.GetCredentialResponse{
		Status:   succStatus(),
		Username: credInfo.Username,
		Password: credInfo.EncryptedPassword,
	}, nil
}

// UpdateCredential updates the password of an existing user.
//
// The proxies' credential cache is updated first, then the credential is
// written through MetaTable.AddCredential. Failures are reported through the
// returned status with ErrorCode_UpdateCredentialFailure; the returned error is
// always nil.
func (c *Core) UpdateCredential(ctx context.Context, credInfo *internalpb.CredentialInfo) (*commonpb.Status, error) {
	metrics.RootCoordUpdateCredentialCounter.WithLabelValues(metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder("UpdateCredential")

	// The same set of fields is attached to every log line of this request.
	logFields := []zap.Field{
		zap.String("role", typeutil.RootCoordRole),
		zap.String("username", credInfo.Username),
	}
	log.Debug("UpdateCredential", logFields...)

	// reject counts the failure and builds the failure status for err.
	reject := func(err error) *commonpb.Status {
		metrics.RootCoordUpdateCredentialCounter.WithLabelValues(metrics.FailLabel).Inc()
		return failStatus(commonpb.ErrorCode_UpdateCredentialFailure, "UpdateCredential failed: "+err.Error())
	}

	// update proxy's local cache
	if err := c.UpdateCredCache(ctx, credInfo); err != nil {
		log.Error("UpdateCredential update credential cache failed", append(logFields, zap.Error(err))...)
		return reject(err), nil
	}
	// update data on storage
	if err := c.MetaTable.AddCredential(credInfo); err != nil {
		log.Error("UpdateCredential save credential failed", append(logFields, zap.Error(err))...)
		return reject(err), nil
	}
	log.Debug("UpdateCredential success", logFields...)

	metrics.RootCoordUpdateCredentialCounter.WithLabelValues(metrics.SuccessLabel).Inc()
	elapsedMs := float64(tr.ElapseSpan().Milliseconds())
	metrics.RootCoordCredentialWriteTypeLatency.WithLabelValues("UpdateCredential").Observe(elapsedMs)
	return succStatus(), nil
}

// DeleteCredential deletes the user named in.Username.
//
// The proxies' cached credential is invalidated first, then the credential is
// removed through MetaTable.DeleteCredential. Both failures produce an
// ErrorCode_DeleteCredentialFailure status; a cache invalidation failure is
// returned with a nil error, whereas a storage failure also returns the
// underlying error as the second value.
func (c *Core) DeleteCredential(ctx context.Context, in *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
	metrics.RootCoordDeleteCredentialCounter.WithLabelValues(metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder("DeleteCredential")

	// The same set of fields is attached to every log line of this request.
	logFields := []zap.Field{
		zap.String("role", typeutil.RootCoordRole),
		zap.String("username", in.Username),
	}
	log.Debug("DeleteCredential", logFields...)

	// invalidate proxy's local cache
	if err := c.ExpireCredCache(ctx, in.Username); err != nil {
		log.Error("DeleteCredential expire credential cache failed", append(logFields, zap.Error(err))...)
		metrics.RootCoordDeleteCredentialCounter.WithLabelValues(metrics.FailLabel).Inc()
		return failStatus(commonpb.ErrorCode_DeleteCredentialFailure, "DeleteCredential failed: "+err.Error()), nil
	}
	// delete data on storage
	if err := c.MetaTable.DeleteCredential(in.Username); err != nil {
		log.Error("DeleteCredential remove credential failed", append(logFields, zap.Error(err))...)
		metrics.RootCoordDeleteCredentialCounter.WithLabelValues(metrics.FailLabel).Inc()
		return failStatus(commonpb.ErrorCode_DeleteCredentialFailure, "DeleteCredential failed: "+err.Error()), err
	}
	log.Debug("DeleteCredential success", logFields...)

	metrics.RootCoordDeleteCredentialCounter.WithLabelValues(metrics.SuccessLabel).Inc()
	elapsedMs := float64(tr.ElapseSpan().Milliseconds())
	metrics.RootCoordCredentialWriteTypeLatency.WithLabelValues("DeleteCredential").Observe(elapsedMs)
	metrics.RootCoordNumOfCredentials.Dec()
	return succStatus(), nil
}

// ListCredUsers lists all credential usernames.
//
// The usernames are read through MetaTable.ListCredentialUsernames. On failure
// the response carries an ErrorCode_ListCredUsersFailure status, the failure is
// counted under the failure label of RootCoordListCredUsersCounter, and the
// underlying error is also returned as the second value.
func (c *Core) ListCredUsers(ctx context.Context, in *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
	metrics.RootCoordListCredUsersCounter.WithLabelValues(metrics.TotalLabel).Inc()
	tr := timerecord.NewTimeRecorder("ListCredUsers")

	credInfo, err := c.MetaTable.ListCredentialUsernames()
	if err != nil {
		// Use the nil-safe generated getters: a request without Base must not
		// turn a storage failure into a panic while logging it.
		log.Error("ListCredUsers query usernames failed", zap.String("role", typeutil.RootCoordRole),
			zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Error(err))
		// Count the failure, as the other credential handlers do.
		metrics.RootCoordListCredUsersCounter.WithLabelValues(metrics.FailLabel).Inc()
		return &milvuspb.ListCredUsersResponse{
			Status: failStatus(commonpb.ErrorCode_ListCredUsersFailure, "ListCredUsers failed: "+err.Error()),
		}, err
	}
	log.Debug("ListCredUsers success", zap.String("role", typeutil.RootCoordRole))

	metrics.RootCoordListCredUsersCounter.WithLabelValues(metrics.SuccessLabel).Inc()
	metrics.RootCoordCredentialReadTypeLatency.WithLabelValues("ListCredUsers", "ALL.API").Observe(float64(tr.ElapseSpan().Milliseconds()))
	return &milvuspb.ListCredUsersResponse{
		Status:    succStatus(),
		Usernames: credInfo.Usernames,
	}, nil
}