impl.go 141.5 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16

C
Cai Yudong 已提交
17
package proxy
18 19 20

import (
	"context"
21
	"fmt"
C
cai.zhang 已提交
22
	"os"
23
	"strconv"
24 25 26
	"sync"

	"golang.org/x/sync/errgroup"
27

28
	"github.com/golang/protobuf/proto"
S
SimFG 已提交
29 30
	"github.com/milvus-io/milvus-proto/go-api/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/milvuspb"
S
smellthemoon 已提交
31
	"github.com/milvus-io/milvus-proto/go-api/schemapb"
32
	"github.com/milvus-io/milvus/internal/common"
X
Xiangyu Wang 已提交
33
	"github.com/milvus-io/milvus/internal/log"
34
	"github.com/milvus-io/milvus/internal/metrics"
J
jaime 已提交
35
	"github.com/milvus-io/milvus/internal/mq/msgstream"
X
Xiangyu Wang 已提交
36 37 38 39
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
40
	"github.com/milvus-io/milvus/internal/util"
41
	"github.com/milvus-io/milvus/internal/util/commonpbutil"
42
	"github.com/milvus-io/milvus/internal/util/crypto"
43
	"github.com/milvus-io/milvus/internal/util/errorutil"
44
	"github.com/milvus-io/milvus/internal/util/importutil"
45 46
	"github.com/milvus-io/milvus/internal/util/logutil"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
E
Enwei Jiao 已提交
47
	"github.com/milvus-io/milvus/internal/util/paramtable"
48
	"github.com/milvus-io/milvus/internal/util/timerecord"
49
	"github.com/milvus-io/milvus/internal/util/trace"
X
Xiangyu Wang 已提交
50
	"github.com/milvus-io/milvus/internal/util/typeutil"
51
	"go.uber.org/zap"
52 53
)

54 55
const moduleName = "Proxy"

56
// UpdateStateCode updates the state code of Proxy.
57
func (node *Proxy) UpdateStateCode(code commonpb.StateCode) {
58
	node.stateCode.Store(code)
Z
zhenshan.cao 已提交
59 60
}

61
// GetComponentStates get state of Proxy.
62 63
func (node *Proxy) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
	stats := &milvuspb.ComponentStates{
G
godchen 已提交
64 65 66 67
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
68
	code, ok := node.stateCode.Load().(commonpb.StateCode)
G
godchen 已提交
69 70 71 72 73 74
	if !ok {
		errMsg := "unexpected error in type assertion"
		stats.Status = &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    errMsg,
		}
G
godchen 已提交
75
		return stats, nil
G
godchen 已提交
76
	}
77 78 79 80
	nodeID := common.NotRegisteredID
	if node.session != nil && node.session.Registered() {
		nodeID = node.session.ServerID
	}
81
	info := &milvuspb.ComponentInfo{
82 83
		// NodeID:    Params.ProxyID, // will race with Proxy.Register()
		NodeID:    nodeID,
C
Cai Yudong 已提交
84
		Role:      typeutil.ProxyRole,
G
godchen 已提交
85 86 87 88 89 90
		StateCode: code,
	}
	stats.State = info
	return stats, nil
}

C
cxytz01 已提交
91
// GetStatisticsChannel gets statistics channel of Proxy.
C
Cai Yudong 已提交
92
func (node *Proxy) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
93 94 95 96 97 98 99 100 101
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: "",
	}, nil
}

102
// InvalidateCollectionMetaCache invalidate the meta cache of specific collection.
C
Cai Yudong 已提交
103
func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
104 105 106
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
107
	ctx = logutil.WithModule(ctx, moduleName)
108 109 110
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-InvalidateCollectionMetaCache")
	defer sp.Finish()
	log := log.Ctx(ctx).With(
111
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
112
		zap.String("db", request.DbName),
113 114
		zap.String("collectionName", request.CollectionName),
		zap.Int64("collectionID", request.CollectionID))
D
dragondriver 已提交
115

116 117
	log.Info("received request to invalidate collection meta cache")

118
	collectionName := request.CollectionName
119
	collectionID := request.CollectionID
X
Xiaofan 已提交
120 121

	var aliasName []string
N
neza2017 已提交
122
	if globalMetaCache != nil {
123 124 125 126
		if collectionName != "" {
			globalMetaCache.RemoveCollection(ctx, collectionName) // no need to return error, though collection may be not cached
		}
		if request.CollectionID != UniqueID(0) {
X
Xiaofan 已提交
127
			aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
128
		}
N
neza2017 已提交
129
	}
130 131
	if request.GetBase().GetMsgType() == commonpb.MsgType_DropCollection {
		// no need to handle error, since this Proxy may not create dml stream for the collection.
132 133
		node.chMgr.removeDMLStream(request.GetCollectionID())
		// clean up collection level metrics
E
Enwei Jiao 已提交
134
		metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), collectionName)
X
Xiaofan 已提交
135
		for _, alias := range aliasName {
E
Enwei Jiao 已提交
136
			metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), alias)
X
Xiaofan 已提交
137
		}
138
	}
139
	log.Info("complete to invalidate collection meta cache")
D
dragondriver 已提交
140

141
	return &commonpb.Status{
142
		ErrorCode: commonpb.ErrorCode_Success,
143 144
		Reason:    "",
	}, nil
145 146
}

147
// CreateCollection create a collection by the schema.
148
// TODO(dragondriver): add more detailed ut for ConsistencyLevel, should we support multiple consistency level in Proxy?
C
Cai Yudong 已提交
149
func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
150 151 152
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
153 154 155

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateCollection")
	defer sp.Finish()
156 157 158
	method := "CreateCollection"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
159
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
160

161
	cct := &createCollectionTask{
S
sunby 已提交
162
		ctx:                     ctx,
163 164
		Condition:               NewTaskCondition(ctx),
		CreateCollectionRequest: request,
165
		rootCoord:               node.rootCoord,
166 167
	}

168 169 170
	// avoid data race
	lenOfSchema := len(request.Schema)

171
	log := log.Ctx(ctx).With(
172
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
173 174
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
175
		zap.Int("len(schema)", lenOfSchema),
176 177
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
178

179 180
	log.Debug(rpcReceived(method))

181 182 183
	if err := node.sched.ddQueue.Enqueue(cct); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
184
			zap.Error(err))
185

E
Enwei Jiao 已提交
186
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
187
		return &commonpb.Status{
188
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
189 190 191 192
			Reason:    err.Error(),
		}, nil
	}

193 194
	log.Debug(
		rpcEnqueued(method),
195 196
		zap.Uint64("BeginTs", cct.BeginTs()),
		zap.Uint64("EndTs", cct.EndTs()),
197
		zap.Uint64("timestamp", request.Base.Timestamp))
198

199 200 201
	if err := cct.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
202
			zap.Error(err),
203
			zap.Uint64("BeginTs", cct.BeginTs()),
204
			zap.Uint64("EndTs", cct.EndTs()))
D
dragondriver 已提交
205

E
Enwei Jiao 已提交
206
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
207
		return &commonpb.Status{
208
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
209 210 211 212
			Reason:    err.Error(),
		}, nil
	}

213 214
	log.Debug(
		rpcDone(method),
215
		zap.Uint64("BeginTs", cct.BeginTs()),
216
		zap.Uint64("EndTs", cct.EndTs()))
217

E
Enwei Jiao 已提交
218 219
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
220 221 222
	return cct.result, nil
}

223
// DropCollection drop a collection.
C
Cai Yudong 已提交
224
func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
225 226 227
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
228 229 230

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropCollection")
	defer sp.Finish()
231 232
	method := "DropCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
233
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
234

235
	dct := &dropCollectionTask{
S
sunby 已提交
236
		ctx:                   ctx,
237 238
		Condition:             NewTaskCondition(ctx),
		DropCollectionRequest: request,
239
		rootCoord:             node.rootCoord,
240
		chMgr:                 node.chMgr,
S
sunby 已提交
241
		chTicker:              node.chTicker,
242 243
	}

244
	log := log.Ctx(ctx).With(
245
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
246 247
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
248

249 250
	log.Debug("DropCollection received")

251 252
	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DropCollection failed to enqueue",
253
			zap.Error(err))
254

E
Enwei Jiao 已提交
255
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
256
		return &commonpb.Status{
257
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
258 259 260 261
			Reason:    err.Error(),
		}, nil
	}

262 263
	log.Debug("DropCollection enqueued",
		zap.Uint64("BeginTs", dct.BeginTs()),
264
		zap.Uint64("EndTs", dct.EndTs()))
265 266 267

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DropCollection failed to WaitToFinish",
D
dragondriver 已提交
268
			zap.Error(err),
269
			zap.Uint64("BeginTs", dct.BeginTs()),
270
			zap.Uint64("EndTs", dct.EndTs()))
D
dragondriver 已提交
271

E
Enwei Jiao 已提交
272
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
273
		return &commonpb.Status{
274
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
275 276 277 278
			Reason:    err.Error(),
		}, nil
	}

279 280
	log.Debug("DropCollection done",
		zap.Uint64("BeginTs", dct.BeginTs()),
281
		zap.Uint64("EndTs", dct.EndTs()))
282

E
Enwei Jiao 已提交
283 284
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
285 286 287
	return dct.result, nil
}

288
// HasCollection check if the specific collection exists in Milvus.
C
Cai Yudong 已提交
289
func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
290 291 292 293 294
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
295 296 297

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasCollection")
	defer sp.Finish()
298 299
	method := "HasCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
300
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
301
		metrics.TotalLabel).Inc()
302

303
	log := log.Ctx(ctx).With(
304
		zap.String("role", typeutil.ProxyRole),
305 306 307
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

308 309
	log.Debug("HasCollection received")

310
	hct := &hasCollectionTask{
S
sunby 已提交
311
		ctx:                  ctx,
312 313
		Condition:            NewTaskCondition(ctx),
		HasCollectionRequest: request,
314
		rootCoord:            node.rootCoord,
315 316
	}

317 318
	if err := node.sched.ddQueue.Enqueue(hct); err != nil {
		log.Warn("HasCollection failed to enqueue",
319
			zap.Error(err))
320

E
Enwei Jiao 已提交
321
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
322
			metrics.AbandonLabel).Inc()
323 324
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
325
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
326 327 328 329 330
				Reason:    err.Error(),
			},
		}, nil
	}

331 332
	log.Debug("HasCollection enqueued",
		zap.Uint64("BeginTS", hct.BeginTs()),
333
		zap.Uint64("EndTS", hct.EndTs()))
334 335 336

	if err := hct.WaitToFinish(); err != nil {
		log.Warn("HasCollection failed to WaitToFinish",
D
dragondriver 已提交
337
			zap.Error(err),
338
			zap.Uint64("BeginTS", hct.BeginTs()),
339
			zap.Uint64("EndTS", hct.EndTs()))
D
dragondriver 已提交
340

E
Enwei Jiao 已提交
341
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
342
			metrics.FailLabel).Inc()
343 344
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
345
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
346 347 348 349 350
				Reason:    err.Error(),
			},
		}, nil
	}

351 352
	log.Debug("HasCollection done",
		zap.Uint64("BeginTS", hct.BeginTs()),
353
		zap.Uint64("EndTS", hct.EndTs()))
354

E
Enwei Jiao 已提交
355
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
356
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
357
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
358 359 360
	return hct.result, nil
}

361
// LoadCollection load a collection into query nodes.
C
Cai Yudong 已提交
362
func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
363 364 365
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
366 367 368

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadCollection")
	defer sp.Finish()
369 370
	method := "LoadCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
371
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
372
		metrics.TotalLabel).Inc()
373
	lct := &loadCollectionTask{
S
sunby 已提交
374
		ctx:                   ctx,
375 376
		Condition:             NewTaskCondition(ctx),
		LoadCollectionRequest: request,
377
		queryCoord:            node.queryCoord,
C
cai.zhang 已提交
378
		indexCoord:            node.indexCoord,
379 380
	}

381
	log := log.Ctx(ctx).With(
382
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
383 384
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
385

386 387
	log.Debug("LoadCollection received")

388 389
	if err := node.sched.ddQueue.Enqueue(lct); err != nil {
		log.Warn("LoadCollection failed to enqueue",
390
			zap.Error(err))
391

E
Enwei Jiao 已提交
392
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
393
			metrics.AbandonLabel).Inc()
394
		return &commonpb.Status{
395
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
396 397 398
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
399

400 401
	log.Debug("LoadCollection enqueued",
		zap.Uint64("BeginTS", lct.BeginTs()),
402
		zap.Uint64("EndTS", lct.EndTs()))
403 404 405

	if err := lct.WaitToFinish(); err != nil {
		log.Warn("LoadCollection failed to WaitToFinish",
D
dragondriver 已提交
406
			zap.Error(err),
407
			zap.Uint64("BeginTS", lct.BeginTs()),
408
			zap.Uint64("EndTS", lct.EndTs()))
E
Enwei Jiao 已提交
409
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
410
			metrics.FailLabel).Inc()
411
		return &commonpb.Status{
412
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
413 414 415 416
			Reason:    err.Error(),
		}, nil
	}

417 418
	log.Debug("LoadCollection done",
		zap.Uint64("BeginTS", lct.BeginTs()),
419
		zap.Uint64("EndTS", lct.EndTs()))
420

E
Enwei Jiao 已提交
421
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
422
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
423
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
424
	return lct.result, nil
425 426
}

427
// ReleaseCollection remove the loaded collection from query nodes.
C
Cai Yudong 已提交
428
func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
429 430 431
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
432

433
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleaseCollection")
434
	defer sp.Finish()
435 436
	method := "ReleaseCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
437
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
438
		metrics.TotalLabel).Inc()
439
	rct := &releaseCollectionTask{
S
sunby 已提交
440
		ctx:                      ctx,
441 442
		Condition:                NewTaskCondition(ctx),
		ReleaseCollectionRequest: request,
443
		queryCoord:               node.queryCoord,
444
		chMgr:                    node.chMgr,
445 446
	}

447
	log := log.Ctx(ctx).With(
448
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
449 450
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
451

452 453
	log.Debug(rpcReceived(method))

454
	if err := node.sched.ddQueue.Enqueue(rct); err != nil {
455 456
		log.Warn(
			rpcFailedToEnqueue(method),
457
			zap.Error(err))
458

E
Enwei Jiao 已提交
459
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
460
			metrics.AbandonLabel).Inc()
461
		return &commonpb.Status{
462
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
463 464 465 466
			Reason:    err.Error(),
		}, nil
	}

467 468
	log.Debug(
		rpcEnqueued(method),
469
		zap.Uint64("BeginTS", rct.BeginTs()),
470
		zap.Uint64("EndTS", rct.EndTs()))
471 472

	if err := rct.WaitToFinish(); err != nil {
473 474
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
475
			zap.Error(err),
476
			zap.Uint64("BeginTS", rct.BeginTs()),
477
			zap.Uint64("EndTS", rct.EndTs()))
D
dragondriver 已提交
478

E
Enwei Jiao 已提交
479
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
480
			metrics.FailLabel).Inc()
481
		return &commonpb.Status{
482
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
483 484 485 486
			Reason:    err.Error(),
		}, nil
	}

487 488
	log.Debug(
		rpcDone(method),
489
		zap.Uint64("BeginTS", rct.BeginTs()),
490
		zap.Uint64("EndTS", rct.EndTs()))
491

E
Enwei Jiao 已提交
492
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
493
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
494
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
495
	return rct.result, nil
496 497
}

498
// DescribeCollection get the meta information of specific collection, such as schema, created timestamp and etc.
C
Cai Yudong 已提交
499
func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
500 501 502 503 504
	if !node.checkHealthy() {
		return &milvuspb.DescribeCollectionResponse{
			Status: unhealthyStatus(),
		}, nil
	}
505

506
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeCollection")
507
	defer sp.Finish()
508 509
	method := "DescribeCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
510
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
511
		metrics.TotalLabel).Inc()
512

513
	dct := &describeCollectionTask{
S
sunby 已提交
514
		ctx:                       ctx,
515 516
		Condition:                 NewTaskCondition(ctx),
		DescribeCollectionRequest: request,
517
		rootCoord:                 node.rootCoord,
518 519
	}

520
	log := log.Ctx(ctx).With(
521
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
522 523
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
524

525 526
	log.Debug("DescribeCollection received")

527 528
	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DescribeCollection failed to enqueue",
529
			zap.Error(err))
530

E
Enwei Jiao 已提交
531
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
532
			metrics.AbandonLabel).Inc()
533 534
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
535
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
536 537 538 539 540
				Reason:    err.Error(),
			},
		}, nil
	}

541 542
	log.Debug("DescribeCollection enqueued",
		zap.Uint64("BeginTS", dct.BeginTs()),
543
		zap.Uint64("EndTS", dct.EndTs()))
544 545 546

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DescribeCollection failed to WaitToFinish",
D
dragondriver 已提交
547
			zap.Error(err),
548
			zap.Uint64("BeginTS", dct.BeginTs()),
549
			zap.Uint64("EndTS", dct.EndTs()))
D
dragondriver 已提交
550

E
Enwei Jiao 已提交
551
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
552
			metrics.FailLabel).Inc()
553

554 555
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
556
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
557 558 559 560 561
				Reason:    err.Error(),
			},
		}, nil
	}

562 563
	log.Debug("DescribeCollection done",
		zap.Uint64("BeginTS", dct.BeginTs()),
564
		zap.Uint64("EndTS", dct.EndTs()))
565

E
Enwei Jiao 已提交
566
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
567
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
568
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
569 570 571
	return dct.result, nil
}

572 573 574 575 576 577 578 579 580
// GetStatistics get the statistics, such as `num_rows`.
// WARNING: It is an experimental API
func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStatisticsRequest) (*milvuspb.GetStatisticsResponse, error) {
	if !node.checkHealthy() {
		return &milvuspb.GetStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}

581
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetStatistics")
582 583 584
	defer sp.Finish()
	method := "GetStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
585
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
586
		metrics.TotalLabel).Inc()
587 588 589 590 591 592 593 594 595 596
	g := &getStatisticsTask{
		request:   request,
		Condition: NewTaskCondition(ctx),
		ctx:       ctx,
		tr:        tr,
		dc:        node.dataCoord,
		qc:        node.queryCoord,
		shardMgr:  node.shardMgr,
	}

597
	log := log.Ctx(ctx).With(
598 599
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
600 601 602 603
		zap.String("collection", request.CollectionName))

	log.Debug(
		rpcReceived(method),
604 605 606 607 608 609 610 611
		zap.Strings("partitions", request.PartitionNames))

	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.Strings("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
612
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636
			metrics.AbandonLabel).Inc()

		return &milvuspb.GetStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.Strings("partitions", request.PartitionNames))

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.Uint64("BeginTS", g.BeginTs()),
			zap.Uint64("EndTS", g.EndTs()),
			zap.Strings("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
637
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
638 639 640 641 642 643 644 645 646 647 648 649 650
			metrics.FailLabel).Inc()

		return &milvuspb.GetStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", g.BeginTs()),
651
		zap.Uint64("EndTS", g.EndTs()))
652

E
Enwei Jiao 已提交
653
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
654
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
655
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
656 657 658
	return g.result, nil
}

659
// GetCollectionStatistics get the collection statistics, such as `num_rows`.
C
Cai Yudong 已提交
660
func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvuspb.GetCollectionStatisticsRequest) (*milvuspb.GetCollectionStatisticsResponse, error) {
661 662 663 664 665
	if !node.checkHealthy() {
		return &milvuspb.GetCollectionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
666 667 668

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetCollectionStatistics")
	defer sp.Finish()
669 670
	method := "GetCollectionStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
671
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
672
		metrics.TotalLabel).Inc()
673
	g := &getCollectionStatisticsTask{
G
godchen 已提交
674 675 676
		ctx:                            ctx,
		Condition:                      NewTaskCondition(ctx),
		GetCollectionStatisticsRequest: request,
677
		dataCoord:                      node.dataCoord,
678 679
	}

680
	log := log.Ctx(ctx).With(
681
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
682 683
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
684

685 686
	log.Debug(rpcReceived(method))

687
	if err := node.sched.ddQueue.Enqueue(g); err != nil {
688 689
		log.Warn(
			rpcFailedToEnqueue(method),
690
			zap.Error(err))
691

E
Enwei Jiao 已提交
692
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
693
			metrics.AbandonLabel).Inc()
694

G
godchen 已提交
695
		return &milvuspb.GetCollectionStatisticsResponse{
696
			Status: &commonpb.Status{
697
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
698 699 700 701 702
				Reason:    err.Error(),
			},
		}, nil
	}

703 704
	log.Debug(
		rpcEnqueued(method),
705
		zap.Uint64("BeginTS", g.BeginTs()),
706
		zap.Uint64("EndTS", g.EndTs()))
707 708

	if err := g.WaitToFinish(); err != nil {
709 710
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
711
			zap.Error(err),
712
			zap.Uint64("BeginTS", g.BeginTs()),
713
			zap.Uint64("EndTS", g.EndTs()))
D
dragondriver 已提交
714

E
Enwei Jiao 已提交
715
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
716
			metrics.FailLabel).Inc()
717

G
godchen 已提交
718
		return &milvuspb.GetCollectionStatisticsResponse{
719
			Status: &commonpb.Status{
720
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
721 722 723 724 725
				Reason:    err.Error(),
			},
		}, nil
	}

726 727
	log.Debug(
		rpcDone(method),
728
		zap.Uint64("BeginTS", g.BeginTs()),
729
		zap.Uint64("EndTS", g.EndTs()))
730

E
Enwei Jiao 已提交
731
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
732
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
733
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
734
	return g.result, nil
735 736
}

737
// ShowCollections list all collections in Milvus.
C
Cai Yudong 已提交
738
func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
739 740 741 742 743
	if !node.checkHealthy() {
		return &milvuspb.ShowCollectionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
744 745
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowCollections")
	defer sp.Finish()
746 747
	method := "ShowCollections"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
748
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
749

750
	sct := &showCollectionsTask{
G
godchen 已提交
751 752 753
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		ShowCollectionsRequest: request,
754
		queryCoord:             node.queryCoord,
755
		rootCoord:              node.rootCoord,
756 757
	}

758
	log := log.Ctx(ctx).With(
759
		zap.String("role", typeutil.ProxyRole),
760 761
		zap.String("DbName", request.DbName),
		zap.Uint64("TimeStamp", request.TimeStamp),
762 763 764 765
		zap.String("ShowType", request.Type.String()))

	log.Debug("ShowCollections received",
		zap.Any("CollectionNames", request.CollectionNames))
766

767
	err := node.sched.ddQueue.Enqueue(sct)
768
	if err != nil {
769 770
		log.Warn("ShowCollections failed to enqueue",
			zap.Error(err),
771
			zap.Any("CollectionNames", request.CollectionNames))
772

E
Enwei Jiao 已提交
773
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
G
godchen 已提交
774
		return &milvuspb.ShowCollectionsResponse{
775
			Status: &commonpb.Status{
776
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
777 778 779 780 781
				Reason:    err.Error(),
			},
		}, nil
	}

782
	log.Debug("ShowCollections enqueued",
783
		zap.Any("CollectionNames", request.CollectionNames))
D
dragondriver 已提交
784

785 786
	err = sct.WaitToFinish()
	if err != nil {
787 788
		log.Warn("ShowCollections failed to WaitToFinish",
			zap.Error(err),
789
			zap.Any("CollectionNames", request.CollectionNames))
790

E
Enwei Jiao 已提交
791
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
792

G
godchen 已提交
793
		return &milvuspb.ShowCollectionsResponse{
794
			Status: &commonpb.Status{
795
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
796 797 798 799 800
				Reason:    err.Error(),
			},
		}, nil
	}

801
	log.Debug("ShowCollections Done",
802 803
		zap.Int("len(CollectionNames)", len(request.CollectionNames)),
		zap.Int("num_collections", len(sct.result.CollectionNames)))
804

E
Enwei Jiao 已提交
805 806
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
807 808 809
	return sct.result, nil
}

J
jaime 已提交
810 811 812 813 814 815 816 817 818 819
func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-AlterCollection")
	defer sp.Finish()
	method := "AlterCollection"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
820
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
J
jaime 已提交
821 822 823 824 825 826 827 828

	act := &alterCollectionTask{
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		AlterCollectionRequest: request,
		rootCoord:              node.rootCoord,
	}

829
	log := log.Ctx(ctx).With(
J
jaime 已提交
830 831 832 833
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

834 835 836
	log.Debug(
		rpcReceived(method))

J
jaime 已提交
837 838 839
	if err := node.sched.ddQueue.Enqueue(act); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
840
			zap.Error(err))
J
jaime 已提交
841

E
Enwei Jiao 已提交
842
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
J
jaime 已提交
843 844 845 846 847 848 849 850 851 852
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", act.BeginTs()),
		zap.Uint64("EndTs", act.EndTs()),
853
		zap.Uint64("timestamp", request.Base.Timestamp))
J
jaime 已提交
854 855 856 857 858 859

	if err := act.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.Uint64("BeginTs", act.BeginTs()),
860
			zap.Uint64("EndTs", act.EndTs()))
J
jaime 已提交
861

E
Enwei Jiao 已提交
862
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
J
jaime 已提交
863 864 865 866 867 868 869 870 871
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", act.BeginTs()),
872
		zap.Uint64("EndTs", act.EndTs()))
J
jaime 已提交
873

E
Enwei Jiao 已提交
874 875
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
J
jaime 已提交
876 877 878
	return act.result, nil
}

879
// CreatePartition create a partition in specific collection.
C
Cai Yudong 已提交
880
func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
881 882 883
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
884

885
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreatePartition")
886
	defer sp.Finish()
887 888
	method := "CreatePartition"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
889
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
890

891
	cpt := &createPartitionTask{
S
sunby 已提交
892
		ctx:                    ctx,
893 894
		Condition:              NewTaskCondition(ctx),
		CreatePartitionRequest: request,
895
		rootCoord:              node.rootCoord,
896 897 898
		result:                 nil,
	}

899
	log := log.Ctx(ctx).With(
900
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
901 902 903
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
904

905 906
	log.Debug(rpcReceived("CreatePartition"))

907 908 909
	if err := node.sched.ddQueue.Enqueue(cpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue("CreatePartition"),
910
			zap.Error(err))
911

E
Enwei Jiao 已提交
912
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
913

914
		return &commonpb.Status{
915
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
916 917 918
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
919

920 921 922
	log.Debug(
		rpcEnqueued("CreatePartition"),
		zap.Uint64("BeginTS", cpt.BeginTs()),
923
		zap.Uint64("EndTS", cpt.EndTs()))
924 925 926 927

	if err := cpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish("CreatePartition"),
D
dragondriver 已提交
928
			zap.Error(err),
929
			zap.Uint64("BeginTS", cpt.BeginTs()),
930
			zap.Uint64("EndTS", cpt.EndTs()))
D
dragondriver 已提交
931

E
Enwei Jiao 已提交
932
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
933

934
		return &commonpb.Status{
935
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
936 937 938
			Reason:    err.Error(),
		}, nil
	}
939 940 941 942

	log.Debug(
		rpcDone("CreatePartition"),
		zap.Uint64("BeginTS", cpt.BeginTs()),
943
		zap.Uint64("EndTS", cpt.EndTs()))
944

E
Enwei Jiao 已提交
945 946
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
947 948 949
	return cpt.result, nil
}

950
// DropPartition drop a partition in specific collection.
C
Cai Yudong 已提交
951
func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
952 953 954
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
955

956
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropPartition")
957
	defer sp.Finish()
958 959
	method := "DropPartition"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
960
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
961

962
	dpt := &dropPartitionTask{
S
sunby 已提交
963
		ctx:                  ctx,
964 965
		Condition:            NewTaskCondition(ctx),
		DropPartitionRequest: request,
966
		rootCoord:            node.rootCoord,
C
cai.zhang 已提交
967
		queryCoord:           node.queryCoord,
968 969 970
		result:               nil,
	}

971
	log := log.Ctx(ctx).With(
972
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
973 974 975
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
976

977 978
	log.Debug(rpcReceived(method))

979 980 981
	if err := node.sched.ddQueue.Enqueue(dpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
982
			zap.Error(err))
983

E
Enwei Jiao 已提交
984
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
985

986
		return &commonpb.Status{
987
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
988 989 990
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
991

992 993 994
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", dpt.BeginTs()),
995
		zap.Uint64("EndTS", dpt.EndTs()))
996 997 998 999

	if err := dpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1000
			zap.Error(err),
1001
			zap.Uint64("BeginTS", dpt.BeginTs()),
1002
			zap.Uint64("EndTS", dpt.EndTs()))
D
dragondriver 已提交
1003

E
Enwei Jiao 已提交
1004
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
1005

1006
		return &commonpb.Status{
1007
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1008 1009 1010
			Reason:    err.Error(),
		}, nil
	}
1011 1012 1013 1014

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", dpt.BeginTs()),
1015
		zap.Uint64("EndTS", dpt.EndTs()))
1016

E
Enwei Jiao 已提交
1017 1018
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1019 1020 1021
	return dpt.result, nil
}

1022
// HasPartition check if partition exist.
C
Cai Yudong 已提交
1023
func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
1024 1025 1026 1027 1028
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
D
dragondriver 已提交
1029

D
dragondriver 已提交
1030
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasPartition")
D
dragondriver 已提交
1031
	defer sp.Finish()
1032 1033 1034
	method := "HasPartition"
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
E
Enwei Jiao 已提交
1035
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1036
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1037

1038
	hpt := &hasPartitionTask{
S
sunby 已提交
1039
		ctx:                 ctx,
1040 1041
		Condition:           NewTaskCondition(ctx),
		HasPartitionRequest: request,
1042
		rootCoord:           node.rootCoord,
1043 1044 1045
		result:              nil,
	}

1046
	log := log.Ctx(ctx).With(
1047
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1048 1049 1050
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
D
dragondriver 已提交
1051

1052 1053
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1054 1055 1056
	if err := node.sched.ddQueue.Enqueue(hpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1057
			zap.Error(err))
D
dragondriver 已提交
1058

E
Enwei Jiao 已提交
1059
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1060
			metrics.AbandonLabel).Inc()
1061

1062 1063
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1064
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1065 1066 1067 1068 1069
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1070

D
dragondriver 已提交
1071 1072 1073
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", hpt.BeginTs()),
1074
		zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1075 1076 1077 1078

	if err := hpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1079
			zap.Error(err),
D
dragondriver 已提交
1080
			zap.Uint64("BeginTS", hpt.BeginTs()),
1081
			zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1082

E
Enwei Jiao 已提交
1083
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1084
			metrics.FailLabel).Inc()
1085

1086 1087
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1088
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1089 1090 1091 1092 1093
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1094 1095 1096 1097

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", hpt.BeginTs()),
1098
		zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1099

E
Enwei Jiao 已提交
1100
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1101
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1102
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1103 1104 1105
	return hpt.result, nil
}

1106
// LoadPartitions load specific partitions into query nodes.
C
Cai Yudong 已提交
1107
func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitionsRequest) (*commonpb.Status, error) {
1108 1109 1110
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1111

D
dragondriver 已提交
1112
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadPartitions")
1113
	defer sp.Finish()
1114 1115
	method := "LoadPartitions"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1116
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1117
		metrics.TotalLabel).Inc()
1118
	lpt := &loadPartitionsTask{
G
godchen 已提交
1119 1120 1121
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		LoadPartitionsRequest: request,
1122
		queryCoord:            node.queryCoord,
C
cai.zhang 已提交
1123
		indexCoord:            node.indexCoord,
1124 1125
	}

1126
	log := log.Ctx(ctx).With(
1127
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1128 1129 1130
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1131

1132 1133
	log.Debug(rpcReceived(method))

1134 1135 1136
	if err := node.sched.ddQueue.Enqueue(lpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1137
			zap.Error(err))
1138

E
Enwei Jiao 已提交
1139
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1140
			metrics.AbandonLabel).Inc()
1141

1142
		return &commonpb.Status{
1143
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1144 1145 1146 1147
			Reason:    err.Error(),
		}, nil
	}

1148 1149 1150
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", lpt.BeginTs()),
1151
		zap.Uint64("EndTS", lpt.EndTs()))
1152 1153 1154 1155

	if err := lpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1156
			zap.Error(err),
1157
			zap.Uint64("BeginTS", lpt.BeginTs()),
1158
			zap.Uint64("EndTS", lpt.EndTs()))
D
dragondriver 已提交
1159

E
Enwei Jiao 已提交
1160
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1161
			metrics.FailLabel).Inc()
1162

1163
		return &commonpb.Status{
1164
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1165 1166 1167 1168
			Reason:    err.Error(),
		}, nil
	}

1169 1170 1171
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", lpt.BeginTs()),
1172
		zap.Uint64("EndTS", lpt.EndTs()))
1173

E
Enwei Jiao 已提交
1174
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1175
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1176
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1177
	return lpt.result, nil
1178 1179
}

1180
// ReleasePartitions release specific partitions from query nodes.
C
Cai Yudong 已提交
1181
func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionsRequest) (*commonpb.Status, error) {
1182 1183 1184
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1185 1186 1187 1188

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleasePartitions")
	defer sp.Finish()

1189
	rpt := &releasePartitionsTask{
G
godchen 已提交
1190 1191 1192
		ctx:                      ctx,
		Condition:                NewTaskCondition(ctx),
		ReleasePartitionsRequest: request,
1193
		queryCoord:               node.queryCoord,
1194 1195
	}

1196
	method := "ReleasePartitions"
1197
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1198
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1199
		metrics.TotalLabel).Inc()
1200 1201

	log := log.Ctx(ctx).With(
1202
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1203 1204 1205
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1206

1207 1208
	log.Debug(rpcReceived(method))

1209 1210 1211
	if err := node.sched.ddQueue.Enqueue(rpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1212
			zap.Error(err))
1213

E
Enwei Jiao 已提交
1214
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1215
			metrics.AbandonLabel).Inc()
1216

1217
		return &commonpb.Status{
1218
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1219 1220 1221 1222
			Reason:    err.Error(),
		}, nil
	}

1223 1224 1225
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", rpt.BeginTs()),
1226
		zap.Uint64("EndTS", rpt.EndTs()))
1227 1228 1229 1230

	if err := rpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1231
			zap.Error(err),
1232
			zap.Uint64("BeginTS", rpt.BeginTs()),
1233
			zap.Uint64("EndTS", rpt.EndTs()))
D
dragondriver 已提交
1234

E
Enwei Jiao 已提交
1235
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1236
			metrics.FailLabel).Inc()
1237

1238
		return &commonpb.Status{
1239
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1240 1241 1242 1243
			Reason:    err.Error(),
		}, nil
	}

1244 1245 1246
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", rpt.BeginTs()),
1247
		zap.Uint64("EndTS", rpt.EndTs()))
1248

E
Enwei Jiao 已提交
1249
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1250
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1251
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1252
	return rpt.result, nil
1253 1254
}

1255
// GetPartitionStatistics get the statistics of partition, such as num_rows.
C
Cai Yudong 已提交
1256
func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb.GetPartitionStatisticsRequest) (*milvuspb.GetPartitionStatisticsResponse, error) {
1257 1258 1259 1260 1261
	if !node.checkHealthy() {
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1262 1263 1264

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetPartitionStatistics")
	defer sp.Finish()
1265 1266
	method := "GetPartitionStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1267
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1268
		metrics.TotalLabel).Inc()
1269

1270
	g := &getPartitionStatisticsTask{
1271 1272 1273
		ctx:                           ctx,
		Condition:                     NewTaskCondition(ctx),
		GetPartitionStatisticsRequest: request,
1274
		dataCoord:                     node.dataCoord,
1275 1276
	}

1277
	log := log.Ctx(ctx).With(
1278
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1279 1280 1281
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1282

1283 1284
	log.Debug(rpcReceived(method))

1285 1286 1287
	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1288
			zap.Error(err))
1289

E
Enwei Jiao 已提交
1290
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1291
			metrics.AbandonLabel).Inc()
1292

1293 1294 1295 1296 1297 1298 1299 1300
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1301 1302 1303
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", g.BeginTs()),
1304
		zap.Uint64("EndTS", g.EndTs()))
1305 1306 1307 1308

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
1309
			zap.Error(err),
1310
			zap.Uint64("BeginTS", g.BeginTs()),
1311
			zap.Uint64("EndTS", g.EndTs()))
1312

E
Enwei Jiao 已提交
1313
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1314
			metrics.FailLabel).Inc()
1315

1316 1317 1318 1319 1320 1321 1322 1323
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1324 1325 1326
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", g.BeginTs()),
1327
		zap.Uint64("EndTS", g.EndTs()))
1328

E
Enwei Jiao 已提交
1329
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1330
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1331
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1332
	return g.result, nil
1333 1334
}

1335
// ShowPartitions list all partitions in the specific collection.
C
Cai Yudong 已提交
1336
func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
1337 1338 1339 1340 1341
	if !node.checkHealthy() {
		return &milvuspb.ShowPartitionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1342 1343 1344 1345

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowPartitions")
	defer sp.Finish()

1346
	spt := &showPartitionsTask{
G
godchen 已提交
1347 1348 1349
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		ShowPartitionsRequest: request,
1350
		rootCoord:             node.rootCoord,
1351
		queryCoord:            node.queryCoord,
G
godchen 已提交
1352
		result:                nil,
1353 1354
	}

1355
	method := "ShowPartitions"
1356 1357
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
E
Enwei Jiao 已提交
1358
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1359
		metrics.TotalLabel).Inc()
1360

1361 1362
	log := log.Ctx(ctx).With(zap.String("role", typeutil.ProxyRole))

1363 1364
	log.Debug(
		rpcReceived(method),
G
godchen 已提交
1365
		zap.Any("request", request))
1366 1367 1368 1369 1370 1371 1372

	if err := node.sched.ddQueue.Enqueue(spt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.Any("request", request))

E
Enwei Jiao 已提交
1373
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1374
			metrics.AbandonLabel).Inc()
1375

G
godchen 已提交
1376
		return &milvuspb.ShowPartitionsResponse{
1377
			Status: &commonpb.Status{
1378
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1379 1380 1381 1382 1383
				Reason:    err.Error(),
			},
		}, nil
	}

1384 1385 1386 1387
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
1388 1389
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
1390 1391 1392 1393 1394
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

	if err := spt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1395
			zap.Error(err),
1396 1397 1398 1399 1400
			zap.Uint64("BeginTS", spt.BeginTs()),
			zap.Uint64("EndTS", spt.EndTs()),
			zap.String("db", spt.ShowPartitionsRequest.DbName),
			zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
			zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))
D
dragondriver 已提交
1401

E
Enwei Jiao 已提交
1402
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1403
			metrics.FailLabel).Inc()
1404

G
godchen 已提交
1405
		return &milvuspb.ShowPartitionsResponse{
1406
			Status: &commonpb.Status{
1407
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1408 1409 1410 1411
				Reason:    err.Error(),
			},
		}, nil
	}
1412 1413 1414 1415 1416 1417 1418 1419 1420

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

E
Enwei Jiao 已提交
1421
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1422
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1423
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1424 1425 1426
	return spt.result, nil
}

S
SimFG 已提交
1427 1428 1429 1430 1431 1432
func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.GetLoadingProgressRequest) (*milvuspb.GetLoadingProgressResponse, error) {
	if !node.checkHealthy() {
		return &milvuspb.GetLoadingProgressResponse{Status: unhealthyStatus()}, nil
	}
	method := "GetLoadingProgress"
	tr := timerecord.NewTimeRecorder(method)
1433
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetLoadingProgress")
S
SimFG 已提交
1434
	defer sp.Finish()
E
Enwei Jiao 已提交
1435
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
1436 1437 1438
	log := log.Ctx(ctx)

	log.Debug(
S
SimFG 已提交
1439 1440 1441 1442
		rpcReceived(method),
		zap.Any("request", request))

	getErrResponse := func(err error) *milvuspb.GetLoadingProgressResponse {
J
Jiquan Long 已提交
1443
		log.Warn("fail to get loading progress",
1444
			zap.String("collection_name", request.CollectionName),
S
SimFG 已提交
1445 1446
			zap.Strings("partition_name", request.PartitionNames),
			zap.Error(err))
E
Enwei Jiao 已提交
1447
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
S
SimFG 已提交
1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461
		return &milvuspb.GetLoadingProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
	}
	if err := validateCollectionName(request.CollectionName); err != nil {
		return getErrResponse(err), nil
	}
	collectionID, err := globalMetaCache.GetCollectionID(ctx, request.CollectionName)
	if err != nil {
		return getErrResponse(err), nil
	}
S
SimFG 已提交
1462 1463 1464 1465 1466 1467 1468

	if statesResp, err := node.queryCoord.GetComponentStates(ctx); err != nil {
		return getErrResponse(err), nil
	} else if statesResp.State == nil || statesResp.State.StateCode != commonpb.StateCode_Healthy {
		return getErrResponse(fmt.Errorf("the querycoord server isn't healthy, state: %v", statesResp.State)), nil
	}

1469 1470 1471
	msgBase := commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
1472
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
1473
	)
S
SimFG 已提交
1474 1475 1476 1477 1478 1479 1480 1481 1482 1483
	if request.Base == nil {
		request.Base = msgBase
	} else {
		request.Base.MsgID = msgBase.MsgID
		request.Base.Timestamp = msgBase.Timestamp
		request.Base.SourceID = msgBase.SourceID
	}

	var progress int64
	if len(request.GetPartitionNames()) == 0 {
S
SimFG 已提交
1484
		if progress, err = getCollectionProgress(ctx, node.queryCoord, request.GetBase(), collectionID); err != nil {
S
SimFG 已提交
1485 1486 1487
			return getErrResponse(err), nil
		}
	} else {
S
SimFG 已提交
1488 1489
		if progress, err = getPartitionProgress(ctx, node.queryCoord, request.GetBase(),
			request.GetPartitionNames(), request.GetCollectionName(), collectionID); err != nil {
S
SimFG 已提交
1490 1491 1492 1493
			return getErrResponse(err), nil
		}
	}

1494
	log.Debug(
S
SimFG 已提交
1495 1496
		rpcDone(method),
		zap.Any("request", request))
E
Enwei Jiao 已提交
1497 1498
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
S
SimFG 已提交
1499 1500 1501 1502 1503 1504 1505 1506
	return &milvuspb.GetLoadingProgressResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		Progress: progress,
	}, nil
}

1507
func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadStateRequest) (*milvuspb.GetLoadStateResponse, error) {
S
SimFG 已提交
1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596
	if !node.checkHealthy() {
		return &milvuspb.GetLoadStateResponse{Status: unhealthyStatus()}, nil
	}
	method := "GetLoadState"
	tr := timerecord.NewTimeRecorder(method)
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetLoadState")
	defer sp.Finish()
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
	log := log.Ctx(ctx)

	log.Debug(
		rpcReceived(method),
		zap.Any("request", request))

	getErrResponse := func(err error) *milvuspb.GetLoadStateResponse {
		log.Warn("fail to get load state",
			zap.String("collection_name", request.CollectionName),
			zap.Strings("partition_name", request.PartitionNames),
			zap.Error(err))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
		return &milvuspb.GetLoadStateResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
	}

	if err := validateCollectionName(request.CollectionName); err != nil {
		return getErrResponse(err), nil
	}

	if statesResp, err := node.queryCoord.GetComponentStates(ctx); err != nil {
		return getErrResponse(err), nil
	} else if statesResp.State == nil || statesResp.State.StateCode != commonpb.StateCode_Healthy {
		return getErrResponse(fmt.Errorf("the querycoord server isn't healthy, state: %v", statesResp.State)), nil
	}

	successResponse := &milvuspb.GetLoadStateResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
	defer func() {
		log.Debug(
			rpcDone(method),
			zap.Any("request", request))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
		metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	}()

	collectionID, err := globalMetaCache.GetCollectionID(ctx, request.CollectionName)
	if err != nil {
		successResponse.State = commonpb.LoadState_LoadStateNotExist
		return successResponse, nil
	}

	msgBase := commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
	)
	if request.Base == nil {
		request.Base = msgBase
	} else {
		request.Base.MsgID = msgBase.MsgID
		request.Base.Timestamp = msgBase.Timestamp
		request.Base.SourceID = msgBase.SourceID
	}

	var progress int64
	if len(request.GetPartitionNames()) == 0 {
		if progress, err = getCollectionProgress(ctx, node.queryCoord, request.GetBase(), collectionID); err != nil {
			successResponse.State = commonpb.LoadState_LoadStateNotLoad
			return successResponse, nil
		}
	} else {
		if progress, err = getPartitionProgress(ctx, node.queryCoord, request.GetBase(),
			request.GetPartitionNames(), request.GetCollectionName(), collectionID); err != nil {
			successResponse.State = commonpb.LoadState_LoadStateNotLoad
			return successResponse, nil
		}
	}
	if progress >= 100 {
		successResponse.State = commonpb.LoadState_LoadStateLoaded
	} else {
		successResponse.State = commonpb.LoadState_LoadStateLoading
	}
	return successResponse, nil
1597 1598
}

1599
// CreateIndex create index for collection.
C
Cai Yudong 已提交
1600
func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
1601 1602 1603
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1604

1605
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateIndex")
D
dragondriver 已提交
1606 1607
	defer sp.Finish()

1608
	cit := &createIndexTask{
Z
zhenshan.cao 已提交
1609 1610 1611 1612 1613
		ctx:        ctx,
		Condition:  NewTaskCondition(ctx),
		req:        request,
		rootCoord:  node.rootCoord,
		indexCoord: node.indexCoord,
1614
		queryCoord: node.queryCoord,
1615 1616
	}

D
dragondriver 已提交
1617
	method := "CreateIndex"
1618
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1619
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1620
		metrics.TotalLabel).Inc()
1621 1622

	log := log.Ctx(ctx).With(
1623
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1624 1625 1626 1627
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))
D
dragondriver 已提交
1628

1629 1630
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1631 1632 1633
	if err := node.sched.ddQueue.Enqueue(cit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1634
			zap.Error(err))
D
dragondriver 已提交
1635

E
Enwei Jiao 已提交
1636
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1637
			metrics.AbandonLabel).Inc()
1638

1639
		return &commonpb.Status{
1640
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1641 1642 1643 1644
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1645 1646 1647
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", cit.BeginTs()),
1648
		zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1649 1650 1651 1652

	if err := cit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1653
			zap.Error(err),
D
dragondriver 已提交
1654
			zap.Uint64("BeginTs", cit.BeginTs()),
1655
			zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1656

E
Enwei Jiao 已提交
1657
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1658
			metrics.FailLabel).Inc()
1659

1660
		return &commonpb.Status{
1661
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1662 1663 1664 1665
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1666 1667 1668
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", cit.BeginTs()),
1669
		zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1670

E
Enwei Jiao 已提交
1671
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1672
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1673
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1674 1675 1676
	return cit.result, nil
}

1677
// DescribeIndex get the meta information of index, such as index state, index id and etc.
C
Cai Yudong 已提交
1678
func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
1679 1680 1681 1682 1683
	if !node.checkHealthy() {
		return &milvuspb.DescribeIndexResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1684 1685 1686 1687

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeIndex")
	defer sp.Finish()

1688
	dit := &describeIndexTask{
S
sunby 已提交
1689
		ctx:                  ctx,
1690 1691
		Condition:            NewTaskCondition(ctx),
		DescribeIndexRequest: request,
1692
		indexCoord:           node.indexCoord,
1693 1694
	}

1695 1696
	method := "DescribeIndex"
	// avoid data race
1697
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1698
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1699
		metrics.TotalLabel).Inc()
1700 1701

	log := log.Ctx(ctx).With(
1702
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1703 1704 1705
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
1706 1707 1708
		zap.String("index name", request.IndexName))

	log.Debug(rpcReceived(method))
1709 1710 1711 1712

	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1713
			zap.Error(err))
1714

E
Enwei Jiao 已提交
1715
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1716
			metrics.AbandonLabel).Inc()
1717

1718 1719
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
1720
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1721 1722 1723 1724 1725
				Reason:    err.Error(),
			},
		}, nil
	}

1726 1727 1728
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1729
		zap.Uint64("EndTs", dit.EndTs()))
1730 1731 1732 1733

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1734
			zap.Error(err),
1735
			zap.Uint64("BeginTs", dit.BeginTs()),
1736
			zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1737

Z
zhenshan.cao 已提交
1738 1739 1740 1741
		errCode := commonpb.ErrorCode_UnexpectedError
		if dit.result != nil {
			errCode = dit.result.Status.GetErrorCode()
		}
E
Enwei Jiao 已提交
1742
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1743
			metrics.FailLabel).Inc()
1744

1745 1746
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
Z
zhenshan.cao 已提交
1747
				ErrorCode: errCode,
1748 1749 1750 1751 1752
				Reason:    err.Error(),
			},
		}, nil
	}

1753 1754 1755
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1756
		zap.Uint64("EndTs", dit.EndTs()))
1757

E
Enwei Jiao 已提交
1758
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1759
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1760
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1761 1762 1763
	return dit.result, nil
}

1764
// DropIndex drop the index of collection.
C
Cai Yudong 已提交
1765
func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
1766 1767 1768
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1769 1770 1771 1772

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropIndex")
	defer sp.Finish()

1773
	dit := &dropIndexTask{
S
sunby 已提交
1774
		ctx:              ctx,
B
BossZou 已提交
1775 1776
		Condition:        NewTaskCondition(ctx),
		DropIndexRequest: request,
1777
		indexCoord:       node.indexCoord,
1778
		queryCoord:       node.queryCoord,
B
BossZou 已提交
1779
	}
G
godchen 已提交
1780

D
dragondriver 已提交
1781
	method := "DropIndex"
1782
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1783
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1784
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1785

1786
	log := log.Ctx(ctx).With(
1787
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1788 1789 1790 1791 1792
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

1793 1794
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1795 1796 1797
	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1798
			zap.Error(err))
E
Enwei Jiao 已提交
1799
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1800
			metrics.AbandonLabel).Inc()
D
dragondriver 已提交
1801

B
BossZou 已提交
1802
		return &commonpb.Status{
1803
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1804 1805 1806
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1807

D
dragondriver 已提交
1808 1809 1810
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1811
		zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1812 1813 1814 1815

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1816
			zap.Error(err),
D
dragondriver 已提交
1817
			zap.Uint64("BeginTs", dit.BeginTs()),
1818
			zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1819

E
Enwei Jiao 已提交
1820
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1821
			metrics.FailLabel).Inc()
1822

B
BossZou 已提交
1823
		return &commonpb.Status{
1824
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1825 1826 1827
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1828 1829 1830 1831

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1832
		zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1833

E
Enwei Jiao 已提交
1834
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1835
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1836
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
B
BossZou 已提交
1837 1838 1839
	return dit.result, nil
}

1840 1841
// GetIndexBuildProgress gets index build progress with filed_name and index_name.
// IndexRows is the num of indexed rows. And TotalRows is the total number of segment rows.
1842
// Deprecated: use DescribeIndex instead
C
Cai Yudong 已提交
1843
func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb.GetIndexBuildProgressRequest) (*milvuspb.GetIndexBuildProgressResponse, error) {
1844 1845 1846 1847 1848
	if !node.checkHealthy() {
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1849 1850 1851 1852

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetIndexBuildProgress")
	defer sp.Finish()

1853
	gibpt := &getIndexBuildProgressTask{
1854 1855 1856
		ctx:                          ctx,
		Condition:                    NewTaskCondition(ctx),
		GetIndexBuildProgressRequest: request,
1857 1858
		indexCoord:                   node.indexCoord,
		rootCoord:                    node.rootCoord,
1859
		dataCoord:                    node.dataCoord,
1860 1861
	}

1862
	method := "GetIndexBuildProgress"
1863
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1864
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1865
		metrics.TotalLabel).Inc()
1866 1867

	log := log.Ctx(ctx).With(
1868
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1869 1870 1871 1872
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1873

1874 1875
	log.Debug(rpcReceived(method))

1876 1877 1878
	if err := node.sched.ddQueue.Enqueue(gibpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1879
			zap.Error(err))
E
Enwei Jiao 已提交
1880
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1881
			metrics.AbandonLabel).Inc()
1882

1883 1884 1885 1886 1887 1888 1889 1890
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1891 1892 1893
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
1894
		zap.Uint64("EndTs", gibpt.EndTs()))
1895 1896 1897 1898

	if err := gibpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
1899
			zap.Error(err),
1900
			zap.Uint64("BeginTs", gibpt.BeginTs()),
1901
			zap.Uint64("EndTs", gibpt.EndTs()))
E
Enwei Jiao 已提交
1902
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1903
			metrics.FailLabel).Inc()
1904 1905 1906 1907 1908 1909 1910 1911

		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
1912 1913 1914 1915

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
1916
		zap.Uint64("EndTs", gibpt.EndTs()))
1917

E
Enwei Jiao 已提交
1918
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1919
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1920
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1921
	return gibpt.result, nil
1922 1923
}

1924
// GetIndexState get the build-state of index.
1925
// Deprecated: use DescribeIndex instead
C
Cai Yudong 已提交
1926
func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndexStateRequest) (*milvuspb.GetIndexStateResponse, error) {
1927 1928 1929 1930 1931
	if !node.checkHealthy() {
		return &milvuspb.GetIndexStateResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1932 1933 1934 1935

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()

1936
	dipt := &getIndexStateTask{
G
godchen 已提交
1937 1938 1939
		ctx:                  ctx,
		Condition:            NewTaskCondition(ctx),
		GetIndexStateRequest: request,
1940 1941
		indexCoord:           node.indexCoord,
		rootCoord:            node.rootCoord,
1942 1943
	}

1944
	method := "GetIndexState"
1945
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1946
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1947
		metrics.TotalLabel).Inc()
1948 1949

	log := log.Ctx(ctx).With(
1950
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1951 1952 1953 1954
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1955

1956 1957
	log.Debug(rpcReceived(method))

1958 1959 1960
	if err := node.sched.ddQueue.Enqueue(dipt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1961
			zap.Error(err))
1962

E
Enwei Jiao 已提交
1963
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1964
			metrics.AbandonLabel).Inc()
1965

G
godchen 已提交
1966
		return &milvuspb.GetIndexStateResponse{
1967
			Status: &commonpb.Status{
1968
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1969 1970 1971 1972 1973
				Reason:    err.Error(),
			},
		}, nil
	}

1974 1975 1976
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dipt.BeginTs()),
1977
		zap.Uint64("EndTs", dipt.EndTs()))
1978 1979 1980 1981

	if err := dipt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1982
			zap.Error(err),
1983
			zap.Uint64("BeginTs", dipt.BeginTs()),
1984
			zap.Uint64("EndTs", dipt.EndTs()))
E
Enwei Jiao 已提交
1985
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1986
			metrics.FailLabel).Inc()
1987

G
godchen 已提交
1988
		return &milvuspb.GetIndexStateResponse{
1989
			Status: &commonpb.Status{
1990
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1991 1992 1993 1994 1995
				Reason:    err.Error(),
			},
		}, nil
	}

1996 1997 1998
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dipt.BeginTs()),
1999
		zap.Uint64("EndTs", dipt.EndTs()))
2000

E
Enwei Jiao 已提交
2001
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2002
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2003
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2004 2005 2006
	return dipt.result, nil
}

2007
// Insert insert records into collection.
C
Cai Yudong 已提交
2008
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
X
Xiangyu Wang 已提交
2009 2010
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()
2011 2012 2013
	log := log.Ctx(ctx)
	log.Debug("Start processing insert request in Proxy")
	defer log.Debug("Finish processing insert request in Proxy")
X
Xiangyu Wang 已提交
2014

2015 2016 2017 2018 2019
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}
2020 2021
	method := "Insert"
	tr := timerecord.NewTimeRecorder(method)
S
smellthemoon 已提交
2022
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Add(float64(proto.Size(request)))
E
Enwei Jiao 已提交
2023
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
S
smellthemoon 已提交
2024

2025
	it := &insertTask{
2026 2027
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
X
xige-16 已提交
2028
		// req:       request,
2029
		insertMsg: &msgstream.InsertMsg{
2030 2031 2032
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
2033
			InsertRequest: internalpb.InsertRequest{
2034 2035 2036
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Insert),
					commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
2037
					commonpbutil.WithSourceID(paramtable.GetNodeID()),
2038
				),
2039 2040
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
X
xige-16 已提交
2041 2042 2043
				FieldsData:     request.FieldsData,
				NumRows:        uint64(request.NumRows),
				Version:        internalpb.InsertDataVersion_ColumnBased,
2044
				// RowData: transfer column based request to this
2045 2046
			},
		},
2047
		idAllocator:   node.rowIDAllocator,
2048 2049 2050
		segIDAssigner: node.segAssigner,
		chMgr:         node.chMgr,
		chTicker:      node.chTicker,
2051
	}
2052

2053 2054
	if len(it.insertMsg.PartitionName) <= 0 {
		it.insertMsg.PartitionName = Params.CommonCfg.DefaultPartitionName.GetValue()
2055 2056
	}

X
Xiangyu Wang 已提交
2057
	constructFailedResponse := func(err error) *milvuspb.MutationResult {
X
xige-16 已提交
2058
		numRows := request.NumRows
2059 2060 2061 2062
		errIndex := make([]uint32, numRows)
		for i := uint32(0); i < numRows; i++ {
			errIndex[i] = i
		}
2063

X
Xiangyu Wang 已提交
2064 2065 2066 2067 2068 2069 2070
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			ErrIndex: errIndex,
		}
2071 2072
	}

X
Xiangyu Wang 已提交
2073
	log.Debug("Enqueue insert request in Proxy",
2074
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2075 2076 2077 2078 2079
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.Int("len(FieldsData)", len(request.FieldsData)),
		zap.Int("len(HashKeys)", len(request.HashKeys)),
2080
		zap.Uint32("NumRows", request.NumRows))
D
dragondriver 已提交
2081

X
Xiangyu Wang 已提交
2082
	if err := node.sched.dmQueue.Enqueue(it); err != nil {
J
Jiquan Long 已提交
2083
		log.Warn("Failed to enqueue insert task: " + err.Error())
E
Enwei Jiao 已提交
2084
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2085
			metrics.AbandonLabel).Inc()
X
Xiangyu Wang 已提交
2086
		return constructFailedResponse(err), nil
2087
	}
D
dragondriver 已提交
2088

X
Xiangyu Wang 已提交
2089
	log.Debug("Detail of insert request in Proxy",
2090
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2091 2092 2093 2094 2095
		zap.Uint64("BeginTS", it.BeginTs()),
		zap.Uint64("EndTS", it.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
2096
		zap.Uint32("NumRows", request.NumRows))
X
Xiangyu Wang 已提交
2097 2098

	if err := it.WaitToFinish(); err != nil {
2099
		log.Warn("Failed to execute insert task in task scheduler: " + err.Error())
E
Enwei Jiao 已提交
2100
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2101
			metrics.FailLabel).Inc()
X
Xiangyu Wang 已提交
2102 2103 2104 2105 2106
		return constructFailedResponse(err), nil
	}

	if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
		setErrorIndex := func() {
X
xige-16 已提交
2107
			numRows := request.NumRows
X
Xiangyu Wang 已提交
2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118
			errIndex := make([]uint32, numRows)
			for i := uint32(0); i < numRows; i++ {
				errIndex[i] = i
			}
			it.result.ErrIndex = errIndex
		}

		setErrorIndex()
	}

	// InsertCnt always equals to the number of entities in the request
X
xige-16 已提交
2119
	it.result.InsertCnt = int64(request.NumRows)
D
dragondriver 已提交
2120

S
smellthemoon 已提交
2121 2122 2123
	receiveSize := proto.Size(it.insertMsg)
	rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(receiveSize))

E
Enwei Jiao 已提交
2124
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2125
		metrics.SuccessLabel).Inc()
2126
	successCnt := it.result.InsertCnt - int64(len(it.result.ErrIndex))
E
Enwei Jiao 已提交
2127 2128 2129
	metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(successCnt))
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
2130 2131 2132
	return it.result, nil
}

2133
// Delete delete records from collection, then these records cannot be searched.
G
groot 已提交
2134
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {
2135 2136
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete")
	defer sp.Finish()
2137 2138 2139
	log := log.Ctx(ctx)
	log.Debug("Start processing delete request in Proxy")
	defer log.Debug("Finish processing delete request in Proxy")
2140

S
smellthemoon 已提交
2141
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Add(float64(proto.Size(request)))
2142

G
groot 已提交
2143 2144 2145 2146 2147 2148
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}

2149 2150 2151
	method := "Delete"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
2152
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2153
		metrics.TotalLabel).Inc()
2154
	dt := &deleteTask{
X
xige-16 已提交
2155 2156 2157
		ctx:        ctx,
		Condition:  NewTaskCondition(ctx),
		deleteExpr: request.Expr,
2158
		deleteMsg: &BaseDeleteTask{
G
godchen 已提交
2159 2160 2161
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
2162
			DeleteRequest: internalpb.DeleteRequest{
2163 2164 2165 2166
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Delete),
					commonpbutil.WithMsgID(0),
				),
X
xige-16 已提交
2167
				DbName:         request.DbName,
G
godchen 已提交
2168 2169 2170
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
				// RowData: transfer column based request to this
C
Cai Yudong 已提交
2171 2172 2173 2174
			},
		},
		chMgr:    node.chMgr,
		chTicker: node.chTicker,
G
groot 已提交
2175 2176
	}

2177
	log.Debug("Enqueue delete request in Proxy",
2178
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2179 2180 2181 2182
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.String("expr", request.Expr))
2183 2184 2185

	// MsgID will be set by Enqueue()
	if err := node.sched.dmQueue.Enqueue(dt); err != nil {
2186
		log.Error("Failed to enqueue delete task: " + err.Error())
E
Enwei Jiao 已提交
2187
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2188
			metrics.AbandonLabel).Inc()
2189

G
groot 已提交
2190 2191 2192 2193 2194 2195 2196 2197
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

2198
	log.Debug("Detail of delete request in Proxy",
2199
		zap.String("role", typeutil.ProxyRole),
2200
		zap.Uint64("timestamp", dt.deleteMsg.Base.Timestamp),
G
groot 已提交
2201 2202 2203
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
2204
		zap.String("expr", request.Expr))
G
groot 已提交
2205

2206
	if err := dt.WaitToFinish(); err != nil {
2207
		log.Error("Failed to execute delete task in task scheduler: " + err.Error())
E
Enwei Jiao 已提交
2208
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2209
			metrics.FailLabel).Inc()
G
groot 已提交
2210 2211 2212 2213 2214 2215 2216 2217
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

S
smellthemoon 已提交
2218 2219 2220
	receiveSize := proto.Size(dt.deleteMsg)
	rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(receiveSize))

E
Enwei Jiao 已提交
2221
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2222
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2223 2224
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
G
groot 已提交
2225 2226 2227
	return dt.result, nil
}

S
smellthemoon 已提交
2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361
// Upsert upsert records into collection.
func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Upsert")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.Uint32("NumRows", request.NumRows),
	)
	log.Debug("Start processing upsert request in Proxy")

	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}
	method := "Upsert"
	tr := timerecord.NewTimeRecorder(method)

	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Add(float64(proto.Size(request)))
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()

	it := &upsertTask{
		baseMsg: msgstream.BaseMsg{
			HashValues: request.HashKeys,
		},
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),

		req: &milvuspb.UpsertRequest{
			Base: commonpbutil.NewMsgBase(
				commonpbutil.WithMsgType(commonpb.MsgType(commonpb.MsgType_Upsert)),
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
			),
			CollectionName: request.CollectionName,
			PartitionName:  request.PartitionName,
			FieldsData:     request.FieldsData,
			NumRows:        request.NumRows,
		},

		result: &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_Success,
			},
			IDs: &schemapb.IDs{
				IdField: nil,
			},
		},

		idAllocator:   node.rowIDAllocator,
		segIDAssigner: node.segAssigner,
		chMgr:         node.chMgr,
		chTicker:      node.chTicker,
	}

	if len(it.req.PartitionName) <= 0 {
		it.req.PartitionName = Params.CommonCfg.DefaultPartitionName.GetValue()
	}

	constructFailedResponse := func(err error, errCode commonpb.ErrorCode) *milvuspb.MutationResult {
		numRows := request.NumRows
		errIndex := make([]uint32, numRows)
		for i := uint32(0); i < numRows; i++ {
			errIndex[i] = i
		}

		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: errCode,
				Reason:    err.Error(),
			},
			ErrIndex: errIndex,
		}
	}

	log.Debug("Enqueue upsert request in Proxy",
		zap.Int("len(FieldsData)", len(request.FieldsData)),
		zap.Int("len(HashKeys)", len(request.HashKeys)))

	if err := node.sched.dmQueue.Enqueue(it); err != nil {
		log.Info("Failed to enqueue upsert task",
			zap.Error(err))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
			metrics.AbandonLabel).Inc()
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug("Detail of upsert request in Proxy",
		zap.Uint64("BeginTS", it.BeginTs()),
		zap.Uint64("EndTS", it.EndTs()))

	if err := it.WaitToFinish(); err != nil {
		log.Info("Failed to execute insert task in task scheduler",
			zap.Error(err))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
			metrics.FailLabel).Inc()
		return constructFailedResponse(err, it.result.Status.ErrorCode), nil
	}

	if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
		setErrorIndex := func() {
			numRows := request.NumRows
			errIndex := make([]uint32, numRows)
			for i := uint32(0); i < numRows; i++ {
				errIndex[i] = i
			}
			it.result.ErrIndex = errIndex
		}
		setErrorIndex()
	}

	insertReceiveSize := proto.Size(it.upsertMsg.InsertMsg)
	deleteReceiveSize := proto.Size(it.upsertMsg.DeleteMsg)

	rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(deleteReceiveSize))
	rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(insertReceiveSize))

	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))

	log.Debug("Finish processing upsert request in Proxy")
	return it.result, nil
}

2362
// Search search the most similar records of requests.
C
Cai Yudong 已提交
2363
func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
2364
	receiveSize := proto.Size(request)
E
Enwei Jiao 已提交
2365
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel).Add(float64(receiveSize))
2366 2367 2368

	rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(request.GetNq()))

2369 2370 2371 2372 2373
	if !node.checkHealthy() {
		return &milvuspb.SearchResults{
			Status: unhealthyStatus(),
		}, nil
	}
2374 2375
	method := "Search"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2376
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2377
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
2378

C
cai.zhang 已提交
2379 2380
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Search")
	defer sp.Finish()
D
dragondriver 已提交
2381

2382
	qt := &searchTask{
S
sunby 已提交
2383
		ctx:       ctx,
2384
		Condition: NewTaskCondition(ctx),
G
godchen 已提交
2385
		SearchRequest: &internalpb.SearchRequest{
2386 2387
			Base: commonpbutil.NewMsgBase(
				commonpbutil.WithMsgType(commonpb.MsgType_Search),
E
Enwei Jiao 已提交
2388
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
2389
			),
E
Enwei Jiao 已提交
2390
			ReqID: paramtable.GetNodeID(),
2391
		},
2392 2393 2394 2395
		request:  request,
		qc:       node.queryCoord,
		tr:       timerecord.NewTimeRecorder("search"),
		shardMgr: node.shardMgr,
2396 2397
	}

2398 2399 2400
	travelTs := request.TravelTimestamp
	guaranteeTs := request.GuaranteeTimestamp

2401
	log := log.Ctx(ctx).With(
2402
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2403 2404 2405 2406 2407
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2408 2409 2410 2411
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2412

2413 2414 2415
	log.Debug(
		rpcReceived(method))

2416
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
2417
		log.Warn(
2418
			rpcFailedToEnqueue(method),
2419
			zap.Error(err))
D
dragondriver 已提交
2420

E
Enwei Jiao 已提交
2421
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2422
			metrics.AbandonLabel).Inc()
2423

2424 2425
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2426
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2427 2428 2429 2430
				Reason:    err.Error(),
			},
		}, nil
	}
Z
Zach 已提交
2431
	tr.CtxRecord(ctx, "search request enqueue")
2432

2433
	log.Debug(
2434
		rpcEnqueued(method),
2435
		zap.Uint64("timestamp", qt.Base.Timestamp))
D
dragondriver 已提交
2436

2437
	if err := qt.WaitToFinish(); err != nil {
2438
		log.Warn(
2439
			rpcFailedToWaitToFinish(method),
2440
			zap.Error(err))
2441

E
Enwei Jiao 已提交
2442
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2443
			metrics.FailLabel).Inc()
2444

2445 2446
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2447
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2448 2449 2450 2451 2452
				Reason:    err.Error(),
			},
		}, nil
	}

Z
Zach 已提交
2453
	span := tr.CtxRecord(ctx, "wait search result")
E
Enwei Jiao 已提交
2454
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2455
		metrics.SearchLabel).Observe(float64(span.Milliseconds()))
2456
	tr.CtxRecord(ctx, "wait search result")
2457
	log.Debug(rpcDone(method))
D
dragondriver 已提交
2458

E
Enwei Jiao 已提交
2459
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2460
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2461
	metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(qt.result.GetResults().GetNumQueries()))
C
cai.zhang 已提交
2462
	searchDur := tr.ElapseSpan().Milliseconds()
E
Enwei Jiao 已提交
2463
	metrics.ProxySQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2464
		metrics.SearchLabel).Observe(float64(searchDur))
E
Enwei Jiao 已提交
2465
	metrics.ProxyCollectionSQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2466
		metrics.SearchLabel, request.CollectionName).Observe(float64(searchDur))
2467 2468
	if qt.result != nil {
		sentSize := proto.Size(qt.result)
E
Enwei Jiao 已提交
2469
		metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
2470
		rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
2471
	}
2472 2473 2474
	return qt.result, nil
}

2475
// Flush notify data nodes to persist the data of collection.
2476 2477 2478 2479 2480 2481 2482
func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*milvuspb.FlushResponse, error) {
	resp := &milvuspb.FlushResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    "",
		},
	}
2483
	if !node.checkHealthy() {
2484 2485
		resp.Status.Reason = "proxy is not healthy"
		return resp, nil
2486
	}
D
dragondriver 已提交
2487 2488 2489 2490

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Flush")
	defer sp.Finish()

2491
	ft := &flushTask{
T
ThreadDao 已提交
2492 2493 2494
		ctx:          ctx,
		Condition:    NewTaskCondition(ctx),
		FlushRequest: request,
2495
		dataCoord:    node.dataCoord,
2496 2497
	}

D
dragondriver 已提交
2498
	method := "Flush"
2499
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2500
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2501

2502
	log := log.Ctx(ctx).With(
2503
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2504 2505
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))
D
dragondriver 已提交
2506

2507 2508
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2509 2510 2511
	if err := node.sched.ddQueue.Enqueue(ft); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2512
			zap.Error(err))
D
dragondriver 已提交
2513

E
Enwei Jiao 已提交
2514
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
2515

2516 2517
		resp.Status.Reason = err.Error()
		return resp, nil
2518 2519
	}

D
dragondriver 已提交
2520 2521 2522
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", ft.BeginTs()),
2523
		zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2524 2525 2526 2527

	if err := ft.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2528
			zap.Error(err),
D
dragondriver 已提交
2529
			zap.Uint64("BeginTs", ft.BeginTs()),
2530
			zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2531

E
Enwei Jiao 已提交
2532
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2533

D
dragondriver 已提交
2534
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
2535 2536
		resp.Status.Reason = err.Error()
		return resp, nil
2537 2538
	}

D
dragondriver 已提交
2539 2540 2541
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", ft.BeginTs()),
2542
		zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2543

E
Enwei Jiao 已提交
2544 2545
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2546
	return ft.result, nil
2547 2548
}

2549
// Query get the records by primary keys.
C
Cai Yudong 已提交
2550
func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*milvuspb.QueryResults, error) {
2551
	receiveSize := proto.Size(request)
E
Enwei Jiao 已提交
2552
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.QueryLabel).Add(float64(receiveSize))
2553 2554 2555

	rateCol.Add(internalpb.RateType_DQLQuery.String(), 1)

2556 2557 2558 2559 2560
	if !node.checkHealthy() {
		return &milvuspb.QueryResults{
			Status: unhealthyStatus(),
		}, nil
	}
2561

D
dragondriver 已提交
2562 2563
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Query")
	defer sp.Finish()
2564
	tr := timerecord.NewTimeRecorder("Query")
D
dragondriver 已提交
2565

2566
	qt := &queryTask{
2567 2568 2569
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
		RetrieveRequest: &internalpb.RetrieveRequest{
2570 2571
			Base: commonpbutil.NewMsgBase(
				commonpbutil.WithMsgType(commonpb.MsgType_Retrieve),
E
Enwei Jiao 已提交
2572
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
2573
			),
E
Enwei Jiao 已提交
2574
			ReqID: paramtable.GetNodeID(),
2575
		},
2576 2577
		request:          request,
		qc:               node.queryCoord,
2578
		queryShardPolicy: mergeRoundRobinPolicy,
2579
		shardMgr:         node.shardMgr,
2580 2581
	}

D
dragondriver 已提交
2582 2583
	method := "Query"

E
Enwei Jiao 已提交
2584
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2585 2586
		metrics.TotalLabel).Inc()

2587
	log := log.Ctx(ctx).With(
2588
		zap.String("role", typeutil.ProxyRole),
2589 2590
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
2591 2592 2593 2594
		zap.Strings("partitions", request.PartitionNames))

	log.Debug(
		rpcReceived(method),
2595 2596 2597 2598
		zap.String("expr", request.Expr),
		zap.Strings("OutputFields", request.OutputFields),
		zap.Uint64("travel_timestamp", request.TravelTimestamp),
		zap.Uint64("guarantee_timestamp", request.GuaranteeTimestamp))
G
godchen 已提交
2599

D
dragondriver 已提交
2600
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
2601
		log.Warn(
D
dragondriver 已提交
2602
			rpcFailedToEnqueue(method),
2603
			zap.Error(err))
D
dragondriver 已提交
2604

E
Enwei Jiao 已提交
2605
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2606
			metrics.AbandonLabel).Inc()
2607

2608 2609 2610 2611 2612 2613
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
2614
	}
Z
Zach 已提交
2615
	tr.CtxRecord(ctx, "query request enqueue")
2616

2617
	log.Debug(rpcEnqueued(method))
D
dragondriver 已提交
2618 2619

	if err := qt.WaitToFinish(); err != nil {
2620
		log.Warn(
D
dragondriver 已提交
2621
			rpcFailedToWaitToFinish(method),
2622
			zap.Error(err))
2623

E
Enwei Jiao 已提交
2624
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2625
			metrics.FailLabel).Inc()
2626

2627 2628 2629 2630 2631 2632 2633
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
Z
Zach 已提交
2634
	span := tr.CtxRecord(ctx, "wait query result")
E
Enwei Jiao 已提交
2635
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2636
		metrics.QueryLabel).Observe(float64(span.Milliseconds()))
2637

2638
	log.Debug(rpcDone(method))
D
dragondriver 已提交
2639

E
Enwei Jiao 已提交
2640
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2641 2642
		metrics.SuccessLabel).Inc()

E
Enwei Jiao 已提交
2643
	metrics.ProxySQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2644
		metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
E
Enwei Jiao 已提交
2645
	metrics.ProxyCollectionSQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2646
		metrics.QueryLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
2647 2648

	ret := &milvuspb.QueryResults{
2649 2650
		Status:     qt.result.Status,
		FieldsData: qt.result.FieldsData,
2651 2652
	}
	sentSize := proto.Size(qt.result)
2653
	rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
E
Enwei Jiao 已提交
2654
	metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
2655
	return ret, nil
2656
}
2657

2658
// CreateAlias create alias for collection, then you can search the collection with alias.
Y
Yusup 已提交
2659 2660 2661 2662
func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2663 2664 2665 2666

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateAlias")
	defer sp.Finish()

Y
Yusup 已提交
2667 2668 2669 2670 2671 2672 2673
	cat := &CreateAliasTask{
		ctx:                ctx,
		Condition:          NewTaskCondition(ctx),
		CreateAliasRequest: request,
		rootCoord:          node.rootCoord,
	}

D
dragondriver 已提交
2674
	method := "CreateAlias"
2675
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2676
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2677

2678
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2679 2680 2681 2682 2683
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

2684 2685
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2686 2687 2688
	if err := node.sched.ddQueue.Enqueue(cat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2689
			zap.Error(err))
D
dragondriver 已提交
2690

E
Enwei Jiao 已提交
2691
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
2692

Y
Yusup 已提交
2693 2694 2695 2696 2697 2698
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2699 2700 2701
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", cat.BeginTs()),
2702
		zap.Uint64("EndTs", cat.EndTs()))
D
dragondriver 已提交
2703 2704 2705 2706

	if err := cat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2707
			zap.Error(err),
D
dragondriver 已提交
2708
			zap.Uint64("BeginTs", cat.BeginTs()),
2709
			zap.Uint64("EndTs", cat.EndTs()))
E
Enwei Jiao 已提交
2710
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
Y
Yusup 已提交
2711 2712 2713 2714 2715 2716 2717

		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2718 2719 2720
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", cat.BeginTs()),
2721
		zap.Uint64("EndTs", cat.EndTs()))
D
dragondriver 已提交
2722

E
Enwei Jiao 已提交
2723 2724
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2725 2726 2727
	return cat.result, nil
}

2728
// DropAlias alter the alias of collection.
Y
Yusup 已提交
2729 2730 2731 2732
func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2733 2734 2735 2736

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropAlias")
	defer sp.Finish()

Y
Yusup 已提交
2737 2738 2739 2740 2741 2742 2743
	dat := &DropAliasTask{
		ctx:              ctx,
		Condition:        NewTaskCondition(ctx),
		DropAliasRequest: request,
		rootCoord:        node.rootCoord,
	}

D
dragondriver 已提交
2744
	method := "DropAlias"
2745
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2746
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2747

2748
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2749 2750 2751 2752
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias))

2753 2754
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2755 2756 2757
	if err := node.sched.ddQueue.Enqueue(dat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2758
			zap.Error(err))
E
Enwei Jiao 已提交
2759
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2760

Y
Yusup 已提交
2761 2762 2763 2764 2765 2766
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2767 2768 2769
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dat.BeginTs()),
2770
		zap.Uint64("EndTs", dat.EndTs()))
D
dragondriver 已提交
2771 2772 2773 2774

	if err := dat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2775
			zap.Error(err),
D
dragondriver 已提交
2776
			zap.Uint64("BeginTs", dat.BeginTs()),
2777
			zap.Uint64("EndTs", dat.EndTs()))
Y
Yusup 已提交
2778

E
Enwei Jiao 已提交
2779
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2780

Y
Yusup 已提交
2781 2782 2783 2784 2785 2786
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2787 2788 2789
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dat.BeginTs()),
2790
		zap.Uint64("EndTs", dat.EndTs()))
D
dragondriver 已提交
2791

E
Enwei Jiao 已提交
2792 2793
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2794 2795 2796
	return dat.result, nil
}

2797
// AlterAlias alter alias of collection.
Y
Yusup 已提交
2798 2799 2800 2801
func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2802 2803 2804 2805

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-AlterAlias")
	defer sp.Finish()

Y
Yusup 已提交
2806 2807 2808 2809 2810 2811 2812
	aat := &AlterAliasTask{
		ctx:               ctx,
		Condition:         NewTaskCondition(ctx),
		AlterAliasRequest: request,
		rootCoord:         node.rootCoord,
	}

D
dragondriver 已提交
2813
	method := "AlterAlias"
2814
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2815
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2816

2817
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2818 2819 2820 2821 2822
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

2823 2824
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2825 2826 2827
	if err := node.sched.ddQueue.Enqueue(aat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2828
			zap.Error(err))
E
Enwei Jiao 已提交
2829
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2830

Y
Yusup 已提交
2831 2832 2833 2834 2835 2836
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2837 2838 2839
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", aat.BeginTs()),
2840
		zap.Uint64("EndTs", aat.EndTs()))
D
dragondriver 已提交
2841 2842 2843 2844

	if err := aat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2845
			zap.Error(err),
D
dragondriver 已提交
2846
			zap.Uint64("BeginTs", aat.BeginTs()),
2847
			zap.Uint64("EndTs", aat.EndTs()))
Y
Yusup 已提交
2848

E
Enwei Jiao 已提交
2849
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2850

Y
Yusup 已提交
2851 2852 2853 2854 2855 2856
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2857 2858 2859
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", aat.BeginTs()),
2860
		zap.Uint64("EndTs", aat.EndTs()))
D
dragondriver 已提交
2861

E
Enwei Jiao 已提交
2862 2863
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2864 2865 2866
	return aat.result, nil
}

2867
// CalcDistance calculates the distances between vectors.
2868
func (node *Proxy) CalcDistance(ctx context.Context, request *milvuspb.CalcDistanceRequest) (*milvuspb.CalcDistanceResults, error) {
2869 2870 2871 2872 2873
	if !node.checkHealthy() {
		return &milvuspb.CalcDistanceResults{
			Status: unhealthyStatus(),
		}, nil
	}
2874

2875 2876 2877 2878
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CalcDistance")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

2879 2880
	query := func(ids *milvuspb.VectorIDs) (*milvuspb.QueryResults, error) {
		outputFields := []string{ids.FieldName}
2881

2882 2883 2884 2885 2886
		queryRequest := &milvuspb.QueryRequest{
			DbName:         "",
			CollectionName: ids.CollectionName,
			PartitionNames: ids.PartitionNames,
			OutputFields:   outputFields,
2887 2888
		}

2889
		qt := &queryTask{
2890 2891 2892
			ctx:       ctx,
			Condition: NewTaskCondition(ctx),
			RetrieveRequest: &internalpb.RetrieveRequest{
2893 2894
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Retrieve),
E
Enwei Jiao 已提交
2895
					commonpbutil.WithSourceID(paramtable.GetNodeID()),
2896
				),
E
Enwei Jiao 已提交
2897
				ReqID: paramtable.GetNodeID(),
2898
			},
2899 2900 2901 2902
			request: queryRequest,
			qc:      node.queryCoord,
			ids:     ids.IdArray,

2903
			queryShardPolicy: mergeRoundRobinPolicy,
2904
			shardMgr:         node.shardMgr,
2905 2906
		}

2907
		log := log.Ctx(ctx).With(
G
groot 已提交
2908 2909
			zap.String("collection", queryRequest.CollectionName),
			zap.Any("partitions", queryRequest.PartitionNames),
2910
			zap.Any("OutputFields", queryRequest.OutputFields))
G
groot 已提交
2911

2912
		err := node.sched.dqQueue.Enqueue(qt)
2913
		if err != nil {
2914 2915
			log.Error("CalcDistance queryTask failed to enqueue",
				zap.Error(err))
2916

2917 2918 2919 2920 2921
			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
2922
			}, err
2923
		}
2924

2925
		log.Debug("CalcDistance queryTask enqueued")
2926 2927 2928

		err = qt.WaitToFinish()
		if err != nil {
2929 2930
			log.Error("CalcDistance queryTask failed to WaitToFinish",
				zap.Error(err))
2931 2932 2933 2934 2935 2936

			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
2937
			}, err
2938
		}
2939

2940
		log.Debug("CalcDistance queryTask Done")
2941 2942

		return &milvuspb.QueryResults{
2943 2944
			Status:     qt.result.Status,
			FieldsData: qt.result.FieldsData,
2945 2946 2947
		}, nil
	}

G
groot 已提交
2948 2949 2950 2951
	// calcDistanceTask is not a standard task, no need to enqueue
	task := &calcDistanceTask{
		traceID:   traceID,
		queryFunc: query,
2952 2953
	}

G
groot 已提交
2954
	return task.Execute(ctx, request)
2955 2956
}

2957
// GetDdChannel returns the used channel for dd operations.
C
Cai Yudong 已提交
2958
func (node *Proxy) GetDdChannel(ctx context.Context, request *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) {
2959 2960
	panic("implement me")
}
X
XuanYang-cn 已提交
2961

2962
// GetPersistentSegmentInfo get the information of sealed segment.
C
Cai Yudong 已提交
2963
func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.GetPersistentSegmentInfoRequest) (*milvuspb.GetPersistentSegmentInfoResponse, error) {
2964 2965 2966 2967 2968
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetPersistentSegmentInfo")
	defer sp.Finish()

	log := log.Ctx(ctx)

D
dragondriver 已提交
2969
	log.Debug("GetPersistentSegmentInfo",
2970
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2971 2972 2973
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
2974
	resp := &milvuspb.GetPersistentSegmentInfoResponse{
X
XuanYang-cn 已提交
2975
		Status: &commonpb.Status{
2976
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
2977 2978
		},
	}
2979 2980 2981 2982
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
2983 2984
	method := "GetPersistentSegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2985
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2986
		metrics.TotalLabel).Inc()
2987 2988 2989

	// list segments
	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
X
XuanYang-cn 已提交
2990
	if err != nil {
E
Enwei Jiao 已提交
2991
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002
		resp.Status.Reason = fmt.Errorf("getCollectionID failed, err:%w", err).Error()
		return resp, nil
	}

	getSegmentsByStatesResponse, err := node.dataCoord.GetSegmentsByStates(ctx, &datapb.GetSegmentsByStatesRequest{
		CollectionID: collectionID,
		// -1 means list all partition segemnts
		PartitionID: -1,
		States:      []commonpb.SegmentState{commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed, commonpb.SegmentState_Sealed},
	})
	if err != nil {
E
Enwei Jiao 已提交
3003
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3004
		resp.Status.Reason = fmt.Errorf("getSegmentsOfCollection, err:%w", err).Error()
X
XuanYang-cn 已提交
3005 3006
		return resp, nil
	}
3007 3008

	// get Segment info
3009
	infoResp, err := node.dataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
3010 3011 3012
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3013
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3014
		),
3015
		SegmentIDs: getSegmentsByStatesResponse.Segments,
X
XuanYang-cn 已提交
3016 3017
	})
	if err != nil {
E
Enwei Jiao 已提交
3018
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3019
			metrics.FailLabel).Inc()
3020 3021
		log.Warn("GetPersistentSegmentInfo fail",
			zap.Error(err))
3022
		resp.Status.Reason = fmt.Errorf("dataCoord:GetSegmentInfo, err:%w", err).Error()
X
XuanYang-cn 已提交
3023 3024
		return resp, nil
	}
3025 3026 3027
	log.Debug("GetPersistentSegmentInfo",
		zap.Int("len(infos)", len(infoResp.Infos)),
		zap.Any("status", infoResp.Status))
3028
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
E
Enwei Jiao 已提交
3029
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3030
			metrics.FailLabel).Inc()
X
XuanYang-cn 已提交
3031 3032 3033 3034 3035 3036
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	persistentInfos := make([]*milvuspb.PersistentSegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		persistentInfos[i] = &milvuspb.PersistentSegmentInfo{
S
sunby 已提交
3037
			SegmentID:    info.ID,
X
XuanYang-cn 已提交
3038 3039
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
S
sunby 已提交
3040
			NumRows:      info.NumOfRows,
X
XuanYang-cn 已提交
3041 3042 3043
			State:        info.State,
		}
	}
E
Enwei Jiao 已提交
3044
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3045
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
3046
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3047
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
X
XuanYang-cn 已提交
3048 3049 3050 3051
	resp.Infos = persistentInfos
	return resp, nil
}

J
jingkl 已提交
3052
// GetQuerySegmentInfo gets segment information from QueryCoord.
C
Cai Yudong 已提交
3053
func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQuerySegmentInfoRequest) (*milvuspb.GetQuerySegmentInfoResponse, error) {
3054 3055 3056 3057 3058
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetQuerySegmentInfo")
	defer sp.Finish()

	log := log.Ctx(ctx)

D
dragondriver 已提交
3059
	log.Debug("GetQuerySegmentInfo",
3060
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3061 3062 3063
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
3064
	resp := &milvuspb.GetQuerySegmentInfoResponse{
Z
zhenshan.cao 已提交
3065
		Status: &commonpb.Status{
3066
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
Z
zhenshan.cao 已提交
3067 3068
		},
	}
3069 3070 3071 3072
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3073

3074 3075
	method := "GetQuerySegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3076
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3077 3078
		metrics.TotalLabel).Inc()

3079 3080
	collID, err := globalMetaCache.GetCollectionID(ctx, req.CollectionName)
	if err != nil {
E
Enwei Jiao 已提交
3081
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3082 3083 3084
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3085
	infoResp, err := node.queryCoord.GetSegmentInfo(ctx, &querypb.GetSegmentInfoRequest{
3086 3087 3088
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3089
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3090
		),
3091
		CollectionID: collID,
Z
zhenshan.cao 已提交
3092 3093
	})
	if err != nil {
E
Enwei Jiao 已提交
3094
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3095 3096
		log.Error("Failed to get segment info from QueryCoord",
			zap.Error(err))
Z
zhenshan.cao 已提交
3097 3098 3099
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3100 3101 3102
	log.Debug("GetQuerySegmentInfo",
		zap.Any("infos", infoResp.Infos),
		zap.Any("status", infoResp.Status))
3103
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
E
Enwei Jiao 已提交
3104
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3105 3106
		log.Error("Failed to get segment info from QueryCoord",
			zap.String("errMsg", infoResp.Status.Reason))
Z
zhenshan.cao 已提交
3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118 3119
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	queryInfos := make([]*milvuspb.QuerySegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		queryInfos[i] = &milvuspb.QuerySegmentInfo{
			SegmentID:    info.SegmentID,
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
			NumRows:      info.NumRows,
			MemSize:      info.MemSize,
			IndexName:    info.IndexName,
			IndexID:      info.IndexID,
X
xige-16 已提交
3120
			State:        info.SegmentState,
3121
			NodeIds:      info.NodeIds,
Z
zhenshan.cao 已提交
3122 3123
		}
	}
3124

E
Enwei Jiao 已提交
3125 3126
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3127
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
Z
zhenshan.cao 已提交
3128 3129 3130 3131
	resp.Infos = queryInfos
	return resp, nil
}

J
jingkl 已提交
3132
// Dummy handles dummy request
C
Cai Yudong 已提交
3133
func (node *Proxy) Dummy(ctx context.Context, req *milvuspb.DummyRequest) (*milvuspb.DummyResponse, error) {
3134 3135 3136 3137 3138 3139
	failedResponse := &milvuspb.DummyResponse{
		Response: `{"status": "fail"}`,
	}

	// TODO(wxyu): change name RequestType to Request
	drt, err := parseDummyRequestType(req.RequestType)
3140 3141 3142 3143 3144 3145

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Dummy")
	defer sp.Finish()

	log := log.Ctx(ctx)

3146
	if err != nil {
3147 3148
		log.Warn("Failed to parse dummy request type",
			zap.Error(err))
3149 3150 3151
		return failedResponse, nil
	}

3152 3153
	if drt.RequestType == "query" {
		drr, err := parseDummyQueryRequest(req.RequestType)
3154
		if err != nil {
3155 3156
			log.Warn("Failed to parse dummy query request",
				zap.Error(err))
3157 3158 3159
			return failedResponse, nil
		}

3160
		request := &milvuspb.QueryRequest{
3161 3162 3163
			DbName:         drr.DbName,
			CollectionName: drr.CollectionName,
			PartitionNames: drr.PartitionNames,
3164
			OutputFields:   drr.OutputFields,
X
Xiangyu Wang 已提交
3165 3166
		}

3167
		_, err = node.Query(ctx, request)
3168
		if err != nil {
3169 3170
			log.Warn("Failed to execute dummy query",
				zap.Error(err))
3171 3172
			return failedResponse, err
		}
X
Xiangyu Wang 已提交
3173 3174 3175 3176 3177 3178

		return &milvuspb.DummyResponse{
			Response: `{"status": "success"}`,
		}, nil
	}

3179 3180
	log.Debug("cannot find specify dummy request type")
	return failedResponse, nil
X
Xiangyu Wang 已提交
3181 3182
}

J
jingkl 已提交
3183
// RegisterLink registers a link
C
Cai Yudong 已提交
3184
func (node *Proxy) RegisterLink(ctx context.Context, req *milvuspb.RegisterLinkRequest) (*milvuspb.RegisterLinkResponse, error) {
3185
	code := node.stateCode.Load().(commonpb.StateCode)
3186 3187 3188 3189 3190

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-RegisterLink")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
3191
		zap.String("role", typeutil.ProxyRole),
C
Cai Yudong 已提交
3192
		zap.Any("state code of proxy", code))
D
dragondriver 已提交
3193

3194 3195
	log.Debug("RegisterLink")

3196
	if code != commonpb.StateCode_Healthy {
3197 3198 3199
		return &milvuspb.RegisterLinkResponse{
			Address: nil,
			Status: &commonpb.Status{
3200
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3201
				Reason:    "proxy not healthy",
3202 3203 3204
			},
		}, nil
	}
E
Enwei Jiao 已提交
3205
	//metrics.ProxyLinkedSDKs.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Inc()
3206 3207 3208
	return &milvuspb.RegisterLinkResponse{
		Address: nil,
		Status: &commonpb.Status{
3209
			ErrorCode: commonpb.ErrorCode_Success,
3210
			Reason:    os.Getenv(metricsinfo.DeployModeEnvKey),
3211 3212 3213
		},
	}, nil
}
3214

3215
// GetMetrics gets the metrics of proxy
3216 3217
// TODO(dragondriver): cache the Metrics and set a retention to the cache
func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
3218 3219 3220 3221 3222
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetMetrics")
	defer sp.Finish()

	log := log.Ctx(ctx)

3223
	log.Debug("Proxy.GetMetrics",
E
Enwei Jiao 已提交
3224
		zap.Int64("node_id", paramtable.GetNodeID()),
3225 3226 3227 3228
		zap.String("req", req.Request))

	if !node.checkHealthy() {
		log.Warn("Proxy.GetMetrics failed",
E
Enwei Jiao 已提交
3229
			zap.Int64("node_id", paramtable.GetNodeID()),
3230
			zap.String("req", req.Request),
E
Enwei Jiao 已提交
3231
			zap.Error(errProxyIsUnhealthy(paramtable.GetNodeID())))
3232 3233 3234 3235

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
E
Enwei Jiao 已提交
3236
				Reason:    msgProxyIsUnhealthy(paramtable.GetNodeID()),
3237 3238 3239 3240 3241 3242 3243 3244
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetMetrics failed to parse metric type",
E
Enwei Jiao 已提交
3245
			zap.Int64("node_id", paramtable.GetNodeID()),
3246 3247 3248 3249 3250 3251 3252 3253 3254 3255 3256 3257 3258 3259 3260
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

	log.Debug("Proxy.GetMetrics",
		zap.String("metric_type", metricType))

3261 3262 3263
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3264
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
3265
	)
3266
	if metricType == metricsinfo.SystemInfoMetrics {
3267 3268 3269 3270 3271 3272 3273
		ret, err := node.metricsCacheManager.GetSystemInfoMetrics()
		if err == nil && ret != nil {
			return ret, nil
		}
		log.Debug("failed to get system info metrics from cache, recompute instead",
			zap.Error(err))

3274
		metrics, err := getSystemInfoMetrics(ctx, req, node)
3275 3276

		log.Debug("Proxy.GetMetrics",
E
Enwei Jiao 已提交
3277
			zap.Int64("node_id", paramtable.GetNodeID()),
3278 3279 3280 3281 3282
			zap.String("req", req.Request),
			zap.String("metric_type", metricType),
			zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
			zap.Error(err))

3283 3284
		node.metricsCacheManager.UpdateSystemInfoMetrics(metrics)

G
godchen 已提交
3285
		return metrics, nil
3286 3287
	}

J
Jiquan Long 已提交
3288
	log.Warn("Proxy.GetMetrics failed, request metric type is not implemented yet",
E
Enwei Jiao 已提交
3289
		zap.Int64("node_id", paramtable.GetNodeID()),
3290 3291 3292 3293 3294 3295 3296 3297 3298 3299 3300 3301
		zap.String("req", req.Request),
		zap.String("metric_type", metricType))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}

3302 3303 3304
// GetProxyMetrics gets the metrics of proxy, it's an internal interface which is different from GetMetrics interface,
// because it only obtains the metrics of Proxy, not including the topological metrics of Query cluster and Data cluster.
func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
3305 3306 3307 3308 3309 3310 3311
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetProxyMetrics")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.Int64("node_id", paramtable.GetNodeID()),
		zap.String("req", req.Request))

3312 3313
	if !node.checkHealthy() {
		log.Warn("Proxy.GetProxyMetrics failed",
E
Enwei Jiao 已提交
3314
			zap.Error(errProxyIsUnhealthy(paramtable.GetNodeID())))
3315 3316 3317 3318

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
E
Enwei Jiao 已提交
3319
				Reason:    msgProxyIsUnhealthy(paramtable.GetNodeID()),
3320 3321 3322 3323 3324 3325 3326 3327 3328 3329 3330 3331 3332 3333 3334 3335 3336
			},
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetProxyMetrics failed to parse metric type",
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

3337 3338 3339
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3340
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
3341
	)
3342 3343 3344 3345 3346 3347 3348 3349 3350 3351 3352 3353 3354 3355 3356 3357

	if metricType == metricsinfo.SystemInfoMetrics {
		proxyMetrics, err := getProxyMetrics(ctx, req, node)
		if err != nil {
			log.Warn("Proxy.GetProxyMetrics failed to getProxyMetrics",
				zap.Error(err))

			return &milvuspb.GetMetricsResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

		log.Debug("Proxy.GetProxyMetrics",
3358
			zap.String("metric_type", metricType))
3359 3360 3361 3362

		return proxyMetrics, nil
	}

J
Jiquan Long 已提交
3363
	log.Warn("Proxy.GetProxyMetrics failed, request metric type is not implemented yet",
3364 3365 3366 3367 3368 3369 3370 3371 3372 3373
		zap.String("metric_type", metricType))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
	}, nil
}

B
bigsheeper 已提交
3374 3375
// LoadBalance would do a load balancing operation between query nodes
func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceRequest) (*commonpb.Status, error) {
3376 3377 3378 3379 3380
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadBalance")
	defer sp.Finish()

	log := log.Ctx(ctx)

B
bigsheeper 已提交
3381
	log.Debug("Proxy.LoadBalance",
E
Enwei Jiao 已提交
3382
		zap.Int64("proxy_id", paramtable.GetNodeID()),
B
bigsheeper 已提交
3383 3384 3385 3386 3387 3388 3389 3390 3391
		zap.Any("req", req))

	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	}
3392 3393 3394

	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
	if err != nil {
J
Jiquan Long 已提交
3395
		log.Warn("failed to get collection id",
3396 3397
			zap.String("collection name", req.GetCollectionName()),
			zap.Error(err))
3398 3399 3400
		status.Reason = err.Error()
		return status, nil
	}
B
bigsheeper 已提交
3401
	infoResp, err := node.queryCoord.LoadBalance(ctx, &querypb.LoadBalanceRequest{
3402 3403 3404
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_LoadBalanceSegments),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3405
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3406
		),
B
bigsheeper 已提交
3407 3408
		SourceNodeIDs:    []int64{req.SrcNodeID},
		DstNodeIDs:       req.DstNodeIDs,
X
xige-16 已提交
3409
		BalanceReason:    querypb.TriggerCondition_GrpcRequest,
B
bigsheeper 已提交
3410
		SealedSegmentIDs: req.SealedSegmentIDs,
3411
		CollectionID:     collectionID,
B
bigsheeper 已提交
3412 3413
	})
	if err != nil {
J
Jiquan Long 已提交
3414
		log.Warn("Failed to LoadBalance from Query Coordinator",
3415 3416
			zap.Any("req", req),
			zap.Error(err))
B
bigsheeper 已提交
3417 3418 3419 3420
		status.Reason = err.Error()
		return status, nil
	}
	if infoResp.ErrorCode != commonpb.ErrorCode_Success {
J
Jiquan Long 已提交
3421
		log.Warn("Failed to LoadBalance from Query Coordinator",
3422
			zap.String("errMsg", infoResp.Reason))
B
bigsheeper 已提交
3423 3424 3425
		status.Reason = infoResp.Reason
		return status, nil
	}
3426 3427 3428
	log.Debug("LoadBalance Done",
		zap.Any("req", req),
		zap.Any("status", infoResp))
B
bigsheeper 已提交
3429 3430 3431 3432
	status.ErrorCode = commonpb.ErrorCode_Success
	return status, nil
}

3433 3434
// GetReplicas gets replica info
func (node *Proxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
3435 3436 3437 3438 3439 3440 3441 3442
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetReplicas")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("received get replicas request",
		zap.Int64("collection", req.GetCollectionID()),
		zap.Bool("with shard nodes", req.GetWithShardNodes()))
3443 3444 3445 3446 3447 3448
	resp := &milvuspb.GetReplicasResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

S
smellthemoon 已提交
3449 3450
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_GetReplicas),
E
Enwei Jiao 已提交
3451
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
S
smellthemoon 已提交
3452
	)
3453 3454 3455

	resp, err := node.queryCoord.GetReplicas(ctx, req)
	if err != nil {
3456 3457
		log.Error("Failed to get replicas from Query Coordinator",
			zap.Error(err))
3458 3459 3460 3461
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3462 3463 3464
	log.Debug("received get replicas response",
		zap.Any("resp", resp),
		zap.Error(err))
3465 3466 3467
	return resp, nil
}

3468
// GetCompactionState gets the compaction state of multiple segments
3469
func (node *Proxy) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
3470 3471 3472 3473 3474 3475 3476
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetCompactionState")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.Int64("compactionID", req.GetCompactionID()))

	log.Debug("received GetCompactionState request")
3477 3478 3479 3480 3481 3482 3483
	resp := &milvuspb.GetCompactionStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionState(ctx, req)
3484 3485 3486
	log.Debug("received GetCompactionState response",
		zap.Any("resp", resp),
		zap.Error(err))
3487 3488 3489
	return resp, err
}

3490
// ManualCompaction invokes compaction on specified collection
3491
func (node *Proxy) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
3492 3493 3494 3495 3496 3497 3498
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ManualCompaction")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", req.GetCollectionID()))

	log.Info("received ManualCompaction request")
3499 3500 3501 3502 3503 3504 3505
	resp := &milvuspb.ManualCompactionResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.ManualCompaction(ctx, req)
3506 3507 3508
	log.Info("received ManualCompaction response",
		zap.Any("resp", resp),
		zap.Error(err))
3509 3510 3511
	return resp, err
}

3512
// GetCompactionStateWithPlans returns the compactions states with the given plan ID
3513
func (node *Proxy) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
3514 3515 3516 3517 3518 3519 3520
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetCompactionStateWithPlans")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.Int64("compactionID", req.GetCompactionID()))

	log.Debug("received GetCompactionStateWithPlans request")
3521 3522 3523 3524 3525 3526 3527
	resp := &milvuspb.GetCompactionPlansResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionStateWithPlans(ctx, req)
3528 3529 3530
	log.Debug("received GetCompactionStateWithPlans response",
		zap.Any("resp", resp),
		zap.Error(err))
3531 3532 3533
	return resp, err
}

B
Bingyi Sun 已提交
3534 3535
// GetFlushState gets the flush state of multiple segments
func (node *Proxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
3536 3537 3538 3539 3540 3541 3542
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetFlushState")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("received get flush state request",
		zap.Any("request", req))
3543
	var err error
B
Bingyi Sun 已提交
3544 3545 3546
	resp := &milvuspb.GetFlushStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
J
Jiquan Long 已提交
3547
		log.Warn("unable to get flush state because of closed server")
B
Bingyi Sun 已提交
3548 3549 3550
		return resp, nil
	}

3551
	resp, err = node.dataCoord.GetFlushState(ctx, req)
X
Xiaofan 已提交
3552
	if err != nil {
3553 3554
		log.Warn("failed to get flush state response",
			zap.Error(err))
X
Xiaofan 已提交
3555 3556
		return nil, err
	}
3557 3558
	log.Debug("received get flush state response",
		zap.Any("response", resp))
B
Bingyi Sun 已提交
3559 3560 3561
	return resp, err
}

C
Cai Yudong 已提交
3562 3563
// checkHealthy checks proxy state is Healthy
func (node *Proxy) checkHealthy() bool {
3564 3565
	code := node.stateCode.Load().(commonpb.StateCode)
	return code == commonpb.StateCode_Healthy
3566 3567
}

3568 3569 3570
func (node *Proxy) checkHealthyAndReturnCode() (commonpb.StateCode, bool) {
	code := node.stateCode.Load().(commonpb.StateCode)
	return code, code == commonpb.StateCode_Healthy
3571 3572
}

3573
// unhealthyStatus returns the proxy not healthy status
3574 3575 3576
func unhealthyStatus() *commonpb.Status {
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3577
		Reason:    "proxy not healthy",
3578 3579
	}
}
G
groot 已提交
3580 3581 3582

// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
3583 3584 3585 3586 3587
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Import")
	defer sp.Finish()

	log := log.Ctx(ctx)

3588 3589
	log.Info("received import request",
		zap.String("collection name", req.GetCollectionName()),
G
groot 已提交
3590 3591
		zap.String("partition name", req.GetPartitionName()),
		zap.Strings("files", req.GetFiles()))
3592 3593 3594 3595 3596 3597
	resp := &milvuspb.ImportResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
	}
G
groot 已提交
3598 3599 3600 3601
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3602

3603 3604
	err := importutil.ValidateOptions(req.GetOptions())
	if err != nil {
3605 3606
		log.Error("failed to execute import request",
			zap.Error(err))
3607 3608 3609 3610 3611
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = "request options is not illegal    \n" + err.Error() + "    \nIllegal option format    \n" + importutil.OptionFormat
		return resp, nil
	}

3612 3613
	method := "Import"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3614
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3615 3616
		metrics.TotalLabel).Inc()

3617
	// Call rootCoord to finish import.
3618 3619
	respFromRC, err := node.rootCoord.Import(ctx, req)
	if err != nil {
E
Enwei Jiao 已提交
3620
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3621 3622
		log.Error("failed to execute bulk insert request",
			zap.Error(err))
3623 3624 3625 3626
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3627

E
Enwei Jiao 已提交
3628 3629
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3630
	return respFromRC, nil
G
groot 已提交
3631 3632
}

3633
// GetImportState checks import task state from RootCoord.
G
groot 已提交
3634
func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
3635 3636 3637 3638 3639 3640 3641
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetImportState")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("received get import state request",
		zap.Int64("taskID", req.GetTask()))
G
groot 已提交
3642 3643 3644 3645 3646
	resp := &milvuspb.GetImportStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3647 3648
	method := "GetImportState"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3649
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3650
		metrics.TotalLabel).Inc()
G
groot 已提交
3651 3652

	resp, err := node.rootCoord.GetImportState(ctx, req)
3653
	if err != nil {
E
Enwei Jiao 已提交
3654
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3655 3656
		log.Error("failed to execute get import state",
			zap.Error(err))
3657 3658 3659 3660 3661
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}

3662 3663 3664
	log.Debug("successfully received get import state response",
		zap.Int64("taskID", req.GetTask()),
		zap.Any("resp", resp), zap.Error(err))
E
Enwei Jiao 已提交
3665 3666
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3667
	return resp, nil
G
groot 已提交
3668 3669 3670 3671
}

// ListImportTasks get id array of all import tasks from rootcoord
func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
3672 3673 3674 3675 3676
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ListImportTasks")
	defer sp.Finish()

	log := log.Ctx(ctx)

J
Jiquan Long 已提交
3677
	log.Debug("received list import tasks request")
G
groot 已提交
3678 3679 3680 3681 3682
	resp := &milvuspb.ListImportTasksResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3683 3684
	method := "ListImportTasks"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3685
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3686
		metrics.TotalLabel).Inc()
G
groot 已提交
3687
	resp, err := node.rootCoord.ListImportTasks(ctx, req)
3688
	if err != nil {
E
Enwei Jiao 已提交
3689
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3690 3691
		log.Error("failed to execute list import tasks",
			zap.Error(err))
3692 3693
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
X
XuanYang-cn 已提交
3694 3695 3696
		return resp, nil
	}

3697 3698 3699
	log.Debug("successfully received list import tasks response",
		zap.String("collection", req.CollectionName),
		zap.Any("tasks", resp.Tasks))
E
Enwei Jiao 已提交
3700 3701
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
X
XuanYang-cn 已提交
3702 3703 3704
	return resp, err
}

3705 3706 3707
// InvalidateCredentialCache invalidate the credential cache of specified username.
func (node *Proxy) InvalidateCredentialCache(ctx context.Context, request *proxypb.InvalidateCredCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
3708 3709 3710 3711 3712

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-InvalidateCredentialCache")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
3713 3714
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))
3715 3716

	log.Debug("received request to invalidate credential cache")
3717
	if !node.checkHealthy() {
3718
		return unhealthyStatus(), nil
3719
	}
3720 3721 3722 3723 3724

	username := request.Username
	if globalMetaCache != nil {
		globalMetaCache.RemoveCredential(username) // no need to return error, though credential may be not cached
	}
3725
	log.Debug("complete to invalidate credential cache")
3726 3727 3728 3729 3730 3731 3732 3733 3734 3735

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

// UpdateCredentialCache update the credential cache of specified username.
func (node *Proxy) UpdateCredentialCache(ctx context.Context, request *proxypb.UpdateCredCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
3736 3737 3738 3739 3740

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-UpdateCredentialCache")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
3741 3742
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))
3743 3744

	log.Debug("received request to update credential cache")
3745
	if !node.checkHealthy() {
3746
		return unhealthyStatus(), nil
3747
	}
3748 3749

	credInfo := &internalpb.CredentialInfo{
3750 3751
		Username:       request.Username,
		Sha256Password: request.Password,
3752 3753 3754 3755
	}
	if globalMetaCache != nil {
		globalMetaCache.UpdateCredential(credInfo) // no need to return error, though credential may be not cached
	}
3756
	log.Debug("complete to update credential cache")
3757 3758 3759 3760 3761 3762 3763 3764

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

func (node *Proxy) CreateCredential(ctx context.Context, req *milvuspb.CreateCredentialRequest) (*commonpb.Status, error) {
3765 3766 3767 3768 3769 3770 3771 3772
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateCredential")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("CreateCredential",
		zap.String("role", typeutil.ProxyRole))
3773
	if !node.checkHealthy() {
3774
		return unhealthyStatus(), nil
3775
	}
3776 3777 3778 3779 3780 3781 3782 3783 3784 3785
	// validate params
	username := req.Username
	if err := ValidateUsername(username); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	rawPassword, err := crypto.Base64Decode(req.Password)
	if err != nil {
3786 3787
		log.Error("decode password fail",
			zap.Error(err))
3788 3789 3790 3791 3792 3793
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "decode password fail key:" + req.Username,
		}, nil
	}
	if err = ValidatePassword(rawPassword); err != nil {
3794 3795
		log.Error("illegal password",
			zap.Error(err))
3796 3797 3798 3799 3800 3801 3802
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	encryptedPassword, err := crypto.PasswordEncrypt(rawPassword)
	if err != nil {
3803 3804
		log.Error("encrypt password fail",
			zap.Error(err))
3805 3806 3807 3808 3809
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "encrypt password fail key:" + req.Username,
		}, nil
	}
3810

3811 3812 3813
	credInfo := &internalpb.CredentialInfo{
		Username:          req.Username,
		EncryptedPassword: encryptedPassword,
3814
		Sha256Password:    crypto.SHA256(rawPassword, req.Username),
3815 3816 3817
	}
	result, err := node.rootCoord.CreateCredential(ctx, credInfo)
	if err != nil { // for error like conntext timeout etc.
3818 3819
		log.Error("create credential fail",
			zap.Error(err))
3820 3821 3822 3823 3824 3825 3826 3827
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

C
codeman 已提交
3828
func (node *Proxy) UpdateCredential(ctx context.Context, req *milvuspb.UpdateCredentialRequest) (*commonpb.Status, error) {
3829 3830 3831 3832 3833 3834 3835 3836
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-UpdateCredential")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("UpdateCredential",
		zap.String("role", typeutil.ProxyRole))
3837
	if !node.checkHealthy() {
3838
		return unhealthyStatus(), nil
3839
	}
C
codeman 已提交
3840 3841
	rawOldPassword, err := crypto.Base64Decode(req.OldPassword)
	if err != nil {
3842 3843
		log.Error("decode old password fail",
			zap.Error(err))
C
codeman 已提交
3844 3845 3846 3847 3848 3849
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode old password fail when updating:" + req.Username,
		}, nil
	}
	rawNewPassword, err := crypto.Base64Decode(req.NewPassword)
3850
	if err != nil {
3851 3852
		log.Error("decode password fail",
			zap.Error(err))
3853 3854 3855 3856 3857
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
3858 3859
	// valid new password
	if err = ValidatePassword(rawNewPassword); err != nil {
3860 3861
		log.Error("illegal password",
			zap.Error(err))
3862 3863 3864 3865 3866
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
3867 3868

	if !passwordVerify(ctx, req.Username, rawOldPassword, globalMetaCache) {
C
codeman 已提交
3869 3870 3871 3872 3873 3874 3875
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "old password is not correct:" + req.Username,
		}, nil
	}
	// update meta data
	encryptedPassword, err := crypto.PasswordEncrypt(rawNewPassword)
3876
	if err != nil {
3877 3878
		log.Error("encrypt password fail",
			zap.Error(err))
3879 3880 3881 3882 3883
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "encrypt password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
3884
	updateCredReq := &internalpb.CredentialInfo{
3885
		Username:          req.Username,
3886
		Sha256Password:    crypto.SHA256(rawNewPassword, req.Username),
3887 3888
		EncryptedPassword: encryptedPassword,
	}
C
codeman 已提交
3889
	result, err := node.rootCoord.UpdateCredential(ctx, updateCredReq)
3890
	if err != nil { // for error like conntext timeout etc.
3891 3892
		log.Error("update credential fail",
			zap.Error(err))
3893 3894 3895 3896 3897 3898 3899 3900 3901
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
3902 3903 3904 3905 3906 3907 3908 3909
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DeleteCredential")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("DeleteCredential",
		zap.String("role", typeutil.ProxyRole))
3910
	if !node.checkHealthy() {
3911
		return unhealthyStatus(), nil
3912 3913
	}

3914 3915 3916 3917 3918 3919
	if req.Username == util.UserRoot {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_DeleteCredentialFailure,
			Reason:    "user root cannot be deleted",
		}, nil
	}
3920 3921
	result, err := node.rootCoord.DeleteCredential(ctx, req)
	if err != nil { // for error like conntext timeout etc.
3922 3923
		log.Error("delete credential fail",
			zap.Error(err))
3924 3925 3926 3927 3928 3929 3930 3931 3932
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
3933 3934 3935 3936 3937 3938 3939
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ListCredUsers")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole))

	log.Debug("ListCredUsers")
3940
	if !node.checkHealthy() {
3941
		return &milvuspb.ListCredUsersResponse{Status: unhealthyStatus()}, nil
3942
	}
3943
	rootCoordReq := &milvuspb.ListCredUsersRequest{
3944 3945 3946
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_ListCredUsernames),
		),
3947 3948
	}
	resp, err := node.rootCoord.ListCredUsers(ctx, rootCoordReq)
3949 3950 3951 3952 3953 3954 3955 3956 3957 3958 3959 3960
	if err != nil {
		return &milvuspb.ListCredUsersResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
	return &milvuspb.ListCredUsersResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
3961
		Usernames: resp.Usernames,
3962 3963
	}, nil
}
3964

3965
func (node *Proxy) CreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest) (*commonpb.Status, error) {
3966 3967 3968 3969 3970 3971 3972
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateRole")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("CreateRole",
		zap.Any("req", req))
3973
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
3974
		return errorutil.UnhealthyStatus(code), nil
3975 3976 3977 3978 3979 3980 3981 3982 3983 3984
	}

	var roleName string
	if req.Entity != nil {
		roleName = req.Entity.Name
	}
	if err := ValidateRoleName(roleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
3985
		}, nil
3986 3987 3988 3989
	}

	result, err := node.rootCoord.CreateRole(ctx, req)
	if err != nil {
3990 3991
		log.Error("fail to create role",
			zap.Error(err))
3992 3993 3994
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
3995
		}, nil
3996 3997
	}
	return result, nil
3998 3999
}

4000
func (node *Proxy) DropRole(ctx context.Context, req *milvuspb.DropRoleRequest) (*commonpb.Status, error) {
4001 4002 4003 4004 4005 4006 4007
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropRole")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("DropRole",
		zap.Any("req", req))
4008
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4009
		return errorutil.UnhealthyStatus(code), nil
4010 4011 4012 4013 4014
	}
	if err := ValidateRoleName(req.RoleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4015
		}, nil
4016
	}
4017 4018 4019 4020 4021
	if IsDefaultRole(req.RoleName) {
		errMsg := fmt.Sprintf("the role[%s] is a default role, which can't be droped", req.RoleName)
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    errMsg,
4022
		}, nil
4023
	}
4024 4025
	result, err := node.rootCoord.DropRole(ctx, req)
	if err != nil {
4026 4027 4028
		log.Error("fail to drop role",
			zap.String("role_name", req.RoleName),
			zap.Error(err))
4029 4030 4031
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4032
		}, nil
4033 4034
	}
	return result, nil
4035 4036
}

4037
func (node *Proxy) OperateUserRole(ctx context.Context, req *milvuspb.OperateUserRoleRequest) (*commonpb.Status, error) {
4038 4039 4040 4041 4042 4043 4044
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-OperateUserRole")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("OperateUserRole",
		zap.Any("req", req))
4045
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4046
		return errorutil.UnhealthyStatus(code), nil
4047 4048 4049 4050 4051
	}
	if err := ValidateUsername(req.Username); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4052
		}, nil
4053 4054 4055 4056 4057
	}
	if err := ValidateRoleName(req.RoleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4058
		}, nil
4059 4060 4061 4062
	}

	result, err := node.rootCoord.OperateUserRole(ctx, req)
	if err != nil {
4063 4064
		logger.Error("fail to operate user role",
			zap.Error(err))
4065 4066 4067
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4068
		}, nil
4069 4070
	}
	return result, nil
4071 4072
}

4073
func (node *Proxy) SelectRole(ctx context.Context, req *milvuspb.SelectRoleRequest) (*milvuspb.SelectRoleResponse, error) {
4074 4075 4076 4077 4078 4079
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-SelectRole")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("SelectRole", zap.Any("req", req))
4080
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4081
		return &milvuspb.SelectRoleResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4082 4083 4084 4085 4086 4087 4088 4089 4090
	}

	if req.Role != nil {
		if err := ValidateRoleName(req.Role.Name); err != nil {
			return &milvuspb.SelectRoleResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_IllegalArgument,
					Reason:    err.Error(),
				},
4091
			}, nil
4092 4093 4094 4095 4096
		}
	}

	result, err := node.rootCoord.SelectRole(ctx, req)
	if err != nil {
4097 4098
		log.Error("fail to select role",
			zap.Error(err))
4099 4100 4101 4102 4103
		return &milvuspb.SelectRoleResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4104
		}, nil
4105 4106
	}
	return result, nil
4107 4108
}

4109
func (node *Proxy) SelectUser(ctx context.Context, req *milvuspb.SelectUserRequest) (*milvuspb.SelectUserResponse, error) {
4110 4111 4112 4113 4114 4115 4116
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-SelectUser")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("SelectUser",
		zap.Any("req", req))
4117
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4118
		return &milvuspb.SelectUserResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4119 4120 4121 4122 4123 4124 4125 4126 4127
	}

	if req.User != nil {
		if err := ValidateUsername(req.User.Name); err != nil {
			return &milvuspb.SelectUserResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_IllegalArgument,
					Reason:    err.Error(),
				},
4128
			}, nil
4129 4130 4131 4132 4133
		}
	}

	result, err := node.rootCoord.SelectUser(ctx, req)
	if err != nil {
4134 4135
		log.Error("fail to select user",
			zap.Error(err))
4136 4137 4138 4139 4140
		return &milvuspb.SelectUserResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4141
		}, nil
4142 4143
	}
	return result, nil
4144 4145
}

4146 4147 4148 4149 4150 4151 4152 4153 4154 4155 4156 4157 4158 4159 4160 4161 4162 4163 4164 4165 4166 4167 4168 4169 4170 4171 4172 4173 4174 4175
func (node *Proxy) validPrivilegeParams(req *milvuspb.OperatePrivilegeRequest) error {
	if req.Entity == nil {
		return fmt.Errorf("the entity in the request is nil")
	}
	if req.Entity.Grantor == nil {
		return fmt.Errorf("the grantor entity in the grant entity is nil")
	}
	if req.Entity.Grantor.Privilege == nil {
		return fmt.Errorf("the privilege entity in the grantor entity is nil")
	}
	if err := ValidatePrivilege(req.Entity.Grantor.Privilege.Name); err != nil {
		return err
	}
	if req.Entity.Object == nil {
		return fmt.Errorf("the resource entity in the grant entity is nil")
	}
	if err := ValidateObjectType(req.Entity.Object.Name); err != nil {
		return err
	}
	if err := ValidateObjectName(req.Entity.ObjectName); err != nil {
		return err
	}
	if req.Entity.Role == nil {
		return fmt.Errorf("the object entity in the grant entity is nil")
	}
	if err := ValidateRoleName(req.Entity.Role.Name); err != nil {
		return err
	}

	return nil
4176 4177
}

4178
func (node *Proxy) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) {
4179 4180 4181 4182 4183 4184 4185
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-OperatePrivilege")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("OperatePrivilege",
		zap.Any("req", req))
4186
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4187
		return errorutil.UnhealthyStatus(code), nil
4188 4189 4190 4191 4192
	}
	if err := node.validPrivilegeParams(req); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4193
		}, nil
4194 4195 4196 4197 4198 4199
	}
	curUser, err := GetCurUserFromContext(ctx)
	if err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4200
		}, nil
4201 4202 4203 4204
	}
	req.Entity.Grantor.User = &milvuspb.UserEntity{Name: curUser}
	result, err := node.rootCoord.OperatePrivilege(ctx, req)
	if err != nil {
4205 4206
		log.Error("fail to operate privilege",
			zap.Error(err))
4207 4208 4209
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4210
		}, nil
4211 4212
	}
	return result, nil
4213 4214
}

4215 4216 4217 4218 4219 4220 4221 4222 4223 4224 4225 4226 4227 4228 4229 4230 4231 4232 4233 4234 4235 4236 4237 4238 4239 4240 4241
func (node *Proxy) validGrantParams(req *milvuspb.SelectGrantRequest) error {
	if req.Entity == nil {
		return fmt.Errorf("the grant entity in the request is nil")
	}

	if req.Entity.Object != nil {
		if err := ValidateObjectType(req.Entity.Object.Name); err != nil {
			return err
		}

		if err := ValidateObjectName(req.Entity.ObjectName); err != nil {
			return err
		}
	}

	if req.Entity.Role == nil {
		return fmt.Errorf("the role entity in the grant entity is nil")
	}

	if err := ValidateRoleName(req.Entity.Role.Name); err != nil {
		return err
	}

	return nil
}

func (node *Proxy) SelectGrant(ctx context.Context, req *milvuspb.SelectGrantRequest) (*milvuspb.SelectGrantResponse, error) {
4242 4243 4244 4245 4246 4247 4248
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-SelectGrant")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("SelectGrant",
		zap.Any("req", req))
4249
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4250
		return &milvuspb.SelectGrantResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4251 4252 4253 4254 4255 4256 4257 4258
	}

	if err := node.validGrantParams(req); err != nil {
		return &milvuspb.SelectGrantResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_IllegalArgument,
				Reason:    err.Error(),
			},
4259
		}, nil
4260 4261 4262 4263
	}

	result, err := node.rootCoord.SelectGrant(ctx, req)
	if err != nil {
4264 4265
		log.Error("fail to select grant",
			zap.Error(err))
4266 4267 4268 4269 4270
		return &milvuspb.SelectGrantResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4271
		}, nil
4272 4273 4274 4275 4276
	}
	return result, nil
}

func (node *Proxy) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.RefreshPolicyInfoCacheRequest) (*commonpb.Status, error) {
4277 4278 4279 4280 4281 4282 4283
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-RefreshPolicyInfoCache")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("RefreshPrivilegeInfoCache",
		zap.Any("req", req))
4284 4285 4286 4287 4288 4289 4290 4291 4292 4293
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
		return errorutil.UnhealthyStatus(code), errorutil.UnhealthyError()
	}

	if globalMetaCache != nil {
		err := globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{
			OpType: typeutil.CacheOpType(req.OpType),
			OpKey:  req.OpKey,
		})
		if err != nil {
4294 4295
			log.Error("fail to refresh policy info",
				zap.Error(err))
4296 4297 4298 4299 4300 4301
			return &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_RefreshPolicyInfoCacheFailure,
				Reason:    err.Error(),
			}, err
		}
	}
4302
	log.Debug("RefreshPrivilegeInfoCache success")
4303 4304 4305 4306

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
4307
}
4308 4309 4310 4311 4312 4313 4314 4315 4316 4317 4318 4319 4320 4321 4322 4323 4324

// SetRates limits the rates of requests.
func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesRequest) (*commonpb.Status, error) {
	resp := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	}
	if !node.checkHealthy() {
		resp = unhealthyStatus()
		return resp, nil
	}

	err := node.multiRateLimiter.globalRateLimiter.setRates(request.GetRates())
	// TODO: set multiple rate limiter rates
	if err != nil {
		resp.Reason = err.Error()
		return resp, nil
	}
4325 4326 4327 4328 4329 4330 4331
	node.multiRateLimiter.SetQuotaStates(request.GetStates(), request.GetStateReasons())
	log.Info("current rates in proxy", zap.Int64("proxyNodeID", paramtable.GetNodeID()), zap.Any("rates", request.GetRates()))
	if len(request.GetStates()) != 0 {
		for i := range request.GetStates() {
			log.Warn("Proxy set quota states", zap.String("state", request.GetStates()[i].String()), zap.String("reason", request.GetStateReasons()[i]))
		}
	}
4332 4333 4334
	resp.ErrorCode = commonpb.ErrorCode_Success
	return resp, nil
}
4335 4336 4337 4338

func (node *Proxy) CheckHealth(ctx context.Context, request *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
	if !node.checkHealthy() {
		reason := errorutil.UnHealthReason("proxy", node.session.ServerID, "proxy is unhealthy")
4339 4340 4341 4342
		return &milvuspb.CheckHealthResponse{
			Status:    unhealthyStatus(),
			IsHealthy: false,
			Reasons:   []string{reason}}, nil
4343 4344 4345 4346 4347 4348 4349 4350 4351 4352
	}

	group, ctx := errgroup.WithContext(ctx)
	errReasons := make([]string, 0)

	mu := &sync.Mutex{}
	fn := func(role string, resp *milvuspb.CheckHealthResponse, err error) error {
		mu.Lock()
		defer mu.Unlock()

4353 4354 4355 4356 4357
		sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-RefreshPolicyInfoCache")
		defer sp.Finish()

		log := log.Ctx(ctx).With(zap.String("role", role))

4358
		if err != nil {
4359 4360
			log.Warn("check health fail",
				zap.Error(err))
4361 4362 4363 4364 4365
			errReasons = append(errReasons, fmt.Sprintf("check health fail for %s", role))
			return err
		}

		if !resp.IsHealthy {
4366
			log.Warn("check health fail")
4367 4368 4369 4370 4371 4372 4373 4374 4375 4376 4377 4378 4379 4380 4381 4382 4383 4384 4385 4386 4387 4388 4389 4390 4391 4392 4393 4394
			errReasons = append(errReasons, resp.Reasons...)
		}
		return nil
	}

	group.Go(func() error {
		resp, err := node.rootCoord.CheckHealth(ctx, request)
		return fn("rootcoord", resp, err)
	})

	group.Go(func() error {
		resp, err := node.queryCoord.CheckHealth(ctx, request)
		return fn("querycoord", resp, err)
	})

	group.Go(func() error {
		resp, err := node.dataCoord.CheckHealth(ctx, request)
		return fn("datacoord", resp, err)
	})

	group.Go(func() error {
		resp, err := node.indexCoord.CheckHealth(ctx, request)
		return fn("indexcoord", resp, err)
	})

	err := group.Wait()
	if err != nil || len(errReasons) != 0 {
		return &milvuspb.CheckHealthResponse{
4395 4396 4397
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_Success,
			},
4398 4399 4400 4401 4402
			IsHealthy: false,
			Reasons:   errReasons,
		}, nil
	}

4403
	states, reasons := node.multiRateLimiter.GetQuotaStates()
4404 4405 4406 4407 4408
	return &milvuspb.CheckHealthResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
4409 4410 4411
		QuotaStates: states,
		Reasons:     reasons,
		IsHealthy:   true,
4412
	}, nil
4413
}