// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proxy
18 19 20

import (
	"context"
21
	"fmt"
C
cai.zhang 已提交
22
	"os"
23
	"strconv"
24 25 26
	"sync"

	"golang.org/x/sync/errgroup"
27

28
	"github.com/golang/protobuf/proto"
S
SimFG 已提交
29 30
	"github.com/milvus-io/milvus-proto/go-api/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/milvuspb"
S
smellthemoon 已提交
31
	"github.com/milvus-io/milvus-proto/go-api/schemapb"
32
	"github.com/milvus-io/milvus/internal/common"
X
Xiangyu Wang 已提交
33
	"github.com/milvus-io/milvus/internal/log"
34
	"github.com/milvus-io/milvus/internal/metrics"
J
jaime 已提交
35
	"github.com/milvus-io/milvus/internal/mq/msgstream"
X
Xiangyu Wang 已提交
36 37 38 39
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
40
	"github.com/milvus-io/milvus/internal/util"
41
	"github.com/milvus-io/milvus/internal/util/commonpbutil"
42
	"github.com/milvus-io/milvus/internal/util/crypto"
43
	"github.com/milvus-io/milvus/internal/util/errorutil"
44
	"github.com/milvus-io/milvus/internal/util/importutil"
45 46
	"github.com/milvus-io/milvus/internal/util/logutil"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
E
Enwei Jiao 已提交
47
	"github.com/milvus-io/milvus/internal/util/paramtable"
48
	"github.com/milvus-io/milvus/internal/util/timerecord"
49
	"github.com/milvus-io/milvus/internal/util/trace"
X
Xiangyu Wang 已提交
50
	"github.com/milvus-io/milvus/internal/util/typeutil"
51
	"go.uber.org/zap"
52 53
)

54 55
const moduleName = "Proxy"

56
// UpdateStateCode updates the state code of Proxy.
57
func (node *Proxy) UpdateStateCode(code commonpb.StateCode) {
58
	node.stateCode.Store(code)
Z
zhenshan.cao 已提交
59 60
}

61
// GetComponentStates get state of Proxy.
62 63
func (node *Proxy) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
	stats := &milvuspb.ComponentStates{
G
godchen 已提交
64 65 66 67
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
68
	code, ok := node.stateCode.Load().(commonpb.StateCode)
G
godchen 已提交
69 70 71 72 73 74
	if !ok {
		errMsg := "unexpected error in type assertion"
		stats.Status = &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    errMsg,
		}
G
godchen 已提交
75
		return stats, nil
G
godchen 已提交
76
	}
77 78 79 80
	nodeID := common.NotRegisteredID
	if node.session != nil && node.session.Registered() {
		nodeID = node.session.ServerID
	}
81
	info := &milvuspb.ComponentInfo{
82 83
		// NodeID:    Params.ProxyID, // will race with Proxy.Register()
		NodeID:    nodeID,
C
Cai Yudong 已提交
84
		Role:      typeutil.ProxyRole,
G
godchen 已提交
85 86 87 88 89 90
		StateCode: code,
	}
	stats.State = info
	return stats, nil
}

C
cxytz01 已提交
91
// GetStatisticsChannel gets statistics channel of Proxy.
C
Cai Yudong 已提交
92
func (node *Proxy) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
93 94 95 96 97 98 99 100 101
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: "",
	}, nil
}

102
// InvalidateCollectionMetaCache invalidate the meta cache of specific collection.
C
Cai Yudong 已提交
103
func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
104 105 106
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
107
	ctx = logutil.WithModule(ctx, moduleName)
108 109 110
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-InvalidateCollectionMetaCache")
	defer sp.Finish()
	log := log.Ctx(ctx).With(
111
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
112
		zap.String("db", request.DbName),
113 114
		zap.String("collectionName", request.CollectionName),
		zap.Int64("collectionID", request.CollectionID))
D
dragondriver 已提交
115

116 117
	log.Info("received request to invalidate collection meta cache")

118
	collectionName := request.CollectionName
119
	collectionID := request.CollectionID
X
Xiaofan 已提交
120 121

	var aliasName []string
N
neza2017 已提交
122
	if globalMetaCache != nil {
123 124 125 126
		if collectionName != "" {
			globalMetaCache.RemoveCollection(ctx, collectionName) // no need to return error, though collection may be not cached
		}
		if request.CollectionID != UniqueID(0) {
X
Xiaofan 已提交
127
			aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
128
		}
N
neza2017 已提交
129
	}
130 131
	if request.GetBase().GetMsgType() == commonpb.MsgType_DropCollection {
		// no need to handle error, since this Proxy may not create dml stream for the collection.
132 133
		node.chMgr.removeDMLStream(request.GetCollectionID())
		// clean up collection level metrics
E
Enwei Jiao 已提交
134
		metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), collectionName)
X
Xiaofan 已提交
135
		for _, alias := range aliasName {
E
Enwei Jiao 已提交
136
			metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), alias)
X
Xiaofan 已提交
137
		}
138
	}
139
	log.Info("complete to invalidate collection meta cache")
D
dragondriver 已提交
140

141
	return &commonpb.Status{
142
		ErrorCode: commonpb.ErrorCode_Success,
143 144
		Reason:    "",
	}, nil
145 146
}

147
// CreateCollection create a collection by the schema.
148
// TODO(dragondriver): add more detailed ut for ConsistencyLevel, should we support multiple consistency level in Proxy?
C
Cai Yudong 已提交
149
func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
150 151 152
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
153 154 155

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateCollection")
	defer sp.Finish()
156 157 158
	method := "CreateCollection"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
159
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
160

161
	cct := &createCollectionTask{
S
sunby 已提交
162
		ctx:                     ctx,
163 164
		Condition:               NewTaskCondition(ctx),
		CreateCollectionRequest: request,
165
		rootCoord:               node.rootCoord,
166 167
	}

168 169 170
	// avoid data race
	lenOfSchema := len(request.Schema)

171
	log := log.Ctx(ctx).With(
172
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
173 174
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
175
		zap.Int("len(schema)", lenOfSchema),
176 177
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
178

179 180
	log.Debug(rpcReceived(method))

181 182 183
	if err := node.sched.ddQueue.Enqueue(cct); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
184
			zap.Error(err))
185

E
Enwei Jiao 已提交
186
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
187
		return &commonpb.Status{
188
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
189 190 191 192
			Reason:    err.Error(),
		}, nil
	}

193 194
	log.Debug(
		rpcEnqueued(method),
195 196
		zap.Uint64("BeginTs", cct.BeginTs()),
		zap.Uint64("EndTs", cct.EndTs()),
197
		zap.Uint64("timestamp", request.Base.Timestamp))
198

199 200 201
	if err := cct.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
202
			zap.Error(err),
203
			zap.Uint64("BeginTs", cct.BeginTs()),
204
			zap.Uint64("EndTs", cct.EndTs()))
D
dragondriver 已提交
205

E
Enwei Jiao 已提交
206
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
207
		return &commonpb.Status{
208
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
209 210 211 212
			Reason:    err.Error(),
		}, nil
	}

213 214
	log.Debug(
		rpcDone(method),
215
		zap.Uint64("BeginTs", cct.BeginTs()),
216
		zap.Uint64("EndTs", cct.EndTs()))
217

E
Enwei Jiao 已提交
218 219
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
220 221 222
	return cct.result, nil
}

223
// DropCollection drop a collection.
C
Cai Yudong 已提交
224
func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
225 226 227
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
228 229 230

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropCollection")
	defer sp.Finish()
231 232
	method := "DropCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
233
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
234

235
	dct := &dropCollectionTask{
S
sunby 已提交
236
		ctx:                   ctx,
237 238
		Condition:             NewTaskCondition(ctx),
		DropCollectionRequest: request,
239
		rootCoord:             node.rootCoord,
240
		chMgr:                 node.chMgr,
S
sunby 已提交
241
		chTicker:              node.chTicker,
242 243
	}

244
	log := log.Ctx(ctx).With(
245
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
246 247
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
248

249 250
	log.Debug("DropCollection received")

251 252
	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DropCollection failed to enqueue",
253
			zap.Error(err))
254

E
Enwei Jiao 已提交
255
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
256
		return &commonpb.Status{
257
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
258 259 260 261
			Reason:    err.Error(),
		}, nil
	}

262 263
	log.Debug("DropCollection enqueued",
		zap.Uint64("BeginTs", dct.BeginTs()),
264
		zap.Uint64("EndTs", dct.EndTs()))
265 266 267

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DropCollection failed to WaitToFinish",
D
dragondriver 已提交
268
			zap.Error(err),
269
			zap.Uint64("BeginTs", dct.BeginTs()),
270
			zap.Uint64("EndTs", dct.EndTs()))
D
dragondriver 已提交
271

E
Enwei Jiao 已提交
272
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
273
		return &commonpb.Status{
274
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
275 276 277 278
			Reason:    err.Error(),
		}, nil
	}

279 280
	log.Debug("DropCollection done",
		zap.Uint64("BeginTs", dct.BeginTs()),
281
		zap.Uint64("EndTs", dct.EndTs()))
282

E
Enwei Jiao 已提交
283 284
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
285 286 287
	return dct.result, nil
}

288
// HasCollection check if the specific collection exists in Milvus.
C
Cai Yudong 已提交
289
func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
290 291 292 293 294
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
295 296 297

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasCollection")
	defer sp.Finish()
298 299
	method := "HasCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
300
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
301
		metrics.TotalLabel).Inc()
302

303
	log := log.Ctx(ctx).With(
304
		zap.String("role", typeutil.ProxyRole),
305 306 307
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

308 309
	log.Debug("HasCollection received")

310
	hct := &hasCollectionTask{
S
sunby 已提交
311
		ctx:                  ctx,
312 313
		Condition:            NewTaskCondition(ctx),
		HasCollectionRequest: request,
314
		rootCoord:            node.rootCoord,
315 316
	}

317 318
	if err := node.sched.ddQueue.Enqueue(hct); err != nil {
		log.Warn("HasCollection failed to enqueue",
319
			zap.Error(err))
320

E
Enwei Jiao 已提交
321
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
322
			metrics.AbandonLabel).Inc()
323 324
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
325
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
326 327 328 329 330
				Reason:    err.Error(),
			},
		}, nil
	}

331 332
	log.Debug("HasCollection enqueued",
		zap.Uint64("BeginTS", hct.BeginTs()),
333
		zap.Uint64("EndTS", hct.EndTs()))
334 335 336

	if err := hct.WaitToFinish(); err != nil {
		log.Warn("HasCollection failed to WaitToFinish",
D
dragondriver 已提交
337
			zap.Error(err),
338
			zap.Uint64("BeginTS", hct.BeginTs()),
339
			zap.Uint64("EndTS", hct.EndTs()))
D
dragondriver 已提交
340

E
Enwei Jiao 已提交
341
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
342
			metrics.FailLabel).Inc()
343 344
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
345
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
346 347 348 349 350
				Reason:    err.Error(),
			},
		}, nil
	}

351 352
	log.Debug("HasCollection done",
		zap.Uint64("BeginTS", hct.BeginTs()),
353
		zap.Uint64("EndTS", hct.EndTs()))
354

E
Enwei Jiao 已提交
355
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
356
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
357
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
358 359 360
	return hct.result, nil
}

361
// LoadCollection load a collection into query nodes.
C
Cai Yudong 已提交
362
func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
363 364 365
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
366 367 368

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadCollection")
	defer sp.Finish()
369 370
	method := "LoadCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
371
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
372
		metrics.TotalLabel).Inc()
373
	lct := &loadCollectionTask{
S
sunby 已提交
374
		ctx:                   ctx,
375 376
		Condition:             NewTaskCondition(ctx),
		LoadCollectionRequest: request,
377
		queryCoord:            node.queryCoord,
378
		datacoord:             node.dataCoord,
379 380
	}

381
	log := log.Ctx(ctx).With(
382
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
383 384
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
385

386 387
	log.Debug("LoadCollection received")

388 389
	if err := node.sched.ddQueue.Enqueue(lct); err != nil {
		log.Warn("LoadCollection failed to enqueue",
390
			zap.Error(err))
391

E
Enwei Jiao 已提交
392
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
393
			metrics.AbandonLabel).Inc()
394
		return &commonpb.Status{
395
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
396 397 398
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
399

400 401
	log.Debug("LoadCollection enqueued",
		zap.Uint64("BeginTS", lct.BeginTs()),
402
		zap.Uint64("EndTS", lct.EndTs()))
403 404 405

	if err := lct.WaitToFinish(); err != nil {
		log.Warn("LoadCollection failed to WaitToFinish",
D
dragondriver 已提交
406
			zap.Error(err),
407
			zap.Uint64("BeginTS", lct.BeginTs()),
408
			zap.Uint64("EndTS", lct.EndTs()))
E
Enwei Jiao 已提交
409
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
410
			metrics.FailLabel).Inc()
411
		return &commonpb.Status{
412
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
413 414 415 416
			Reason:    err.Error(),
		}, nil
	}

417 418
	log.Debug("LoadCollection done",
		zap.Uint64("BeginTS", lct.BeginTs()),
419
		zap.Uint64("EndTS", lct.EndTs()))
420

E
Enwei Jiao 已提交
421
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
422
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
423
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
424
	return lct.result, nil
425 426
}

427
// ReleaseCollection remove the loaded collection from query nodes.
C
Cai Yudong 已提交
428
func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
429 430 431
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
432

433
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleaseCollection")
434
	defer sp.Finish()
435 436
	method := "ReleaseCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
437
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
438
		metrics.TotalLabel).Inc()
439
	rct := &releaseCollectionTask{
S
sunby 已提交
440
		ctx:                      ctx,
441 442
		Condition:                NewTaskCondition(ctx),
		ReleaseCollectionRequest: request,
443
		queryCoord:               node.queryCoord,
444
		chMgr:                    node.chMgr,
445 446
	}

447
	log := log.Ctx(ctx).With(
448
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
449 450
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
451

452 453
	log.Debug(rpcReceived(method))

454
	if err := node.sched.ddQueue.Enqueue(rct); err != nil {
455 456
		log.Warn(
			rpcFailedToEnqueue(method),
457
			zap.Error(err))
458

E
Enwei Jiao 已提交
459
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
460
			metrics.AbandonLabel).Inc()
461
		return &commonpb.Status{
462
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
463 464 465 466
			Reason:    err.Error(),
		}, nil
	}

467 468
	log.Debug(
		rpcEnqueued(method),
469
		zap.Uint64("BeginTS", rct.BeginTs()),
470
		zap.Uint64("EndTS", rct.EndTs()))
471 472

	if err := rct.WaitToFinish(); err != nil {
473 474
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
475
			zap.Error(err),
476
			zap.Uint64("BeginTS", rct.BeginTs()),
477
			zap.Uint64("EndTS", rct.EndTs()))
D
dragondriver 已提交
478

E
Enwei Jiao 已提交
479
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
480
			metrics.FailLabel).Inc()
481
		return &commonpb.Status{
482
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
483 484 485 486
			Reason:    err.Error(),
		}, nil
	}

487 488
	log.Debug(
		rpcDone(method),
489
		zap.Uint64("BeginTS", rct.BeginTs()),
490
		zap.Uint64("EndTS", rct.EndTs()))
491

E
Enwei Jiao 已提交
492
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
493
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
494
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
495
	return rct.result, nil
496 497
}

498
// DescribeCollection get the meta information of specific collection, such as schema, created timestamp and etc.
C
Cai Yudong 已提交
499
func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
500 501 502 503 504
	if !node.checkHealthy() {
		return &milvuspb.DescribeCollectionResponse{
			Status: unhealthyStatus(),
		}, nil
	}
505

506
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeCollection")
507
	defer sp.Finish()
508 509
	method := "DescribeCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
510
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
511
		metrics.TotalLabel).Inc()
512

513
	dct := &describeCollectionTask{
S
sunby 已提交
514
		ctx:                       ctx,
515 516
		Condition:                 NewTaskCondition(ctx),
		DescribeCollectionRequest: request,
517
		rootCoord:                 node.rootCoord,
518 519
	}

520
	log := log.Ctx(ctx).With(
521
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
522 523
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
524

525 526
	log.Debug("DescribeCollection received")

527 528
	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DescribeCollection failed to enqueue",
529
			zap.Error(err))
530

E
Enwei Jiao 已提交
531
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
532
			metrics.AbandonLabel).Inc()
533 534
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
535
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
536 537 538 539 540
				Reason:    err.Error(),
			},
		}, nil
	}

541 542
	log.Debug("DescribeCollection enqueued",
		zap.Uint64("BeginTS", dct.BeginTs()),
543
		zap.Uint64("EndTS", dct.EndTs()))
544 545 546

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DescribeCollection failed to WaitToFinish",
D
dragondriver 已提交
547
			zap.Error(err),
548
			zap.Uint64("BeginTS", dct.BeginTs()),
549
			zap.Uint64("EndTS", dct.EndTs()))
D
dragondriver 已提交
550

E
Enwei Jiao 已提交
551
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
552
			metrics.FailLabel).Inc()
553

554 555
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
556
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
557 558 559 560 561
				Reason:    err.Error(),
			},
		}, nil
	}

562 563
	log.Debug("DescribeCollection done",
		zap.Uint64("BeginTS", dct.BeginTs()),
564
		zap.Uint64("EndTS", dct.EndTs()))
565

E
Enwei Jiao 已提交
566
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
567
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
568
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
569 570 571
	return dct.result, nil
}

572 573 574 575 576 577 578 579 580
// GetStatistics get the statistics, such as `num_rows`.
// WARNING: It is an experimental API
func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStatisticsRequest) (*milvuspb.GetStatisticsResponse, error) {
	if !node.checkHealthy() {
		return &milvuspb.GetStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}

581
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetStatistics")
582 583 584
	defer sp.Finish()
	method := "GetStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
585
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
586
		metrics.TotalLabel).Inc()
587 588 589 590 591 592 593 594 595 596
	g := &getStatisticsTask{
		request:   request,
		Condition: NewTaskCondition(ctx),
		ctx:       ctx,
		tr:        tr,
		dc:        node.dataCoord,
		qc:        node.queryCoord,
		shardMgr:  node.shardMgr,
	}

597
	log := log.Ctx(ctx).With(
598 599
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
600 601 602 603
		zap.String("collection", request.CollectionName))

	log.Debug(
		rpcReceived(method),
604 605 606 607 608 609 610 611
		zap.Strings("partitions", request.PartitionNames))

	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.Strings("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
612
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636
			metrics.AbandonLabel).Inc()

		return &milvuspb.GetStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.Strings("partitions", request.PartitionNames))

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.Uint64("BeginTS", g.BeginTs()),
			zap.Uint64("EndTS", g.EndTs()),
			zap.Strings("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
637
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
638 639 640 641 642 643 644 645 646 647 648 649 650
			metrics.FailLabel).Inc()

		return &milvuspb.GetStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", g.BeginTs()),
651
		zap.Uint64("EndTS", g.EndTs()))
652

E
Enwei Jiao 已提交
653
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
654
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
655
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
656 657 658
	return g.result, nil
}

659
// GetCollectionStatistics get the collection statistics, such as `num_rows`.
C
Cai Yudong 已提交
660
func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvuspb.GetCollectionStatisticsRequest) (*milvuspb.GetCollectionStatisticsResponse, error) {
661 662 663 664 665
	if !node.checkHealthy() {
		return &milvuspb.GetCollectionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
666 667 668

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetCollectionStatistics")
	defer sp.Finish()
669 670
	method := "GetCollectionStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
671
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
672
		metrics.TotalLabel).Inc()
673
	g := &getCollectionStatisticsTask{
G
godchen 已提交
674 675 676
		ctx:                            ctx,
		Condition:                      NewTaskCondition(ctx),
		GetCollectionStatisticsRequest: request,
677
		dataCoord:                      node.dataCoord,
678 679
	}

680
	log := log.Ctx(ctx).With(
681
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
682 683
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
684

685 686
	log.Debug(rpcReceived(method))

687
	if err := node.sched.ddQueue.Enqueue(g); err != nil {
688 689
		log.Warn(
			rpcFailedToEnqueue(method),
690
			zap.Error(err))
691

E
Enwei Jiao 已提交
692
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
693
			metrics.AbandonLabel).Inc()
694

G
godchen 已提交
695
		return &milvuspb.GetCollectionStatisticsResponse{
696
			Status: &commonpb.Status{
697
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
698 699 700 701 702
				Reason:    err.Error(),
			},
		}, nil
	}

703 704
	log.Debug(
		rpcEnqueued(method),
705
		zap.Uint64("BeginTS", g.BeginTs()),
706
		zap.Uint64("EndTS", g.EndTs()))
707 708

	if err := g.WaitToFinish(); err != nil {
709 710
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
711
			zap.Error(err),
712
			zap.Uint64("BeginTS", g.BeginTs()),
713
			zap.Uint64("EndTS", g.EndTs()))
D
dragondriver 已提交
714

E
Enwei Jiao 已提交
715
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
716
			metrics.FailLabel).Inc()
717

G
godchen 已提交
718
		return &milvuspb.GetCollectionStatisticsResponse{
719
			Status: &commonpb.Status{
720
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
721 722 723 724 725
				Reason:    err.Error(),
			},
		}, nil
	}

726 727
	log.Debug(
		rpcDone(method),
728
		zap.Uint64("BeginTS", g.BeginTs()),
729
		zap.Uint64("EndTS", g.EndTs()))
730

E
Enwei Jiao 已提交
731
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
732
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
733
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
734
	return g.result, nil
735 736
}

737
// ShowCollections list all collections in Milvus.
C
Cai Yudong 已提交
738
func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
739 740 741 742 743
	if !node.checkHealthy() {
		return &milvuspb.ShowCollectionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
744 745
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowCollections")
	defer sp.Finish()
746 747
	method := "ShowCollections"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
748
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
749

750
	sct := &showCollectionsTask{
G
godchen 已提交
751 752 753
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		ShowCollectionsRequest: request,
754
		queryCoord:             node.queryCoord,
755
		rootCoord:              node.rootCoord,
756 757
	}

758
	log := log.Ctx(ctx).With(
759
		zap.String("role", typeutil.ProxyRole),
760 761
		zap.String("DbName", request.DbName),
		zap.Uint64("TimeStamp", request.TimeStamp),
762 763 764 765
		zap.String("ShowType", request.Type.String()))

	log.Debug("ShowCollections received",
		zap.Any("CollectionNames", request.CollectionNames))
766

767
	err := node.sched.ddQueue.Enqueue(sct)
768
	if err != nil {
769 770
		log.Warn("ShowCollections failed to enqueue",
			zap.Error(err),
771
			zap.Any("CollectionNames", request.CollectionNames))
772

E
Enwei Jiao 已提交
773
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
G
godchen 已提交
774
		return &milvuspb.ShowCollectionsResponse{
775
			Status: &commonpb.Status{
776
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
777 778 779 780 781
				Reason:    err.Error(),
			},
		}, nil
	}

782
	log.Debug("ShowCollections enqueued",
783
		zap.Any("CollectionNames", request.CollectionNames))
D
dragondriver 已提交
784

785 786
	err = sct.WaitToFinish()
	if err != nil {
787 788
		log.Warn("ShowCollections failed to WaitToFinish",
			zap.Error(err),
789
			zap.Any("CollectionNames", request.CollectionNames))
790

E
Enwei Jiao 已提交
791
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
792

G
godchen 已提交
793
		return &milvuspb.ShowCollectionsResponse{
794
			Status: &commonpb.Status{
795
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
796 797 798 799 800
				Reason:    err.Error(),
			},
		}, nil
	}

801
	log.Debug("ShowCollections Done",
802 803
		zap.Int("len(CollectionNames)", len(request.CollectionNames)),
		zap.Int("num_collections", len(sct.result.CollectionNames)))
804

E
Enwei Jiao 已提交
805 806
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
807 808 809
	return sct.result, nil
}

J
jaime 已提交
810 811 812 813 814 815 816 817 818 819
func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-AlterCollection")
	defer sp.Finish()
	method := "AlterCollection"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
820
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
J
jaime 已提交
821 822 823 824 825 826 827 828

	act := &alterCollectionTask{
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		AlterCollectionRequest: request,
		rootCoord:              node.rootCoord,
	}

829
	log := log.Ctx(ctx).With(
J
jaime 已提交
830 831 832 833
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

834 835 836
	log.Debug(
		rpcReceived(method))

J
jaime 已提交
837 838 839
	if err := node.sched.ddQueue.Enqueue(act); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
840
			zap.Error(err))
J
jaime 已提交
841

E
Enwei Jiao 已提交
842
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
J
jaime 已提交
843 844 845 846 847 848 849 850 851 852
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", act.BeginTs()),
		zap.Uint64("EndTs", act.EndTs()),
853
		zap.Uint64("timestamp", request.Base.Timestamp))
J
jaime 已提交
854 855 856 857 858 859

	if err := act.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.Uint64("BeginTs", act.BeginTs()),
860
			zap.Uint64("EndTs", act.EndTs()))
J
jaime 已提交
861

E
Enwei Jiao 已提交
862
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
J
jaime 已提交
863 864 865 866 867 868 869 870 871
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", act.BeginTs()),
872
		zap.Uint64("EndTs", act.EndTs()))
J
jaime 已提交
873

E
Enwei Jiao 已提交
874 875
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
J
jaime 已提交
876 877 878
	return act.result, nil
}

879
// CreatePartition create a partition in specific collection.
C
Cai Yudong 已提交
880
func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
881 882 883
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
884

885
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreatePartition")
886
	defer sp.Finish()
887 888
	method := "CreatePartition"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
889
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
890

891
	cpt := &createPartitionTask{
S
sunby 已提交
892
		ctx:                    ctx,
893 894
		Condition:              NewTaskCondition(ctx),
		CreatePartitionRequest: request,
895
		rootCoord:              node.rootCoord,
896 897 898
		result:                 nil,
	}

899
	log := log.Ctx(ctx).With(
900
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
901 902 903
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
904

905 906
	log.Debug(rpcReceived("CreatePartition"))

907 908 909
	if err := node.sched.ddQueue.Enqueue(cpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue("CreatePartition"),
910
			zap.Error(err))
911

E
Enwei Jiao 已提交
912
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
913

914
		return &commonpb.Status{
915
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
916 917 918
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
919

920 921 922
	log.Debug(
		rpcEnqueued("CreatePartition"),
		zap.Uint64("BeginTS", cpt.BeginTs()),
923
		zap.Uint64("EndTS", cpt.EndTs()))
924 925 926 927

	if err := cpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish("CreatePartition"),
D
dragondriver 已提交
928
			zap.Error(err),
929
			zap.Uint64("BeginTS", cpt.BeginTs()),
930
			zap.Uint64("EndTS", cpt.EndTs()))
D
dragondriver 已提交
931

E
Enwei Jiao 已提交
932
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
933

934
		return &commonpb.Status{
935
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
936 937 938
			Reason:    err.Error(),
		}, nil
	}
939 940 941 942

	log.Debug(
		rpcDone("CreatePartition"),
		zap.Uint64("BeginTS", cpt.BeginTs()),
943
		zap.Uint64("EndTS", cpt.EndTs()))
944

E
Enwei Jiao 已提交
945 946
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
947 948 949
	return cpt.result, nil
}

950
// DropPartition drop a partition in specific collection.
C
Cai Yudong 已提交
951
func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
952 953 954
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
955

956
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropPartition")
957
	defer sp.Finish()
958 959
	method := "DropPartition"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
960
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
961

962
	dpt := &dropPartitionTask{
S
sunby 已提交
963
		ctx:                  ctx,
964 965
		Condition:            NewTaskCondition(ctx),
		DropPartitionRequest: request,
966
		rootCoord:            node.rootCoord,
C
cai.zhang 已提交
967
		queryCoord:           node.queryCoord,
968 969 970
		result:               nil,
	}

971
	log := log.Ctx(ctx).With(
972
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
973 974 975
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
976

977 978
	log.Debug(rpcReceived(method))

979 980 981
	if err := node.sched.ddQueue.Enqueue(dpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
982
			zap.Error(err))
983

E
Enwei Jiao 已提交
984
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
985

986
		return &commonpb.Status{
987
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
988 989 990
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
991

992 993 994
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", dpt.BeginTs()),
995
		zap.Uint64("EndTS", dpt.EndTs()))
996 997 998 999

	if err := dpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1000
			zap.Error(err),
1001
			zap.Uint64("BeginTS", dpt.BeginTs()),
1002
			zap.Uint64("EndTS", dpt.EndTs()))
D
dragondriver 已提交
1003

E
Enwei Jiao 已提交
1004
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
1005

1006
		return &commonpb.Status{
1007
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1008 1009 1010
			Reason:    err.Error(),
		}, nil
	}
1011 1012 1013 1014

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", dpt.BeginTs()),
1015
		zap.Uint64("EndTS", dpt.EndTs()))
1016

E
Enwei Jiao 已提交
1017 1018
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1019 1020 1021
	return dpt.result, nil
}

1022
// HasPartition check if partition exist.
C
Cai Yudong 已提交
1023
func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
1024 1025 1026 1027 1028
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
D
dragondriver 已提交
1029

D
dragondriver 已提交
1030
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasPartition")
D
dragondriver 已提交
1031
	defer sp.Finish()
1032 1033 1034
	method := "HasPartition"
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
E
Enwei Jiao 已提交
1035
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1036
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1037

1038
	hpt := &hasPartitionTask{
S
sunby 已提交
1039
		ctx:                 ctx,
1040 1041
		Condition:           NewTaskCondition(ctx),
		HasPartitionRequest: request,
1042
		rootCoord:           node.rootCoord,
1043 1044 1045
		result:              nil,
	}

1046
	log := log.Ctx(ctx).With(
1047
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1048 1049 1050
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
D
dragondriver 已提交
1051

1052 1053
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1054 1055 1056
	if err := node.sched.ddQueue.Enqueue(hpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1057
			zap.Error(err))
D
dragondriver 已提交
1058

E
Enwei Jiao 已提交
1059
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1060
			metrics.AbandonLabel).Inc()
1061

1062 1063
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1064
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1065 1066 1067 1068 1069
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1070

D
dragondriver 已提交
1071 1072 1073
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", hpt.BeginTs()),
1074
		zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1075 1076 1077 1078

	if err := hpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1079
			zap.Error(err),
D
dragondriver 已提交
1080
			zap.Uint64("BeginTS", hpt.BeginTs()),
1081
			zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1082

E
Enwei Jiao 已提交
1083
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1084
			metrics.FailLabel).Inc()
1085

1086 1087
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1088
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1089 1090 1091 1092 1093
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1094 1095 1096 1097

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", hpt.BeginTs()),
1098
		zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1099

E
Enwei Jiao 已提交
1100
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1101
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1102
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1103 1104 1105
	return hpt.result, nil
}

1106
// LoadPartitions load specific partitions into query nodes.
C
Cai Yudong 已提交
1107
func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitionsRequest) (*commonpb.Status, error) {
1108 1109 1110
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1111

D
dragondriver 已提交
1112
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadPartitions")
1113
	defer sp.Finish()
1114 1115
	method := "LoadPartitions"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1116
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1117
		metrics.TotalLabel).Inc()
1118
	lpt := &loadPartitionsTask{
G
godchen 已提交
1119 1120 1121
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		LoadPartitionsRequest: request,
1122
		queryCoord:            node.queryCoord,
1123
		datacoord:             node.dataCoord,
1124 1125
	}

1126
	log := log.Ctx(ctx).With(
1127
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1128 1129 1130
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1131

1132 1133
	log.Debug(rpcReceived(method))

1134 1135 1136
	if err := node.sched.ddQueue.Enqueue(lpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1137
			zap.Error(err))
1138

E
Enwei Jiao 已提交
1139
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1140
			metrics.AbandonLabel).Inc()
1141

1142
		return &commonpb.Status{
1143
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1144 1145 1146 1147
			Reason:    err.Error(),
		}, nil
	}

1148 1149 1150
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", lpt.BeginTs()),
1151
		zap.Uint64("EndTS", lpt.EndTs()))
1152 1153 1154 1155

	if err := lpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1156
			zap.Error(err),
1157
			zap.Uint64("BeginTS", lpt.BeginTs()),
1158
			zap.Uint64("EndTS", lpt.EndTs()))
D
dragondriver 已提交
1159

E
Enwei Jiao 已提交
1160
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1161
			metrics.FailLabel).Inc()
1162

1163
		return &commonpb.Status{
1164
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1165 1166 1167 1168
			Reason:    err.Error(),
		}, nil
	}

1169 1170 1171
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", lpt.BeginTs()),
1172
		zap.Uint64("EndTS", lpt.EndTs()))
1173

E
Enwei Jiao 已提交
1174
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1175
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1176
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1177
	return lpt.result, nil
1178 1179
}

1180
// ReleasePartitions release specific partitions from query nodes.
C
Cai Yudong 已提交
1181
func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionsRequest) (*commonpb.Status, error) {
1182 1183 1184
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1185 1186 1187 1188

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleasePartitions")
	defer sp.Finish()

1189
	rpt := &releasePartitionsTask{
G
godchen 已提交
1190 1191 1192
		ctx:                      ctx,
		Condition:                NewTaskCondition(ctx),
		ReleasePartitionsRequest: request,
1193
		queryCoord:               node.queryCoord,
1194 1195
	}

1196
	method := "ReleasePartitions"
1197
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1198
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1199
		metrics.TotalLabel).Inc()
1200 1201

	log := log.Ctx(ctx).With(
1202
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1203 1204 1205
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1206

1207 1208
	log.Debug(rpcReceived(method))

1209 1210 1211
	if err := node.sched.ddQueue.Enqueue(rpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1212
			zap.Error(err))
1213

E
Enwei Jiao 已提交
1214
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1215
			metrics.AbandonLabel).Inc()
1216

1217
		return &commonpb.Status{
1218
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1219 1220 1221 1222
			Reason:    err.Error(),
		}, nil
	}

1223 1224 1225
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", rpt.BeginTs()),
1226
		zap.Uint64("EndTS", rpt.EndTs()))
1227 1228 1229 1230

	if err := rpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1231
			zap.Error(err),
1232
			zap.Uint64("BeginTS", rpt.BeginTs()),
1233
			zap.Uint64("EndTS", rpt.EndTs()))
D
dragondriver 已提交
1234

E
Enwei Jiao 已提交
1235
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1236
			metrics.FailLabel).Inc()
1237

1238
		return &commonpb.Status{
1239
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1240 1241 1242 1243
			Reason:    err.Error(),
		}, nil
	}

1244 1245 1246
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", rpt.BeginTs()),
1247
		zap.Uint64("EndTS", rpt.EndTs()))
1248

E
Enwei Jiao 已提交
1249
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1250
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1251
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1252
	return rpt.result, nil
1253 1254
}

1255
// GetPartitionStatistics get the statistics of partition, such as num_rows.
C
Cai Yudong 已提交
1256
func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb.GetPartitionStatisticsRequest) (*milvuspb.GetPartitionStatisticsResponse, error) {
1257 1258 1259 1260 1261
	if !node.checkHealthy() {
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1262 1263 1264

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetPartitionStatistics")
	defer sp.Finish()
1265 1266
	method := "GetPartitionStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1267
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1268
		metrics.TotalLabel).Inc()
1269

1270
	g := &getPartitionStatisticsTask{
1271 1272 1273
		ctx:                           ctx,
		Condition:                     NewTaskCondition(ctx),
		GetPartitionStatisticsRequest: request,
1274
		dataCoord:                     node.dataCoord,
1275 1276
	}

1277
	log := log.Ctx(ctx).With(
1278
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1279 1280 1281
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1282

1283 1284
	log.Debug(rpcReceived(method))

1285 1286 1287
	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1288
			zap.Error(err))
1289

E
Enwei Jiao 已提交
1290
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1291
			metrics.AbandonLabel).Inc()
1292

1293 1294 1295 1296 1297 1298 1299 1300
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1301 1302 1303
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", g.BeginTs()),
1304
		zap.Uint64("EndTS", g.EndTs()))
1305 1306 1307 1308

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
1309
			zap.Error(err),
1310
			zap.Uint64("BeginTS", g.BeginTs()),
1311
			zap.Uint64("EndTS", g.EndTs()))
1312

E
Enwei Jiao 已提交
1313
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1314
			metrics.FailLabel).Inc()
1315

1316 1317 1318 1319 1320 1321 1322 1323
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1324 1325 1326
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", g.BeginTs()),
1327
		zap.Uint64("EndTS", g.EndTs()))
1328

E
Enwei Jiao 已提交
1329
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1330
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1331
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1332
	return g.result, nil
1333 1334
}

1335
// ShowPartitions list all partitions in the specific collection.
C
Cai Yudong 已提交
1336
func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
1337 1338 1339 1340 1341
	if !node.checkHealthy() {
		return &milvuspb.ShowPartitionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1342 1343 1344 1345

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowPartitions")
	defer sp.Finish()

1346
	spt := &showPartitionsTask{
G
godchen 已提交
1347 1348 1349
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		ShowPartitionsRequest: request,
1350
		rootCoord:             node.rootCoord,
1351
		queryCoord:            node.queryCoord,
G
godchen 已提交
1352
		result:                nil,
1353 1354
	}

1355
	method := "ShowPartitions"
1356 1357
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
E
Enwei Jiao 已提交
1358
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1359
		metrics.TotalLabel).Inc()
1360

1361 1362
	log := log.Ctx(ctx).With(zap.String("role", typeutil.ProxyRole))

1363 1364
	log.Debug(
		rpcReceived(method),
G
godchen 已提交
1365
		zap.Any("request", request))
1366 1367 1368 1369 1370 1371 1372

	if err := node.sched.ddQueue.Enqueue(spt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.Any("request", request))

E
Enwei Jiao 已提交
1373
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1374
			metrics.AbandonLabel).Inc()
1375

G
godchen 已提交
1376
		return &milvuspb.ShowPartitionsResponse{
1377
			Status: &commonpb.Status{
1378
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1379 1380 1381 1382 1383
				Reason:    err.Error(),
			},
		}, nil
	}

1384 1385 1386 1387
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
1388 1389
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
1390 1391 1392 1393 1394
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

	if err := spt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1395
			zap.Error(err),
1396 1397 1398 1399 1400
			zap.Uint64("BeginTS", spt.BeginTs()),
			zap.Uint64("EndTS", spt.EndTs()),
			zap.String("db", spt.ShowPartitionsRequest.DbName),
			zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
			zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))
D
dragondriver 已提交
1401

E
Enwei Jiao 已提交
1402
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1403
			metrics.FailLabel).Inc()
1404

G
godchen 已提交
1405
		return &milvuspb.ShowPartitionsResponse{
1406
			Status: &commonpb.Status{
1407
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1408 1409 1410 1411
				Reason:    err.Error(),
			},
		}, nil
	}
1412 1413 1414 1415 1416 1417 1418 1419 1420

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

E
Enwei Jiao 已提交
1421
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1422
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1423
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1424 1425 1426
	return spt.result, nil
}

S
SimFG 已提交
1427 1428 1429 1430 1431 1432
func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.GetLoadingProgressRequest) (*milvuspb.GetLoadingProgressResponse, error) {
	if !node.checkHealthy() {
		return &milvuspb.GetLoadingProgressResponse{Status: unhealthyStatus()}, nil
	}
	method := "GetLoadingProgress"
	tr := timerecord.NewTimeRecorder(method)
1433
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetLoadingProgress")
S
SimFG 已提交
1434
	defer sp.Finish()
E
Enwei Jiao 已提交
1435
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
1436 1437 1438
	log := log.Ctx(ctx)

	log.Debug(
S
SimFG 已提交
1439 1440 1441 1442
		rpcReceived(method),
		zap.Any("request", request))

	getErrResponse := func(err error) *milvuspb.GetLoadingProgressResponse {
J
Jiquan Long 已提交
1443
		log.Warn("fail to get loading progress",
1444
			zap.String("collection_name", request.CollectionName),
S
SimFG 已提交
1445 1446
			zap.Strings("partition_name", request.PartitionNames),
			zap.Error(err))
E
Enwei Jiao 已提交
1447
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
S
SimFG 已提交
1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461
		return &milvuspb.GetLoadingProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
	}
	if err := validateCollectionName(request.CollectionName); err != nil {
		return getErrResponse(err), nil
	}
	collectionID, err := globalMetaCache.GetCollectionID(ctx, request.CollectionName)
	if err != nil {
		return getErrResponse(err), nil
	}
S
SimFG 已提交
1462

1463 1464 1465
	msgBase := commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
1466
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
1467
	)
S
SimFG 已提交
1468 1469 1470 1471 1472 1473 1474 1475 1476 1477
	if request.Base == nil {
		request.Base = msgBase
	} else {
		request.Base.MsgID = msgBase.MsgID
		request.Base.Timestamp = msgBase.Timestamp
		request.Base.SourceID = msgBase.SourceID
	}

	var progress int64
	if len(request.GetPartitionNames()) == 0 {
S
SimFG 已提交
1478
		if progress, err = getCollectionProgress(ctx, node.queryCoord, request.GetBase(), collectionID); err != nil {
S
SimFG 已提交
1479 1480 1481
			return getErrResponse(err), nil
		}
	} else {
S
SimFG 已提交
1482 1483
		if progress, err = getPartitionProgress(ctx, node.queryCoord, request.GetBase(),
			request.GetPartitionNames(), request.GetCollectionName(), collectionID); err != nil {
S
SimFG 已提交
1484 1485 1486 1487
			return getErrResponse(err), nil
		}
	}

1488
	log.Debug(
S
SimFG 已提交
1489 1490
		rpcDone(method),
		zap.Any("request", request))
E
Enwei Jiao 已提交
1491 1492
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
S
SimFG 已提交
1493 1494 1495 1496 1497 1498 1499 1500
	return &milvuspb.GetLoadingProgressResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		Progress: progress,
	}, nil
}

1501
func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadStateRequest) (*milvuspb.GetLoadStateResponse, error) {
S
SimFG 已提交
1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533
	if !node.checkHealthy() {
		return &milvuspb.GetLoadStateResponse{Status: unhealthyStatus()}, nil
	}
	method := "GetLoadState"
	tr := timerecord.NewTimeRecorder(method)
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetLoadState")
	defer sp.Finish()
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
	log := log.Ctx(ctx)

	log.Debug(
		rpcReceived(method),
		zap.Any("request", request))

	getErrResponse := func(err error) *milvuspb.GetLoadStateResponse {
		log.Warn("fail to get load state",
			zap.String("collection_name", request.CollectionName),
			zap.Strings("partition_name", request.PartitionNames),
			zap.Error(err))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
		return &milvuspb.GetLoadStateResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
	}

	if err := validateCollectionName(request.CollectionName); err != nil {
		return getErrResponse(err), nil
	}

1534 1535
	// TODO(longjiquan): https://github.com/milvus-io/milvus/issues/21485, Remove `GetComponentStates` after error code
	// 	is ready to distinguish case whether the querycoord is not healthy or the collection is not even loaded.
S
SimFG 已提交
1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592
	if statesResp, err := node.queryCoord.GetComponentStates(ctx); err != nil {
		return getErrResponse(err), nil
	} else if statesResp.State == nil || statesResp.State.StateCode != commonpb.StateCode_Healthy {
		return getErrResponse(fmt.Errorf("the querycoord server isn't healthy, state: %v", statesResp.State)), nil
	}

	successResponse := &milvuspb.GetLoadStateResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
	defer func() {
		log.Debug(
			rpcDone(method),
			zap.Any("request", request))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
		metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	}()

	collectionID, err := globalMetaCache.GetCollectionID(ctx, request.CollectionName)
	if err != nil {
		successResponse.State = commonpb.LoadState_LoadStateNotExist
		return successResponse, nil
	}

	msgBase := commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
	)
	if request.Base == nil {
		request.Base = msgBase
	} else {
		request.Base.MsgID = msgBase.MsgID
		request.Base.Timestamp = msgBase.Timestamp
		request.Base.SourceID = msgBase.SourceID
	}

	var progress int64
	if len(request.GetPartitionNames()) == 0 {
		if progress, err = getCollectionProgress(ctx, node.queryCoord, request.GetBase(), collectionID); err != nil {
			successResponse.State = commonpb.LoadState_LoadStateNotLoad
			return successResponse, nil
		}
	} else {
		if progress, err = getPartitionProgress(ctx, node.queryCoord, request.GetBase(),
			request.GetPartitionNames(), request.GetCollectionName(), collectionID); err != nil {
			successResponse.State = commonpb.LoadState_LoadStateNotLoad
			return successResponse, nil
		}
	}
	if progress >= 100 {
		successResponse.State = commonpb.LoadState_LoadStateLoaded
	} else {
		successResponse.State = commonpb.LoadState_LoadStateLoading
	}
	return successResponse, nil
1593 1594
}

1595
// CreateIndex create index for collection.
C
Cai Yudong 已提交
1596
func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
1597 1598 1599
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1600

1601
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateIndex")
D
dragondriver 已提交
1602 1603
	defer sp.Finish()

1604
	cit := &createIndexTask{
Z
zhenshan.cao 已提交
1605 1606 1607 1608
		ctx:        ctx,
		Condition:  NewTaskCondition(ctx),
		req:        request,
		rootCoord:  node.rootCoord,
1609
		datacoord:  node.dataCoord,
1610
		queryCoord: node.queryCoord,
1611 1612
	}

D
dragondriver 已提交
1613
	method := "CreateIndex"
1614
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1615
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1616
		metrics.TotalLabel).Inc()
1617 1618

	log := log.Ctx(ctx).With(
1619
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1620 1621 1622 1623
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))
D
dragondriver 已提交
1624

1625 1626
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1627 1628 1629
	if err := node.sched.ddQueue.Enqueue(cit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1630
			zap.Error(err))
D
dragondriver 已提交
1631

E
Enwei Jiao 已提交
1632
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1633
			metrics.AbandonLabel).Inc()
1634

1635
		return &commonpb.Status{
1636
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1637 1638 1639 1640
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1641 1642 1643
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", cit.BeginTs()),
1644
		zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1645 1646 1647 1648

	if err := cit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1649
			zap.Error(err),
D
dragondriver 已提交
1650
			zap.Uint64("BeginTs", cit.BeginTs()),
1651
			zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1652

E
Enwei Jiao 已提交
1653
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1654
			metrics.FailLabel).Inc()
1655

1656
		return &commonpb.Status{
1657
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1658 1659 1660 1661
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1662 1663 1664
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", cit.BeginTs()),
1665
		zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1666

E
Enwei Jiao 已提交
1667
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1668
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1669
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1670 1671 1672
	return cit.result, nil
}

1673
// DescribeIndex get the meta information of index, such as index state, index id and etc.
C
Cai Yudong 已提交
1674
func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
1675 1676 1677 1678 1679
	if !node.checkHealthy() {
		return &milvuspb.DescribeIndexResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1680 1681 1682 1683

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeIndex")
	defer sp.Finish()

1684
	dit := &describeIndexTask{
S
sunby 已提交
1685
		ctx:                  ctx,
1686 1687
		Condition:            NewTaskCondition(ctx),
		DescribeIndexRequest: request,
1688
		datacoord:            node.dataCoord,
1689 1690
	}

1691 1692
	method := "DescribeIndex"
	// avoid data race
1693
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1694
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1695
		metrics.TotalLabel).Inc()
1696 1697

	log := log.Ctx(ctx).With(
1698
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1699 1700 1701
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
1702 1703 1704
		zap.String("index name", request.IndexName))

	log.Debug(rpcReceived(method))
1705 1706 1707 1708

	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1709
			zap.Error(err))
1710

E
Enwei Jiao 已提交
1711
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1712
			metrics.AbandonLabel).Inc()
1713

1714 1715
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
1716
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1717 1718 1719 1720 1721
				Reason:    err.Error(),
			},
		}, nil
	}

1722 1723 1724
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1725
		zap.Uint64("EndTs", dit.EndTs()))
1726 1727 1728 1729

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1730
			zap.Error(err),
1731
			zap.Uint64("BeginTs", dit.BeginTs()),
1732
			zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1733

Z
zhenshan.cao 已提交
1734 1735 1736 1737
		errCode := commonpb.ErrorCode_UnexpectedError
		if dit.result != nil {
			errCode = dit.result.Status.GetErrorCode()
		}
E
Enwei Jiao 已提交
1738
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1739
			metrics.FailLabel).Inc()
1740

1741 1742
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
Z
zhenshan.cao 已提交
1743
				ErrorCode: errCode,
1744 1745 1746 1747 1748
				Reason:    err.Error(),
			},
		}, nil
	}

1749 1750 1751
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1752
		zap.Uint64("EndTs", dit.EndTs()))
1753

E
Enwei Jiao 已提交
1754
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1755
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1756
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1757 1758 1759
	return dit.result, nil
}

1760
// DropIndex drop the index of collection.
C
Cai Yudong 已提交
1761
func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
1762 1763 1764
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1765 1766 1767 1768

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropIndex")
	defer sp.Finish()

1769
	dit := &dropIndexTask{
S
sunby 已提交
1770
		ctx:              ctx,
B
BossZou 已提交
1771 1772
		Condition:        NewTaskCondition(ctx),
		DropIndexRequest: request,
1773
		dataCoord:        node.dataCoord,
1774
		queryCoord:       node.queryCoord,
B
BossZou 已提交
1775
	}
G
godchen 已提交
1776

D
dragondriver 已提交
1777
	method := "DropIndex"
1778
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1779
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1780
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1781

1782
	log := log.Ctx(ctx).With(
1783
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1784 1785 1786 1787 1788
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

1789 1790
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1791 1792 1793
	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1794
			zap.Error(err))
E
Enwei Jiao 已提交
1795
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1796
			metrics.AbandonLabel).Inc()
D
dragondriver 已提交
1797

B
BossZou 已提交
1798
		return &commonpb.Status{
1799
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1800 1801 1802
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1803

D
dragondriver 已提交
1804 1805 1806
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1807
		zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1808 1809 1810 1811

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1812
			zap.Error(err),
D
dragondriver 已提交
1813
			zap.Uint64("BeginTs", dit.BeginTs()),
1814
			zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1815

E
Enwei Jiao 已提交
1816
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1817
			metrics.FailLabel).Inc()
1818

B
BossZou 已提交
1819
		return &commonpb.Status{
1820
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1821 1822 1823
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1824 1825 1826 1827

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1828
		zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1829

E
Enwei Jiao 已提交
1830
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1831
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1832
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
B
BossZou 已提交
1833 1834 1835
	return dit.result, nil
}

1836 1837
// GetIndexBuildProgress gets index build progress with filed_name and index_name.
// IndexRows is the num of indexed rows. And TotalRows is the total number of segment rows.
1838
// Deprecated: use DescribeIndex instead
C
Cai Yudong 已提交
1839
func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb.GetIndexBuildProgressRequest) (*milvuspb.GetIndexBuildProgressResponse, error) {
1840 1841 1842 1843 1844
	if !node.checkHealthy() {
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1845 1846 1847 1848

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetIndexBuildProgress")
	defer sp.Finish()

1849
	gibpt := &getIndexBuildProgressTask{
1850 1851 1852
		ctx:                          ctx,
		Condition:                    NewTaskCondition(ctx),
		GetIndexBuildProgressRequest: request,
1853
		rootCoord:                    node.rootCoord,
1854
		dataCoord:                    node.dataCoord,
1855 1856
	}

1857
	method := "GetIndexBuildProgress"
1858
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1859
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1860
		metrics.TotalLabel).Inc()
1861 1862

	log := log.Ctx(ctx).With(
1863
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1864 1865 1866 1867
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1868

1869 1870
	log.Debug(rpcReceived(method))

1871 1872 1873
	if err := node.sched.ddQueue.Enqueue(gibpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1874
			zap.Error(err))
E
Enwei Jiao 已提交
1875
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1876
			metrics.AbandonLabel).Inc()
1877

1878 1879 1880 1881 1882 1883 1884 1885
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1886 1887 1888
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
1889
		zap.Uint64("EndTs", gibpt.EndTs()))
1890 1891 1892 1893

	if err := gibpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
1894
			zap.Error(err),
1895
			zap.Uint64("BeginTs", gibpt.BeginTs()),
1896
			zap.Uint64("EndTs", gibpt.EndTs()))
E
Enwei Jiao 已提交
1897
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1898
			metrics.FailLabel).Inc()
1899 1900 1901 1902 1903 1904 1905 1906

		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
1907 1908 1909 1910

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
1911
		zap.Uint64("EndTs", gibpt.EndTs()))
1912

E
Enwei Jiao 已提交
1913
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1914
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1915
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1916
	return gibpt.result, nil
1917 1918
}

1919
// GetIndexState get the build-state of index.
1920
// Deprecated: use DescribeIndex instead
C
Cai Yudong 已提交
1921
func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndexStateRequest) (*milvuspb.GetIndexStateResponse, error) {
1922 1923 1924 1925 1926
	if !node.checkHealthy() {
		return &milvuspb.GetIndexStateResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1927 1928 1929 1930

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()

1931
	dipt := &getIndexStateTask{
G
godchen 已提交
1932 1933 1934
		ctx:                  ctx,
		Condition:            NewTaskCondition(ctx),
		GetIndexStateRequest: request,
1935
		dataCoord:            node.dataCoord,
1936
		rootCoord:            node.rootCoord,
1937 1938
	}

1939
	method := "GetIndexState"
1940
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1941
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1942
		metrics.TotalLabel).Inc()
1943 1944

	log := log.Ctx(ctx).With(
1945
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1946 1947 1948 1949
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1950

1951 1952
	log.Debug(rpcReceived(method))

1953 1954 1955
	if err := node.sched.ddQueue.Enqueue(dipt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1956
			zap.Error(err))
1957

E
Enwei Jiao 已提交
1958
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1959
			metrics.AbandonLabel).Inc()
1960

G
godchen 已提交
1961
		return &milvuspb.GetIndexStateResponse{
1962
			Status: &commonpb.Status{
1963
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1964 1965 1966 1967 1968
				Reason:    err.Error(),
			},
		}, nil
	}

1969 1970 1971
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dipt.BeginTs()),
1972
		zap.Uint64("EndTs", dipt.EndTs()))
1973 1974 1975 1976

	if err := dipt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1977
			zap.Error(err),
1978
			zap.Uint64("BeginTs", dipt.BeginTs()),
1979
			zap.Uint64("EndTs", dipt.EndTs()))
E
Enwei Jiao 已提交
1980
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1981
			metrics.FailLabel).Inc()
1982

G
godchen 已提交
1983
		return &milvuspb.GetIndexStateResponse{
1984
			Status: &commonpb.Status{
1985
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1986 1987 1988 1989 1990
				Reason:    err.Error(),
			},
		}, nil
	}

1991 1992 1993
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dipt.BeginTs()),
1994
		zap.Uint64("EndTs", dipt.EndTs()))
1995

E
Enwei Jiao 已提交
1996
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1997
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1998
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1999 2000 2001
	return dipt.result, nil
}

2002
// Insert insert records into collection.
C
Cai Yudong 已提交
2003
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
X
Xiangyu Wang 已提交
2004 2005
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()
2006 2007 2008
	log := log.Ctx(ctx)
	log.Debug("Start processing insert request in Proxy")
	defer log.Debug("Finish processing insert request in Proxy")
X
Xiangyu Wang 已提交
2009

2010 2011 2012 2013 2014
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}
2015 2016
	method := "Insert"
	tr := timerecord.NewTimeRecorder(method)
S
smellthemoon 已提交
2017
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Add(float64(proto.Size(request)))
E
Enwei Jiao 已提交
2018
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
S
smellthemoon 已提交
2019

2020
	it := &insertTask{
2021 2022
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
X
xige-16 已提交
2023
		// req:       request,
2024
		insertMsg: &msgstream.InsertMsg{
2025 2026 2027
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
2028
			InsertRequest: internalpb.InsertRequest{
2029 2030 2031
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Insert),
					commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
2032
					commonpbutil.WithSourceID(paramtable.GetNodeID()),
2033
				),
2034 2035
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
X
xige-16 已提交
2036 2037 2038
				FieldsData:     request.FieldsData,
				NumRows:        uint64(request.NumRows),
				Version:        internalpb.InsertDataVersion_ColumnBased,
2039
				// RowData: transfer column based request to this
2040 2041
			},
		},
2042
		idAllocator:   node.rowIDAllocator,
2043 2044 2045
		segIDAssigner: node.segAssigner,
		chMgr:         node.chMgr,
		chTicker:      node.chTicker,
2046
	}
2047

2048 2049
	if len(it.insertMsg.PartitionName) <= 0 {
		it.insertMsg.PartitionName = Params.CommonCfg.DefaultPartitionName.GetValue()
2050 2051
	}

X
Xiangyu Wang 已提交
2052
	constructFailedResponse := func(err error) *milvuspb.MutationResult {
X
xige-16 已提交
2053
		numRows := request.NumRows
2054 2055 2056 2057
		errIndex := make([]uint32, numRows)
		for i := uint32(0); i < numRows; i++ {
			errIndex[i] = i
		}
2058

X
Xiangyu Wang 已提交
2059 2060 2061 2062 2063 2064 2065
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			ErrIndex: errIndex,
		}
2066 2067
	}

X
Xiangyu Wang 已提交
2068
	log.Debug("Enqueue insert request in Proxy",
2069
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2070 2071 2072 2073 2074
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.Int("len(FieldsData)", len(request.FieldsData)),
		zap.Int("len(HashKeys)", len(request.HashKeys)),
2075
		zap.Uint32("NumRows", request.NumRows))
D
dragondriver 已提交
2076

X
Xiangyu Wang 已提交
2077
	if err := node.sched.dmQueue.Enqueue(it); err != nil {
J
Jiquan Long 已提交
2078
		log.Warn("Failed to enqueue insert task: " + err.Error())
E
Enwei Jiao 已提交
2079
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2080
			metrics.AbandonLabel).Inc()
X
Xiangyu Wang 已提交
2081
		return constructFailedResponse(err), nil
2082
	}
D
dragondriver 已提交
2083

X
Xiangyu Wang 已提交
2084
	log.Debug("Detail of insert request in Proxy",
2085
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2086 2087 2088 2089 2090
		zap.Uint64("BeginTS", it.BeginTs()),
		zap.Uint64("EndTS", it.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
2091
		zap.Uint32("NumRows", request.NumRows))
X
Xiangyu Wang 已提交
2092 2093

	if err := it.WaitToFinish(); err != nil {
2094
		log.Warn("Failed to execute insert task in task scheduler: " + err.Error())
E
Enwei Jiao 已提交
2095
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2096
			metrics.FailLabel).Inc()
X
Xiangyu Wang 已提交
2097 2098 2099 2100 2101
		return constructFailedResponse(err), nil
	}

	if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
		setErrorIndex := func() {
X
xige-16 已提交
2102
			numRows := request.NumRows
X
Xiangyu Wang 已提交
2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113
			errIndex := make([]uint32, numRows)
			for i := uint32(0); i < numRows; i++ {
				errIndex[i] = i
			}
			it.result.ErrIndex = errIndex
		}

		setErrorIndex()
	}

	// InsertCnt always equals to the number of entities in the request
X
xige-16 已提交
2114
	it.result.InsertCnt = int64(request.NumRows)
D
dragondriver 已提交
2115

S
smellthemoon 已提交
2116 2117 2118
	receiveSize := proto.Size(it.insertMsg)
	rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(receiveSize))

E
Enwei Jiao 已提交
2119
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2120
		metrics.SuccessLabel).Inc()
2121
	successCnt := it.result.InsertCnt - int64(len(it.result.ErrIndex))
E
Enwei Jiao 已提交
2122 2123 2124
	metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(successCnt))
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
2125 2126 2127
	return it.result, nil
}

2128
// Delete delete records from collection, then these records cannot be searched.
G
groot 已提交
2129
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {
2130 2131
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete")
	defer sp.Finish()
2132 2133 2134
	log := log.Ctx(ctx)
	log.Debug("Start processing delete request in Proxy")
	defer log.Debug("Finish processing delete request in Proxy")
2135

S
smellthemoon 已提交
2136
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Add(float64(proto.Size(request)))
2137

G
groot 已提交
2138 2139 2140 2141 2142 2143
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}

2144 2145 2146
	method := "Delete"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
2147
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2148
		metrics.TotalLabel).Inc()
2149
	dt := &deleteTask{
X
xige-16 已提交
2150 2151 2152
		ctx:        ctx,
		Condition:  NewTaskCondition(ctx),
		deleteExpr: request.Expr,
2153
		deleteMsg: &BaseDeleteTask{
G
godchen 已提交
2154 2155 2156
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
2157
			DeleteRequest: internalpb.DeleteRequest{
2158 2159 2160 2161
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Delete),
					commonpbutil.WithMsgID(0),
				),
X
xige-16 已提交
2162
				DbName:         request.DbName,
G
godchen 已提交
2163 2164 2165
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
				// RowData: transfer column based request to this
C
Cai Yudong 已提交
2166 2167 2168 2169
			},
		},
		chMgr:    node.chMgr,
		chTicker: node.chTicker,
G
groot 已提交
2170 2171
	}

2172
	log.Debug("Enqueue delete request in Proxy",
2173
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2174 2175 2176 2177
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.String("expr", request.Expr))
2178 2179 2180

	// MsgID will be set by Enqueue()
	if err := node.sched.dmQueue.Enqueue(dt); err != nil {
2181
		log.Error("Failed to enqueue delete task: " + err.Error())
E
Enwei Jiao 已提交
2182
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2183
			metrics.AbandonLabel).Inc()
2184

G
groot 已提交
2185 2186 2187 2188 2189 2190 2191 2192
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

2193
	log.Debug("Detail of delete request in Proxy",
2194
		zap.String("role", typeutil.ProxyRole),
2195
		zap.Uint64("timestamp", dt.deleteMsg.Base.Timestamp),
G
groot 已提交
2196 2197 2198
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
2199
		zap.String("expr", request.Expr))
G
groot 已提交
2200

2201
	if err := dt.WaitToFinish(); err != nil {
2202
		log.Error("Failed to execute delete task in task scheduler: " + err.Error())
E
Enwei Jiao 已提交
2203
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2204
			metrics.FailLabel).Inc()
G
groot 已提交
2205 2206 2207 2208 2209 2210 2211 2212
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

S
smellthemoon 已提交
2213 2214 2215
	receiveSize := proto.Size(dt.deleteMsg)
	rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(receiveSize))

E
Enwei Jiao 已提交
2216
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2217
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2218 2219
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
G
groot 已提交
2220 2221 2222
	return dt.result, nil
}

S
smellthemoon 已提交
2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356
// Upsert upsert records into collection.
func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Upsert")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.Uint32("NumRows", request.NumRows),
	)
	log.Debug("Start processing upsert request in Proxy")

	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}
	method := "Upsert"
	tr := timerecord.NewTimeRecorder(method)

	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Add(float64(proto.Size(request)))
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()

	it := &upsertTask{
		baseMsg: msgstream.BaseMsg{
			HashValues: request.HashKeys,
		},
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),

		req: &milvuspb.UpsertRequest{
			Base: commonpbutil.NewMsgBase(
				commonpbutil.WithMsgType(commonpb.MsgType(commonpb.MsgType_Upsert)),
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
			),
			CollectionName: request.CollectionName,
			PartitionName:  request.PartitionName,
			FieldsData:     request.FieldsData,
			NumRows:        request.NumRows,
		},

		result: &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_Success,
			},
			IDs: &schemapb.IDs{
				IdField: nil,
			},
		},

		idAllocator:   node.rowIDAllocator,
		segIDAssigner: node.segAssigner,
		chMgr:         node.chMgr,
		chTicker:      node.chTicker,
	}

	if len(it.req.PartitionName) <= 0 {
		it.req.PartitionName = Params.CommonCfg.DefaultPartitionName.GetValue()
	}

	constructFailedResponse := func(err error, errCode commonpb.ErrorCode) *milvuspb.MutationResult {
		numRows := request.NumRows
		errIndex := make([]uint32, numRows)
		for i := uint32(0); i < numRows; i++ {
			errIndex[i] = i
		}

		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: errCode,
				Reason:    err.Error(),
			},
			ErrIndex: errIndex,
		}
	}

	log.Debug("Enqueue upsert request in Proxy",
		zap.Int("len(FieldsData)", len(request.FieldsData)),
		zap.Int("len(HashKeys)", len(request.HashKeys)))

	if err := node.sched.dmQueue.Enqueue(it); err != nil {
		log.Info("Failed to enqueue upsert task",
			zap.Error(err))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
			metrics.AbandonLabel).Inc()
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug("Detail of upsert request in Proxy",
		zap.Uint64("BeginTS", it.BeginTs()),
		zap.Uint64("EndTS", it.EndTs()))

	if err := it.WaitToFinish(); err != nil {
		log.Info("Failed to execute insert task in task scheduler",
			zap.Error(err))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
			metrics.FailLabel).Inc()
		return constructFailedResponse(err, it.result.Status.ErrorCode), nil
	}

	if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
		setErrorIndex := func() {
			numRows := request.NumRows
			errIndex := make([]uint32, numRows)
			for i := uint32(0); i < numRows; i++ {
				errIndex[i] = i
			}
			it.result.ErrIndex = errIndex
		}
		setErrorIndex()
	}

	insertReceiveSize := proto.Size(it.upsertMsg.InsertMsg)
	deleteReceiveSize := proto.Size(it.upsertMsg.DeleteMsg)

	rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(deleteReceiveSize))
	rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(insertReceiveSize))

	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))

	log.Debug("Finish processing upsert request in Proxy")
	return it.result, nil
}

2357
// Search search the most similar records of requests.
C
Cai Yudong 已提交
2358
func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
2359
	receiveSize := proto.Size(request)
E
Enwei Jiao 已提交
2360
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel).Add(float64(receiveSize))
2361 2362 2363

	rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(request.GetNq()))

2364 2365 2366 2367 2368
	if !node.checkHealthy() {
		return &milvuspb.SearchResults{
			Status: unhealthyStatus(),
		}, nil
	}
2369 2370
	method := "Search"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2371
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2372
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
2373

C
cai.zhang 已提交
2374 2375
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Search")
	defer sp.Finish()
D
dragondriver 已提交
2376

2377
	qt := &searchTask{
S
sunby 已提交
2378
		ctx:       ctx,
2379
		Condition: NewTaskCondition(ctx),
G
godchen 已提交
2380
		SearchRequest: &internalpb.SearchRequest{
2381 2382
			Base: commonpbutil.NewMsgBase(
				commonpbutil.WithMsgType(commonpb.MsgType_Search),
E
Enwei Jiao 已提交
2383
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
2384
			),
E
Enwei Jiao 已提交
2385
			ReqID: paramtable.GetNodeID(),
2386
		},
2387 2388 2389 2390
		request:  request,
		qc:       node.queryCoord,
		tr:       timerecord.NewTimeRecorder("search"),
		shardMgr: node.shardMgr,
2391 2392
	}

2393 2394 2395
	travelTs := request.TravelTimestamp
	guaranteeTs := request.GuaranteeTimestamp

2396
	log := log.Ctx(ctx).With(
2397
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2398 2399 2400 2401 2402
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2403 2404 2405 2406
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2407

2408 2409 2410
	log.Debug(
		rpcReceived(method))

2411
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
2412
		log.Warn(
2413
			rpcFailedToEnqueue(method),
2414
			zap.Error(err))
D
dragondriver 已提交
2415

E
Enwei Jiao 已提交
2416
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2417
			metrics.AbandonLabel).Inc()
2418

2419 2420
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2421
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2422 2423 2424 2425
				Reason:    err.Error(),
			},
		}, nil
	}
Z
Zach 已提交
2426
	tr.CtxRecord(ctx, "search request enqueue")
2427

2428
	log.Debug(
2429
		rpcEnqueued(method),
2430
		zap.Uint64("timestamp", qt.Base.Timestamp))
D
dragondriver 已提交
2431

2432
	if err := qt.WaitToFinish(); err != nil {
2433
		log.Warn(
2434
			rpcFailedToWaitToFinish(method),
2435
			zap.Error(err))
2436

E
Enwei Jiao 已提交
2437
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2438
			metrics.FailLabel).Inc()
2439

2440 2441
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2442
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2443 2444 2445 2446 2447
				Reason:    err.Error(),
			},
		}, nil
	}

Z
Zach 已提交
2448
	span := tr.CtxRecord(ctx, "wait search result")
E
Enwei Jiao 已提交
2449
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2450
		metrics.SearchLabel).Observe(float64(span.Milliseconds()))
2451
	tr.CtxRecord(ctx, "wait search result")
2452
	log.Debug(rpcDone(method))
D
dragondriver 已提交
2453

E
Enwei Jiao 已提交
2454
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2455
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2456
	metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(qt.result.GetResults().GetNumQueries()))
C
cai.zhang 已提交
2457
	searchDur := tr.ElapseSpan().Milliseconds()
E
Enwei Jiao 已提交
2458
	metrics.ProxySQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2459
		metrics.SearchLabel).Observe(float64(searchDur))
E
Enwei Jiao 已提交
2460
	metrics.ProxyCollectionSQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2461
		metrics.SearchLabel, request.CollectionName).Observe(float64(searchDur))
2462 2463
	if qt.result != nil {
		sentSize := proto.Size(qt.result)
E
Enwei Jiao 已提交
2464
		metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
2465
		rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
2466
	}
2467 2468 2469
	return qt.result, nil
}

2470
// Flush notify data nodes to persist the data of collection.
2471 2472 2473 2474 2475 2476 2477
func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*milvuspb.FlushResponse, error) {
	resp := &milvuspb.FlushResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    "",
		},
	}
2478
	if !node.checkHealthy() {
2479 2480
		resp.Status.Reason = "proxy is not healthy"
		return resp, nil
2481
	}
D
dragondriver 已提交
2482 2483 2484 2485

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Flush")
	defer sp.Finish()

2486
	ft := &flushTask{
T
ThreadDao 已提交
2487 2488 2489
		ctx:          ctx,
		Condition:    NewTaskCondition(ctx),
		FlushRequest: request,
2490
		dataCoord:    node.dataCoord,
2491 2492
	}

D
dragondriver 已提交
2493
	method := "Flush"
2494
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2495
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2496

2497
	log := log.Ctx(ctx).With(
2498
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2499 2500
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))
D
dragondriver 已提交
2501

2502 2503
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2504 2505 2506
	if err := node.sched.ddQueue.Enqueue(ft); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2507
			zap.Error(err))
D
dragondriver 已提交
2508

E
Enwei Jiao 已提交
2509
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
2510

2511 2512
		resp.Status.Reason = err.Error()
		return resp, nil
2513 2514
	}

D
dragondriver 已提交
2515 2516 2517
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", ft.BeginTs()),
2518
		zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2519 2520 2521 2522

	if err := ft.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2523
			zap.Error(err),
D
dragondriver 已提交
2524
			zap.Uint64("BeginTs", ft.BeginTs()),
2525
			zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2526

E
Enwei Jiao 已提交
2527
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2528

D
dragondriver 已提交
2529
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
2530 2531
		resp.Status.Reason = err.Error()
		return resp, nil
2532 2533
	}

D
dragondriver 已提交
2534 2535 2536
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", ft.BeginTs()),
2537
		zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2538

E
Enwei Jiao 已提交
2539 2540
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2541
	return ft.result, nil
2542 2543
}

2544
// Query get the records by primary keys.
C
Cai Yudong 已提交
2545
func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*milvuspb.QueryResults, error) {
2546
	receiveSize := proto.Size(request)
E
Enwei Jiao 已提交
2547
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.QueryLabel).Add(float64(receiveSize))
2548 2549 2550

	rateCol.Add(internalpb.RateType_DQLQuery.String(), 1)

2551 2552 2553 2554 2555
	if !node.checkHealthy() {
		return &milvuspb.QueryResults{
			Status: unhealthyStatus(),
		}, nil
	}
2556

D
dragondriver 已提交
2557 2558
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Query")
	defer sp.Finish()
2559
	tr := timerecord.NewTimeRecorder("Query")
D
dragondriver 已提交
2560

2561
	qt := &queryTask{
2562 2563 2564
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
		RetrieveRequest: &internalpb.RetrieveRequest{
2565 2566
			Base: commonpbutil.NewMsgBase(
				commonpbutil.WithMsgType(commonpb.MsgType_Retrieve),
E
Enwei Jiao 已提交
2567
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
2568
			),
E
Enwei Jiao 已提交
2569
			ReqID: paramtable.GetNodeID(),
2570
		},
2571 2572
		request:          request,
		qc:               node.queryCoord,
2573
		queryShardPolicy: mergeRoundRobinPolicy,
2574
		shardMgr:         node.shardMgr,
2575 2576
	}

D
dragondriver 已提交
2577 2578
	method := "Query"

E
Enwei Jiao 已提交
2579
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2580 2581
		metrics.TotalLabel).Inc()

2582
	log := log.Ctx(ctx).With(
2583
		zap.String("role", typeutil.ProxyRole),
2584 2585
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
2586 2587 2588 2589
		zap.Strings("partitions", request.PartitionNames))

	log.Debug(
		rpcReceived(method),
2590 2591 2592 2593
		zap.String("expr", request.Expr),
		zap.Strings("OutputFields", request.OutputFields),
		zap.Uint64("travel_timestamp", request.TravelTimestamp),
		zap.Uint64("guarantee_timestamp", request.GuaranteeTimestamp))
G
godchen 已提交
2594

D
dragondriver 已提交
2595
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
2596
		log.Warn(
D
dragondriver 已提交
2597
			rpcFailedToEnqueue(method),
2598
			zap.Error(err))
D
dragondriver 已提交
2599

E
Enwei Jiao 已提交
2600
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2601
			metrics.AbandonLabel).Inc()
2602

2603 2604 2605 2606 2607 2608
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
2609
	}
Z
Zach 已提交
2610
	tr.CtxRecord(ctx, "query request enqueue")
2611

2612
	log.Debug(rpcEnqueued(method))
D
dragondriver 已提交
2613 2614

	if err := qt.WaitToFinish(); err != nil {
2615
		log.Warn(
D
dragondriver 已提交
2616
			rpcFailedToWaitToFinish(method),
2617
			zap.Error(err))
2618

E
Enwei Jiao 已提交
2619
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2620
			metrics.FailLabel).Inc()
2621

2622 2623 2624 2625 2626 2627 2628
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
Z
Zach 已提交
2629
	span := tr.CtxRecord(ctx, "wait query result")
E
Enwei Jiao 已提交
2630
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2631
		metrics.QueryLabel).Observe(float64(span.Milliseconds()))
2632

2633
	log.Debug(rpcDone(method))
D
dragondriver 已提交
2634

E
Enwei Jiao 已提交
2635
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2636 2637
		metrics.SuccessLabel).Inc()

E
Enwei Jiao 已提交
2638
	metrics.ProxySQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2639
		metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
E
Enwei Jiao 已提交
2640
	metrics.ProxyCollectionSQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2641
		metrics.QueryLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
2642 2643

	ret := &milvuspb.QueryResults{
2644 2645
		Status:     qt.result.Status,
		FieldsData: qt.result.FieldsData,
2646 2647
	}
	sentSize := proto.Size(qt.result)
2648
	rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
E
Enwei Jiao 已提交
2649
	metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
2650
	return ret, nil
2651
}
2652

2653
// CreateAlias create alias for collection, then you can search the collection with alias.
Y
Yusup 已提交
2654 2655 2656 2657
func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2658 2659 2660 2661

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateAlias")
	defer sp.Finish()

Y
Yusup 已提交
2662 2663 2664 2665 2666 2667 2668
	cat := &CreateAliasTask{
		ctx:                ctx,
		Condition:          NewTaskCondition(ctx),
		CreateAliasRequest: request,
		rootCoord:          node.rootCoord,
	}

D
dragondriver 已提交
2669
	method := "CreateAlias"
2670
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2671
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2672

2673
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2674 2675 2676 2677 2678
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

2679 2680
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2681 2682 2683
	if err := node.sched.ddQueue.Enqueue(cat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2684
			zap.Error(err))
D
dragondriver 已提交
2685

E
Enwei Jiao 已提交
2686
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
2687

Y
Yusup 已提交
2688 2689 2690 2691 2692 2693
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2694 2695 2696
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", cat.BeginTs()),
2697
		zap.Uint64("EndTs", cat.EndTs()))
D
dragondriver 已提交
2698 2699 2700 2701

	if err := cat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2702
			zap.Error(err),
D
dragondriver 已提交
2703
			zap.Uint64("BeginTs", cat.BeginTs()),
2704
			zap.Uint64("EndTs", cat.EndTs()))
E
Enwei Jiao 已提交
2705
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
Y
Yusup 已提交
2706 2707 2708 2709 2710 2711 2712

		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2713 2714 2715
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", cat.BeginTs()),
2716
		zap.Uint64("EndTs", cat.EndTs()))
D
dragondriver 已提交
2717

E
Enwei Jiao 已提交
2718 2719
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2720 2721 2722
	return cat.result, nil
}

2723
// DropAlias alter the alias of collection.
Y
Yusup 已提交
2724 2725 2726 2727
func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2728 2729 2730 2731

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropAlias")
	defer sp.Finish()

Y
Yusup 已提交
2732 2733 2734 2735 2736 2737 2738
	dat := &DropAliasTask{
		ctx:              ctx,
		Condition:        NewTaskCondition(ctx),
		DropAliasRequest: request,
		rootCoord:        node.rootCoord,
	}

D
dragondriver 已提交
2739
	method := "DropAlias"
2740
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2741
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2742

2743
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2744 2745 2746 2747
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias))

2748 2749
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2750 2751 2752
	if err := node.sched.ddQueue.Enqueue(dat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2753
			zap.Error(err))
E
Enwei Jiao 已提交
2754
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2755

Y
Yusup 已提交
2756 2757 2758 2759 2760 2761
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2762 2763 2764
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dat.BeginTs()),
2765
		zap.Uint64("EndTs", dat.EndTs()))
D
dragondriver 已提交
2766 2767 2768 2769

	if err := dat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2770
			zap.Error(err),
D
dragondriver 已提交
2771
			zap.Uint64("BeginTs", dat.BeginTs()),
2772
			zap.Uint64("EndTs", dat.EndTs()))
Y
Yusup 已提交
2773

E
Enwei Jiao 已提交
2774
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2775

Y
Yusup 已提交
2776 2777 2778 2779 2780 2781
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2782 2783 2784
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dat.BeginTs()),
2785
		zap.Uint64("EndTs", dat.EndTs()))
D
dragondriver 已提交
2786

E
Enwei Jiao 已提交
2787 2788
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2789 2790 2791
	return dat.result, nil
}

2792
// AlterAlias alter alias of collection.
Y
Yusup 已提交
2793 2794 2795 2796
func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2797 2798 2799 2800

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-AlterAlias")
	defer sp.Finish()

Y
Yusup 已提交
2801 2802 2803 2804 2805 2806 2807
	aat := &AlterAliasTask{
		ctx:               ctx,
		Condition:         NewTaskCondition(ctx),
		AlterAliasRequest: request,
		rootCoord:         node.rootCoord,
	}

D
dragondriver 已提交
2808
	method := "AlterAlias"
2809
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2810
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2811

2812
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2813 2814 2815 2816 2817
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

2818 2819
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2820 2821 2822
	if err := node.sched.ddQueue.Enqueue(aat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2823
			zap.Error(err))
E
Enwei Jiao 已提交
2824
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2825

Y
Yusup 已提交
2826 2827 2828 2829 2830 2831
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2832 2833 2834
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", aat.BeginTs()),
2835
		zap.Uint64("EndTs", aat.EndTs()))
D
dragondriver 已提交
2836 2837 2838 2839

	if err := aat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2840
			zap.Error(err),
D
dragondriver 已提交
2841
			zap.Uint64("BeginTs", aat.BeginTs()),
2842
			zap.Uint64("EndTs", aat.EndTs()))
Y
Yusup 已提交
2843

E
Enwei Jiao 已提交
2844
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2845

Y
Yusup 已提交
2846 2847 2848 2849 2850 2851
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2852 2853 2854
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", aat.BeginTs()),
2855
		zap.Uint64("EndTs", aat.EndTs()))
D
dragondriver 已提交
2856

E
Enwei Jiao 已提交
2857 2858
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2859 2860 2861
	return aat.result, nil
}

2862
// CalcDistance calculates the distances between vectors.
2863
func (node *Proxy) CalcDistance(ctx context.Context, request *milvuspb.CalcDistanceRequest) (*milvuspb.CalcDistanceResults, error) {
2864 2865 2866 2867 2868
	if !node.checkHealthy() {
		return &milvuspb.CalcDistanceResults{
			Status: unhealthyStatus(),
		}, nil
	}
2869

2870 2871 2872 2873
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CalcDistance")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

2874 2875
	query := func(ids *milvuspb.VectorIDs) (*milvuspb.QueryResults, error) {
		outputFields := []string{ids.FieldName}
2876

2877 2878 2879 2880 2881
		queryRequest := &milvuspb.QueryRequest{
			DbName:         "",
			CollectionName: ids.CollectionName,
			PartitionNames: ids.PartitionNames,
			OutputFields:   outputFields,
2882 2883
		}

2884
		qt := &queryTask{
2885 2886 2887
			ctx:       ctx,
			Condition: NewTaskCondition(ctx),
			RetrieveRequest: &internalpb.RetrieveRequest{
2888 2889
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Retrieve),
E
Enwei Jiao 已提交
2890
					commonpbutil.WithSourceID(paramtable.GetNodeID()),
2891
				),
E
Enwei Jiao 已提交
2892
				ReqID: paramtable.GetNodeID(),
2893
			},
2894 2895 2896 2897
			request: queryRequest,
			qc:      node.queryCoord,
			ids:     ids.IdArray,

2898
			queryShardPolicy: mergeRoundRobinPolicy,
2899
			shardMgr:         node.shardMgr,
2900 2901
		}

2902
		log := log.Ctx(ctx).With(
G
groot 已提交
2903 2904
			zap.String("collection", queryRequest.CollectionName),
			zap.Any("partitions", queryRequest.PartitionNames),
2905
			zap.Any("OutputFields", queryRequest.OutputFields))
G
groot 已提交
2906

2907
		err := node.sched.dqQueue.Enqueue(qt)
2908
		if err != nil {
2909 2910
			log.Error("CalcDistance queryTask failed to enqueue",
				zap.Error(err))
2911

2912 2913 2914 2915 2916
			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
2917
			}, err
2918
		}
2919

2920
		log.Debug("CalcDistance queryTask enqueued")
2921 2922 2923

		err = qt.WaitToFinish()
		if err != nil {
2924 2925
			log.Error("CalcDistance queryTask failed to WaitToFinish",
				zap.Error(err))
2926 2927 2928 2929 2930 2931

			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
2932
			}, err
2933
		}
2934

2935
		log.Debug("CalcDistance queryTask Done")
2936 2937

		return &milvuspb.QueryResults{
2938 2939
			Status:     qt.result.Status,
			FieldsData: qt.result.FieldsData,
2940 2941 2942
		}, nil
	}

G
groot 已提交
2943 2944 2945 2946
	// calcDistanceTask is not a standard task, no need to enqueue
	task := &calcDistanceTask{
		traceID:   traceID,
		queryFunc: query,
2947 2948
	}

G
groot 已提交
2949
	return task.Execute(ctx, request)
2950 2951
}

2952
// GetDdChannel returns the used channel for dd operations.
C
Cai Yudong 已提交
2953
func (node *Proxy) GetDdChannel(ctx context.Context, request *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) {
2954 2955
	panic("implement me")
}
X
XuanYang-cn 已提交
2956

2957
// GetPersistentSegmentInfo get the information of sealed segment.
C
Cai Yudong 已提交
2958
func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.GetPersistentSegmentInfoRequest) (*milvuspb.GetPersistentSegmentInfoResponse, error) {
2959 2960 2961 2962 2963
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetPersistentSegmentInfo")
	defer sp.Finish()

	log := log.Ctx(ctx)

D
dragondriver 已提交
2964
	log.Debug("GetPersistentSegmentInfo",
2965
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2966 2967 2968
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
2969
	resp := &milvuspb.GetPersistentSegmentInfoResponse{
X
XuanYang-cn 已提交
2970
		Status: &commonpb.Status{
2971
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
2972 2973
		},
	}
2974 2975 2976 2977
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
2978 2979
	method := "GetPersistentSegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2980
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2981
		metrics.TotalLabel).Inc()
2982 2983 2984

	// list segments
	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
X
XuanYang-cn 已提交
2985
	if err != nil {
E
Enwei Jiao 已提交
2986
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997
		resp.Status.Reason = fmt.Errorf("getCollectionID failed, err:%w", err).Error()
		return resp, nil
	}

	getSegmentsByStatesResponse, err := node.dataCoord.GetSegmentsByStates(ctx, &datapb.GetSegmentsByStatesRequest{
		CollectionID: collectionID,
		// -1 means list all partition segemnts
		PartitionID: -1,
		States:      []commonpb.SegmentState{commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed, commonpb.SegmentState_Sealed},
	})
	if err != nil {
E
Enwei Jiao 已提交
2998
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2999
		resp.Status.Reason = fmt.Errorf("getSegmentsOfCollection, err:%w", err).Error()
X
XuanYang-cn 已提交
3000 3001
		return resp, nil
	}
3002 3003

	// get Segment info
3004
	infoResp, err := node.dataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
3005 3006 3007
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3008
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3009
		),
3010
		SegmentIDs: getSegmentsByStatesResponse.Segments,
X
XuanYang-cn 已提交
3011 3012
	})
	if err != nil {
E
Enwei Jiao 已提交
3013
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3014
			metrics.FailLabel).Inc()
3015 3016
		log.Warn("GetPersistentSegmentInfo fail",
			zap.Error(err))
3017
		resp.Status.Reason = fmt.Errorf("dataCoord:GetSegmentInfo, err:%w", err).Error()
X
XuanYang-cn 已提交
3018 3019
		return resp, nil
	}
3020 3021 3022
	log.Debug("GetPersistentSegmentInfo",
		zap.Int("len(infos)", len(infoResp.Infos)),
		zap.Any("status", infoResp.Status))
3023
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
E
Enwei Jiao 已提交
3024
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3025
			metrics.FailLabel).Inc()
X
XuanYang-cn 已提交
3026 3027 3028 3029 3030 3031
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	persistentInfos := make([]*milvuspb.PersistentSegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		persistentInfos[i] = &milvuspb.PersistentSegmentInfo{
S
sunby 已提交
3032
			SegmentID:    info.ID,
X
XuanYang-cn 已提交
3033 3034
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
S
sunby 已提交
3035
			NumRows:      info.NumOfRows,
X
XuanYang-cn 已提交
3036 3037 3038
			State:        info.State,
		}
	}
E
Enwei Jiao 已提交
3039
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3040
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
3041
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3042
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
X
XuanYang-cn 已提交
3043 3044 3045 3046
	resp.Infos = persistentInfos
	return resp, nil
}

J
jingkl 已提交
3047
// GetQuerySegmentInfo gets segment information from QueryCoord.
C
Cai Yudong 已提交
3048
func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQuerySegmentInfoRequest) (*milvuspb.GetQuerySegmentInfoResponse, error) {
3049 3050 3051 3052 3053
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetQuerySegmentInfo")
	defer sp.Finish()

	log := log.Ctx(ctx)

D
dragondriver 已提交
3054
	log.Debug("GetQuerySegmentInfo",
3055
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3056 3057 3058
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
3059
	resp := &milvuspb.GetQuerySegmentInfoResponse{
Z
zhenshan.cao 已提交
3060
		Status: &commonpb.Status{
3061
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
Z
zhenshan.cao 已提交
3062 3063
		},
	}
3064 3065 3066 3067
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3068

3069 3070
	method := "GetQuerySegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3071
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3072 3073
		metrics.TotalLabel).Inc()

3074 3075
	collID, err := globalMetaCache.GetCollectionID(ctx, req.CollectionName)
	if err != nil {
E
Enwei Jiao 已提交
3076
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3077 3078 3079
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3080
	infoResp, err := node.queryCoord.GetSegmentInfo(ctx, &querypb.GetSegmentInfoRequest{
3081 3082 3083
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3084
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3085
		),
3086
		CollectionID: collID,
Z
zhenshan.cao 已提交
3087 3088
	})
	if err != nil {
E
Enwei Jiao 已提交
3089
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3090 3091
		log.Error("Failed to get segment info from QueryCoord",
			zap.Error(err))
Z
zhenshan.cao 已提交
3092 3093 3094
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3095 3096 3097
	log.Debug("GetQuerySegmentInfo",
		zap.Any("infos", infoResp.Infos),
		zap.Any("status", infoResp.Status))
3098
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
E
Enwei Jiao 已提交
3099
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3100 3101
		log.Error("Failed to get segment info from QueryCoord",
			zap.String("errMsg", infoResp.Status.Reason))
Z
zhenshan.cao 已提交
3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	queryInfos := make([]*milvuspb.QuerySegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		queryInfos[i] = &milvuspb.QuerySegmentInfo{
			SegmentID:    info.SegmentID,
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
			NumRows:      info.NumRows,
			MemSize:      info.MemSize,
			IndexName:    info.IndexName,
			IndexID:      info.IndexID,
X
xige-16 已提交
3115
			State:        info.SegmentState,
3116
			NodeIds:      info.NodeIds,
Z
zhenshan.cao 已提交
3117 3118
		}
	}
3119

E
Enwei Jiao 已提交
3120 3121
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3122
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
Z
zhenshan.cao 已提交
3123 3124 3125 3126
	resp.Infos = queryInfos
	return resp, nil
}

J
jingkl 已提交
3127
// Dummy handles dummy request
C
Cai Yudong 已提交
3128
func (node *Proxy) Dummy(ctx context.Context, req *milvuspb.DummyRequest) (*milvuspb.DummyResponse, error) {
3129 3130 3131 3132 3133 3134
	failedResponse := &milvuspb.DummyResponse{
		Response: `{"status": "fail"}`,
	}

	// TODO(wxyu): change name RequestType to Request
	drt, err := parseDummyRequestType(req.RequestType)
3135 3136 3137 3138 3139 3140

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Dummy")
	defer sp.Finish()

	log := log.Ctx(ctx)

3141
	if err != nil {
3142 3143
		log.Warn("Failed to parse dummy request type",
			zap.Error(err))
3144 3145 3146
		return failedResponse, nil
	}

3147 3148
	if drt.RequestType == "query" {
		drr, err := parseDummyQueryRequest(req.RequestType)
3149
		if err != nil {
3150 3151
			log.Warn("Failed to parse dummy query request",
				zap.Error(err))
3152 3153 3154
			return failedResponse, nil
		}

3155
		request := &milvuspb.QueryRequest{
3156 3157 3158
			DbName:         drr.DbName,
			CollectionName: drr.CollectionName,
			PartitionNames: drr.PartitionNames,
3159
			OutputFields:   drr.OutputFields,
X
Xiangyu Wang 已提交
3160 3161
		}

3162
		_, err = node.Query(ctx, request)
3163
		if err != nil {
3164 3165
			log.Warn("Failed to execute dummy query",
				zap.Error(err))
3166 3167
			return failedResponse, err
		}
X
Xiangyu Wang 已提交
3168 3169 3170 3171 3172 3173

		return &milvuspb.DummyResponse{
			Response: `{"status": "success"}`,
		}, nil
	}

3174 3175
	log.Debug("cannot find specify dummy request type")
	return failedResponse, nil
X
Xiangyu Wang 已提交
3176 3177
}

J
jingkl 已提交
3178
// RegisterLink registers a link
C
Cai Yudong 已提交
3179
func (node *Proxy) RegisterLink(ctx context.Context, req *milvuspb.RegisterLinkRequest) (*milvuspb.RegisterLinkResponse, error) {
3180
	code := node.stateCode.Load().(commonpb.StateCode)
3181 3182 3183 3184 3185

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-RegisterLink")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
3186
		zap.String("role", typeutil.ProxyRole),
C
Cai Yudong 已提交
3187
		zap.Any("state code of proxy", code))
D
dragondriver 已提交
3188

3189 3190
	log.Debug("RegisterLink")

3191
	if code != commonpb.StateCode_Healthy {
3192 3193 3194
		return &milvuspb.RegisterLinkResponse{
			Address: nil,
			Status: &commonpb.Status{
3195
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3196
				Reason:    "proxy not healthy",
3197 3198 3199
			},
		}, nil
	}
E
Enwei Jiao 已提交
3200
	//metrics.ProxyLinkedSDKs.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Inc()
3201 3202 3203
	return &milvuspb.RegisterLinkResponse{
		Address: nil,
		Status: &commonpb.Status{
3204
			ErrorCode: commonpb.ErrorCode_Success,
3205
			Reason:    os.Getenv(metricsinfo.DeployModeEnvKey),
3206 3207 3208
		},
	}, nil
}
3209

3210
// GetMetrics gets the metrics of proxy
3211 3212
// TODO(dragondriver): cache the Metrics and set a retention to the cache
func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
3213 3214 3215 3216 3217
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetMetrics")
	defer sp.Finish()

	log := log.Ctx(ctx)

3218 3219
	log.RatedDebug(60, "Proxy.GetMetrics",
		zap.Int64("nodeID", paramtable.GetNodeID()),
3220 3221 3222 3223
		zap.String("req", req.Request))

	if !node.checkHealthy() {
		log.Warn("Proxy.GetMetrics failed",
3224
			zap.Int64("nodeID", paramtable.GetNodeID()),
3225
			zap.String("req", req.Request),
E
Enwei Jiao 已提交
3226
			zap.Error(errProxyIsUnhealthy(paramtable.GetNodeID())))
3227 3228 3229 3230

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
E
Enwei Jiao 已提交
3231
				Reason:    msgProxyIsUnhealthy(paramtable.GetNodeID()),
3232 3233 3234 3235 3236 3237 3238 3239
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetMetrics failed to parse metric type",
3240
			zap.Int64("nodeID", paramtable.GetNodeID()),
3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 3251 3252
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

3253 3254 3255
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3256
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
3257
	)
3258
	if metricType == metricsinfo.SystemInfoMetrics {
3259 3260 3261
		metrics, err := node.metricsCacheManager.GetSystemInfoMetrics()
		if err != nil {
			metrics, err = getSystemInfoMetrics(ctx, req, node)
3262
		}
3263

3264 3265
		log.RatedDebug(60, "Proxy.GetMetrics",
			zap.Int64("nodeID", paramtable.GetNodeID()),
3266
			zap.String("req", req.Request),
3267
			zap.String("metricType", metricType),
3268 3269 3270
			zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
			zap.Error(err))

3271 3272
		node.metricsCacheManager.UpdateSystemInfoMetrics(metrics)

G
godchen 已提交
3273
		return metrics, nil
3274 3275
	}

3276 3277
	log.RatedWarn(60, "Proxy.GetMetrics failed, request metric type is not implemented yet",
		zap.Int64("nodeID", paramtable.GetNodeID()),
3278
		zap.String("req", req.Request),
3279
		zap.String("metricType", metricType))
3280 3281 3282 3283 3284 3285 3286 3287 3288 3289

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}

3290 3291 3292
// GetProxyMetrics gets the metrics of proxy, it's an internal interface which is different from GetMetrics interface,
// because it only obtains the metrics of Proxy, not including the topological metrics of Query cluster and Data cluster.
func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
3293 3294 3295 3296
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetProxyMetrics")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
3297
		zap.Int64("nodeID", paramtable.GetNodeID()),
3298 3299
		zap.String("req", req.Request))

3300 3301
	if !node.checkHealthy() {
		log.Warn("Proxy.GetProxyMetrics failed",
E
Enwei Jiao 已提交
3302
			zap.Error(errProxyIsUnhealthy(paramtable.GetNodeID())))
3303 3304 3305 3306

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
E
Enwei Jiao 已提交
3307
				Reason:    msgProxyIsUnhealthy(paramtable.GetNodeID()),
3308 3309 3310 3311 3312 3313 3314 3315 3316 3317 3318 3319 3320 3321 3322 3323 3324
			},
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetProxyMetrics failed to parse metric type",
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

3325 3326 3327
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3328
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
3329
	)
3330 3331 3332 3333 3334 3335 3336 3337 3338 3339 3340 3341 3342 3343 3344 3345

	if metricType == metricsinfo.SystemInfoMetrics {
		proxyMetrics, err := getProxyMetrics(ctx, req, node)
		if err != nil {
			log.Warn("Proxy.GetProxyMetrics failed to getProxyMetrics",
				zap.Error(err))

			return &milvuspb.GetMetricsResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

		log.Debug("Proxy.GetProxyMetrics",
3346
			zap.String("metricType", metricType))
3347 3348 3349 3350

		return proxyMetrics, nil
	}

J
Jiquan Long 已提交
3351
	log.Warn("Proxy.GetProxyMetrics failed, request metric type is not implemented yet",
3352
		zap.String("metricType", metricType))
3353 3354 3355 3356 3357 3358 3359 3360 3361

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
	}, nil
}

B
bigsheeper 已提交
3362 3363
// LoadBalance would do a load balancing operation between query nodes
func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceRequest) (*commonpb.Status, error) {
3364 3365 3366 3367 3368
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadBalance")
	defer sp.Finish()

	log := log.Ctx(ctx)

B
bigsheeper 已提交
3369
	log.Debug("Proxy.LoadBalance",
E
Enwei Jiao 已提交
3370
		zap.Int64("proxy_id", paramtable.GetNodeID()),
B
bigsheeper 已提交
3371 3372 3373 3374 3375 3376 3377 3378 3379
		zap.Any("req", req))

	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	}
3380 3381 3382

	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
	if err != nil {
J
Jiquan Long 已提交
3383
		log.Warn("failed to get collection id",
3384 3385
			zap.String("collection name", req.GetCollectionName()),
			zap.Error(err))
3386 3387 3388
		status.Reason = err.Error()
		return status, nil
	}
B
bigsheeper 已提交
3389
	infoResp, err := node.queryCoord.LoadBalance(ctx, &querypb.LoadBalanceRequest{
3390 3391 3392
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_LoadBalanceSegments),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3393
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3394
		),
B
bigsheeper 已提交
3395 3396
		SourceNodeIDs:    []int64{req.SrcNodeID},
		DstNodeIDs:       req.DstNodeIDs,
X
xige-16 已提交
3397
		BalanceReason:    querypb.TriggerCondition_GrpcRequest,
B
bigsheeper 已提交
3398
		SealedSegmentIDs: req.SealedSegmentIDs,
3399
		CollectionID:     collectionID,
B
bigsheeper 已提交
3400 3401
	})
	if err != nil {
J
Jiquan Long 已提交
3402
		log.Warn("Failed to LoadBalance from Query Coordinator",
3403 3404
			zap.Any("req", req),
			zap.Error(err))
B
bigsheeper 已提交
3405 3406 3407 3408
		status.Reason = err.Error()
		return status, nil
	}
	if infoResp.ErrorCode != commonpb.ErrorCode_Success {
J
Jiquan Long 已提交
3409
		log.Warn("Failed to LoadBalance from Query Coordinator",
3410
			zap.String("errMsg", infoResp.Reason))
B
bigsheeper 已提交
3411 3412 3413
		status.Reason = infoResp.Reason
		return status, nil
	}
3414 3415 3416
	log.Debug("LoadBalance Done",
		zap.Any("req", req),
		zap.Any("status", infoResp))
B
bigsheeper 已提交
3417 3418 3419 3420
	status.ErrorCode = commonpb.ErrorCode_Success
	return status, nil
}

3421 3422
// GetReplicas gets replica info
func (node *Proxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
3423 3424 3425 3426 3427 3428 3429 3430
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetReplicas")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("received get replicas request",
		zap.Int64("collection", req.GetCollectionID()),
		zap.Bool("with shard nodes", req.GetWithShardNodes()))
3431 3432 3433 3434 3435 3436
	resp := &milvuspb.GetReplicasResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

S
smellthemoon 已提交
3437 3438
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_GetReplicas),
E
Enwei Jiao 已提交
3439
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
S
smellthemoon 已提交
3440
	)
3441 3442 3443

	resp, err := node.queryCoord.GetReplicas(ctx, req)
	if err != nil {
3444 3445
		log.Error("Failed to get replicas from Query Coordinator",
			zap.Error(err))
3446 3447 3448 3449
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3450 3451 3452
	log.Debug("received get replicas response",
		zap.Any("resp", resp),
		zap.Error(err))
3453 3454 3455
	return resp, nil
}

3456
// GetCompactionState gets the compaction state of multiple segments
3457
func (node *Proxy) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
3458 3459 3460 3461 3462 3463 3464
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetCompactionState")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.Int64("compactionID", req.GetCompactionID()))

	log.Debug("received GetCompactionState request")
3465 3466 3467 3468 3469 3470 3471
	resp := &milvuspb.GetCompactionStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionState(ctx, req)
3472 3473 3474
	log.Debug("received GetCompactionState response",
		zap.Any("resp", resp),
		zap.Error(err))
3475 3476 3477
	return resp, err
}

3478
// ManualCompaction invokes compaction on specified collection
3479
func (node *Proxy) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
3480 3481 3482 3483 3484 3485 3486
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ManualCompaction")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", req.GetCollectionID()))

	log.Info("received ManualCompaction request")
3487 3488 3489 3490 3491 3492 3493
	resp := &milvuspb.ManualCompactionResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.ManualCompaction(ctx, req)
3494 3495 3496
	log.Info("received ManualCompaction response",
		zap.Any("resp", resp),
		zap.Error(err))
3497 3498 3499
	return resp, err
}

3500
// GetCompactionStateWithPlans returns the compactions states with the given plan ID
3501
func (node *Proxy) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
3502 3503 3504 3505 3506 3507 3508
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetCompactionStateWithPlans")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.Int64("compactionID", req.GetCompactionID()))

	log.Debug("received GetCompactionStateWithPlans request")
3509 3510 3511 3512 3513 3514 3515
	resp := &milvuspb.GetCompactionPlansResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionStateWithPlans(ctx, req)
3516 3517 3518
	log.Debug("received GetCompactionStateWithPlans response",
		zap.Any("resp", resp),
		zap.Error(err))
3519 3520 3521
	return resp, err
}

B
Bingyi Sun 已提交
3522 3523
// GetFlushState gets the flush state of multiple segments
func (node *Proxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
3524 3525 3526 3527 3528 3529 3530
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetFlushState")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("received get flush state request",
		zap.Any("request", req))
3531
	var err error
B
Bingyi Sun 已提交
3532 3533 3534
	resp := &milvuspb.GetFlushStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
J
Jiquan Long 已提交
3535
		log.Warn("unable to get flush state because of closed server")
B
Bingyi Sun 已提交
3536 3537 3538
		return resp, nil
	}

3539
	resp, err = node.dataCoord.GetFlushState(ctx, req)
X
Xiaofan 已提交
3540
	if err != nil {
3541 3542
		log.Warn("failed to get flush state response",
			zap.Error(err))
X
Xiaofan 已提交
3543 3544
		return nil, err
	}
3545 3546
	log.Debug("received get flush state response",
		zap.Any("response", resp))
B
Bingyi Sun 已提交
3547 3548 3549
	return resp, err
}

C
Cai Yudong 已提交
3550 3551
// checkHealthy checks proxy state is Healthy
func (node *Proxy) checkHealthy() bool {
3552 3553
	code := node.stateCode.Load().(commonpb.StateCode)
	return code == commonpb.StateCode_Healthy
3554 3555
}

3556 3557 3558
func (node *Proxy) checkHealthyAndReturnCode() (commonpb.StateCode, bool) {
	code := node.stateCode.Load().(commonpb.StateCode)
	return code, code == commonpb.StateCode_Healthy
3559 3560
}

3561
// unhealthyStatus returns the proxy not healthy status
3562 3563 3564
func unhealthyStatus() *commonpb.Status {
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3565
		Reason:    "proxy not healthy",
3566 3567
	}
}
G
groot 已提交
3568 3569 3570

// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
3571 3572 3573 3574 3575
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Import")
	defer sp.Finish()

	log := log.Ctx(ctx)

3576 3577
	log.Info("received import request",
		zap.String("collection name", req.GetCollectionName()),
G
groot 已提交
3578 3579
		zap.String("partition name", req.GetPartitionName()),
		zap.Strings("files", req.GetFiles()))
3580 3581 3582 3583 3584 3585
	resp := &milvuspb.ImportResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
	}
G
groot 已提交
3586 3587 3588 3589
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3590

3591 3592
	err := importutil.ValidateOptions(req.GetOptions())
	if err != nil {
3593 3594
		log.Error("failed to execute import request",
			zap.Error(err))
3595 3596 3597 3598 3599
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = "request options is not illegal    \n" + err.Error() + "    \nIllegal option format    \n" + importutil.OptionFormat
		return resp, nil
	}

3600 3601
	method := "Import"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3602
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3603 3604
		metrics.TotalLabel).Inc()

3605
	// Call rootCoord to finish import.
3606 3607
	respFromRC, err := node.rootCoord.Import(ctx, req)
	if err != nil {
E
Enwei Jiao 已提交
3608
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3609 3610
		log.Error("failed to execute bulk insert request",
			zap.Error(err))
3611 3612 3613 3614
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3615

E
Enwei Jiao 已提交
3616 3617
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3618
	return respFromRC, nil
G
groot 已提交
3619 3620
}

3621
// GetImportState checks import task state from RootCoord.
G
groot 已提交
3622
func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
3623 3624 3625 3626 3627 3628 3629
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetImportState")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("received get import state request",
		zap.Int64("taskID", req.GetTask()))
G
groot 已提交
3630 3631 3632 3633 3634
	resp := &milvuspb.GetImportStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3635 3636
	method := "GetImportState"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3637
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3638
		metrics.TotalLabel).Inc()
G
groot 已提交
3639 3640

	resp, err := node.rootCoord.GetImportState(ctx, req)
3641
	if err != nil {
E
Enwei Jiao 已提交
3642
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3643 3644
		log.Error("failed to execute get import state",
			zap.Error(err))
3645 3646 3647 3648 3649
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}

3650 3651 3652
	log.Debug("successfully received get import state response",
		zap.Int64("taskID", req.GetTask()),
		zap.Any("resp", resp), zap.Error(err))
E
Enwei Jiao 已提交
3653 3654
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3655
	return resp, nil
G
groot 已提交
3656 3657 3658 3659
}

// ListImportTasks get id array of all import tasks from rootcoord
func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
3660 3661 3662 3663 3664
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ListImportTasks")
	defer sp.Finish()

	log := log.Ctx(ctx)

J
Jiquan Long 已提交
3665
	log.Debug("received list import tasks request")
G
groot 已提交
3666 3667 3668 3669 3670
	resp := &milvuspb.ListImportTasksResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3671 3672
	method := "ListImportTasks"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3673
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3674
		metrics.TotalLabel).Inc()
G
groot 已提交
3675
	resp, err := node.rootCoord.ListImportTasks(ctx, req)
3676
	if err != nil {
E
Enwei Jiao 已提交
3677
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3678 3679
		log.Error("failed to execute list import tasks",
			zap.Error(err))
3680 3681
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
X
XuanYang-cn 已提交
3682 3683 3684
		return resp, nil
	}

3685 3686 3687
	log.Debug("successfully received list import tasks response",
		zap.String("collection", req.CollectionName),
		zap.Any("tasks", resp.Tasks))
E
Enwei Jiao 已提交
3688 3689
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
X
XuanYang-cn 已提交
3690 3691 3692
	return resp, err
}

3693 3694 3695
// InvalidateCredentialCache invalidate the credential cache of specified username.
func (node *Proxy) InvalidateCredentialCache(ctx context.Context, request *proxypb.InvalidateCredCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
3696 3697 3698 3699 3700

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-InvalidateCredentialCache")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
3701 3702
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))
3703 3704

	log.Debug("received request to invalidate credential cache")
3705
	if !node.checkHealthy() {
3706
		return unhealthyStatus(), nil
3707
	}
3708 3709 3710 3711 3712

	username := request.Username
	if globalMetaCache != nil {
		globalMetaCache.RemoveCredential(username) // no need to return error, though credential may be not cached
	}
3713
	log.Debug("complete to invalidate credential cache")
3714 3715 3716 3717 3718 3719 3720 3721 3722 3723

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

// UpdateCredentialCache update the credential cache of specified username.
func (node *Proxy) UpdateCredentialCache(ctx context.Context, request *proxypb.UpdateCredCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
3724 3725 3726 3727 3728

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-UpdateCredentialCache")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
3729 3730
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))
3731 3732

	log.Debug("received request to update credential cache")
3733
	if !node.checkHealthy() {
3734
		return unhealthyStatus(), nil
3735
	}
3736 3737

	credInfo := &internalpb.CredentialInfo{
3738 3739
		Username:       request.Username,
		Sha256Password: request.Password,
3740 3741 3742 3743
	}
	if globalMetaCache != nil {
		globalMetaCache.UpdateCredential(credInfo) // no need to return error, though credential may be not cached
	}
3744
	log.Debug("complete to update credential cache")
3745 3746 3747 3748 3749 3750 3751 3752

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

func (node *Proxy) CreateCredential(ctx context.Context, req *milvuspb.CreateCredentialRequest) (*commonpb.Status, error) {
3753 3754 3755 3756 3757 3758 3759 3760
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateCredential")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("CreateCredential",
		zap.String("role", typeutil.ProxyRole))
3761
	if !node.checkHealthy() {
3762
		return unhealthyStatus(), nil
3763
	}
3764 3765 3766 3767 3768 3769 3770 3771 3772 3773
	// validate params
	username := req.Username
	if err := ValidateUsername(username); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	rawPassword, err := crypto.Base64Decode(req.Password)
	if err != nil {
3774 3775
		log.Error("decode password fail",
			zap.Error(err))
3776 3777 3778 3779 3780 3781
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "decode password fail key:" + req.Username,
		}, nil
	}
	if err = ValidatePassword(rawPassword); err != nil {
3782 3783
		log.Error("illegal password",
			zap.Error(err))
3784 3785 3786 3787 3788 3789 3790
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	encryptedPassword, err := crypto.PasswordEncrypt(rawPassword)
	if err != nil {
3791 3792
		log.Error("encrypt password fail",
			zap.Error(err))
3793 3794 3795 3796 3797
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "encrypt password fail key:" + req.Username,
		}, nil
	}
3798

3799 3800 3801
	credInfo := &internalpb.CredentialInfo{
		Username:          req.Username,
		EncryptedPassword: encryptedPassword,
3802
		Sha256Password:    crypto.SHA256(rawPassword, req.Username),
3803 3804 3805
	}
	result, err := node.rootCoord.CreateCredential(ctx, credInfo)
	if err != nil { // for error like conntext timeout etc.
3806 3807
		log.Error("create credential fail",
			zap.Error(err))
3808 3809 3810 3811 3812 3813 3814 3815
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

C
codeman 已提交
3816
func (node *Proxy) UpdateCredential(ctx context.Context, req *milvuspb.UpdateCredentialRequest) (*commonpb.Status, error) {
3817 3818 3819 3820 3821 3822 3823 3824
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-UpdateCredential")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("UpdateCredential",
		zap.String("role", typeutil.ProxyRole))
3825
	if !node.checkHealthy() {
3826
		return unhealthyStatus(), nil
3827
	}
C
codeman 已提交
3828 3829
	rawOldPassword, err := crypto.Base64Decode(req.OldPassword)
	if err != nil {
3830 3831
		log.Error("decode old password fail",
			zap.Error(err))
C
codeman 已提交
3832 3833 3834 3835 3836 3837
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode old password fail when updating:" + req.Username,
		}, nil
	}
	rawNewPassword, err := crypto.Base64Decode(req.NewPassword)
3838
	if err != nil {
3839 3840
		log.Error("decode password fail",
			zap.Error(err))
3841 3842 3843 3844 3845
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
3846 3847
	// valid new password
	if err = ValidatePassword(rawNewPassword); err != nil {
3848 3849
		log.Error("illegal password",
			zap.Error(err))
3850 3851 3852 3853 3854
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
3855 3856

	if !passwordVerify(ctx, req.Username, rawOldPassword, globalMetaCache) {
C
codeman 已提交
3857 3858 3859 3860 3861 3862 3863
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "old password is not correct:" + req.Username,
		}, nil
	}
	// update meta data
	encryptedPassword, err := crypto.PasswordEncrypt(rawNewPassword)
3864
	if err != nil {
3865 3866
		log.Error("encrypt password fail",
			zap.Error(err))
3867 3868 3869 3870 3871
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "encrypt password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
3872
	updateCredReq := &internalpb.CredentialInfo{
3873
		Username:          req.Username,
3874
		Sha256Password:    crypto.SHA256(rawNewPassword, req.Username),
3875 3876
		EncryptedPassword: encryptedPassword,
	}
C
codeman 已提交
3877
	result, err := node.rootCoord.UpdateCredential(ctx, updateCredReq)
3878
	if err != nil { // for error like conntext timeout etc.
3879 3880
		log.Error("update credential fail",
			zap.Error(err))
3881 3882 3883 3884 3885 3886 3887 3888 3889
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
3890 3891 3892 3893 3894 3895 3896 3897
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DeleteCredential")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("DeleteCredential",
		zap.String("role", typeutil.ProxyRole))
3898
	if !node.checkHealthy() {
3899
		return unhealthyStatus(), nil
3900 3901
	}

3902 3903 3904 3905 3906 3907
	if req.Username == util.UserRoot {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_DeleteCredentialFailure,
			Reason:    "user root cannot be deleted",
		}, nil
	}
3908 3909
	result, err := node.rootCoord.DeleteCredential(ctx, req)
	if err != nil { // for error like conntext timeout etc.
3910 3911
		log.Error("delete credential fail",
			zap.Error(err))
3912 3913 3914 3915 3916 3917 3918 3919 3920
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
3921 3922 3923 3924 3925 3926 3927
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ListCredUsers")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole))

	log.Debug("ListCredUsers")
3928
	if !node.checkHealthy() {
3929
		return &milvuspb.ListCredUsersResponse{Status: unhealthyStatus()}, nil
3930
	}
3931
	rootCoordReq := &milvuspb.ListCredUsersRequest{
3932 3933 3934
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_ListCredUsernames),
		),
3935 3936
	}
	resp, err := node.rootCoord.ListCredUsers(ctx, rootCoordReq)
3937 3938 3939 3940 3941 3942 3943 3944 3945 3946 3947 3948
	if err != nil {
		return &milvuspb.ListCredUsersResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
	return &milvuspb.ListCredUsersResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
3949
		Usernames: resp.Usernames,
3950 3951
	}, nil
}
3952

3953
func (node *Proxy) CreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest) (*commonpb.Status, error) {
3954 3955 3956 3957 3958 3959 3960
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateRole")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("CreateRole",
		zap.Any("req", req))
3961
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
3962
		return errorutil.UnhealthyStatus(code), nil
3963 3964 3965 3966 3967 3968 3969 3970 3971 3972
	}

	var roleName string
	if req.Entity != nil {
		roleName = req.Entity.Name
	}
	if err := ValidateRoleName(roleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
3973
		}, nil
3974 3975 3976 3977
	}

	result, err := node.rootCoord.CreateRole(ctx, req)
	if err != nil {
3978 3979
		log.Error("fail to create role",
			zap.Error(err))
3980 3981 3982
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
3983
		}, nil
3984 3985
	}
	return result, nil
3986 3987
}

3988
func (node *Proxy) DropRole(ctx context.Context, req *milvuspb.DropRoleRequest) (*commonpb.Status, error) {
3989 3990 3991 3992 3993 3994 3995
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropRole")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("DropRole",
		zap.Any("req", req))
3996
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
3997
		return errorutil.UnhealthyStatus(code), nil
3998 3999 4000 4001 4002
	}
	if err := ValidateRoleName(req.RoleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4003
		}, nil
4004
	}
4005 4006 4007 4008 4009
	if IsDefaultRole(req.RoleName) {
		errMsg := fmt.Sprintf("the role[%s] is a default role, which can't be droped", req.RoleName)
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    errMsg,
4010
		}, nil
4011
	}
4012 4013
	result, err := node.rootCoord.DropRole(ctx, req)
	if err != nil {
4014 4015 4016
		log.Error("fail to drop role",
			zap.String("role_name", req.RoleName),
			zap.Error(err))
4017 4018 4019
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4020
		}, nil
4021 4022
	}
	return result, nil
4023 4024
}

4025
func (node *Proxy) OperateUserRole(ctx context.Context, req *milvuspb.OperateUserRoleRequest) (*commonpb.Status, error) {
4026 4027 4028 4029 4030 4031 4032
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-OperateUserRole")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("OperateUserRole",
		zap.Any("req", req))
4033
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4034
		return errorutil.UnhealthyStatus(code), nil
4035 4036 4037 4038 4039
	}
	if err := ValidateUsername(req.Username); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4040
		}, nil
4041 4042 4043 4044 4045
	}
	if err := ValidateRoleName(req.RoleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4046
		}, nil
4047 4048 4049 4050
	}

	result, err := node.rootCoord.OperateUserRole(ctx, req)
	if err != nil {
4051 4052
		logger.Error("fail to operate user role",
			zap.Error(err))
4053 4054 4055
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4056
		}, nil
4057 4058
	}
	return result, nil
4059 4060
}

4061
func (node *Proxy) SelectRole(ctx context.Context, req *milvuspb.SelectRoleRequest) (*milvuspb.SelectRoleResponse, error) {
4062 4063 4064 4065 4066 4067
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-SelectRole")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("SelectRole", zap.Any("req", req))
4068
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4069
		return &milvuspb.SelectRoleResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4070 4071 4072 4073 4074 4075 4076 4077 4078
	}

	if req.Role != nil {
		if err := ValidateRoleName(req.Role.Name); err != nil {
			return &milvuspb.SelectRoleResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_IllegalArgument,
					Reason:    err.Error(),
				},
4079
			}, nil
4080 4081 4082 4083 4084
		}
	}

	result, err := node.rootCoord.SelectRole(ctx, req)
	if err != nil {
4085 4086
		log.Error("fail to select role",
			zap.Error(err))
4087 4088 4089 4090 4091
		return &milvuspb.SelectRoleResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4092
		}, nil
4093 4094
	}
	return result, nil
4095 4096
}

4097
func (node *Proxy) SelectUser(ctx context.Context, req *milvuspb.SelectUserRequest) (*milvuspb.SelectUserResponse, error) {
4098 4099 4100 4101 4102 4103 4104
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-SelectUser")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("SelectUser",
		zap.Any("req", req))
4105
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4106
		return &milvuspb.SelectUserResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4107 4108 4109 4110 4111 4112 4113 4114 4115
	}

	if req.User != nil {
		if err := ValidateUsername(req.User.Name); err != nil {
			return &milvuspb.SelectUserResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_IllegalArgument,
					Reason:    err.Error(),
				},
4116
			}, nil
4117 4118 4119 4120 4121
		}
	}

	result, err := node.rootCoord.SelectUser(ctx, req)
	if err != nil {
4122 4123
		log.Error("fail to select user",
			zap.Error(err))
4124 4125 4126 4127 4128
		return &milvuspb.SelectUserResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4129
		}, nil
4130 4131
	}
	return result, nil
4132 4133
}

4134 4135 4136 4137 4138 4139 4140 4141 4142 4143 4144 4145 4146 4147 4148 4149 4150 4151 4152 4153 4154 4155 4156 4157 4158 4159 4160 4161 4162 4163
func (node *Proxy) validPrivilegeParams(req *milvuspb.OperatePrivilegeRequest) error {
	if req.Entity == nil {
		return fmt.Errorf("the entity in the request is nil")
	}
	if req.Entity.Grantor == nil {
		return fmt.Errorf("the grantor entity in the grant entity is nil")
	}
	if req.Entity.Grantor.Privilege == nil {
		return fmt.Errorf("the privilege entity in the grantor entity is nil")
	}
	if err := ValidatePrivilege(req.Entity.Grantor.Privilege.Name); err != nil {
		return err
	}
	if req.Entity.Object == nil {
		return fmt.Errorf("the resource entity in the grant entity is nil")
	}
	if err := ValidateObjectType(req.Entity.Object.Name); err != nil {
		return err
	}
	if err := ValidateObjectName(req.Entity.ObjectName); err != nil {
		return err
	}
	if req.Entity.Role == nil {
		return fmt.Errorf("the object entity in the grant entity is nil")
	}
	if err := ValidateRoleName(req.Entity.Role.Name); err != nil {
		return err
	}

	return nil
4164 4165
}

4166
func (node *Proxy) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) {
4167 4168 4169 4170 4171 4172 4173
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-OperatePrivilege")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("OperatePrivilege",
		zap.Any("req", req))
4174
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4175
		return errorutil.UnhealthyStatus(code), nil
4176 4177 4178 4179 4180
	}
	if err := node.validPrivilegeParams(req); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4181
		}, nil
4182 4183 4184 4185 4186 4187
	}
	curUser, err := GetCurUserFromContext(ctx)
	if err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4188
		}, nil
4189 4190 4191 4192
	}
	req.Entity.Grantor.User = &milvuspb.UserEntity{Name: curUser}
	result, err := node.rootCoord.OperatePrivilege(ctx, req)
	if err != nil {
4193 4194
		log.Error("fail to operate privilege",
			zap.Error(err))
4195 4196 4197
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4198
		}, nil
4199 4200
	}
	return result, nil
4201 4202
}

4203 4204 4205 4206 4207 4208 4209 4210 4211 4212 4213 4214 4215 4216 4217 4218 4219 4220 4221 4222 4223 4224 4225 4226 4227 4228 4229
// validGrantParams checks a SelectGrantRequest before it is sent on.
//
// The grant entity and its role are mandatory. The object is optional; when
// present, its type and the object name are validated. The first failing
// check determines the returned error; nil means the request is well formed.
func (node *Proxy) validGrantParams(req *milvuspb.SelectGrantRequest) error {
	entity := req.Entity
	if entity == nil {
		return fmt.Errorf("the grant entity in the request is nil")
	}

	if object := entity.Object; object != nil {
		if typeErr := ValidateObjectType(object.Name); typeErr != nil {
			return typeErr
		}
		if nameErr := ValidateObjectName(entity.ObjectName); nameErr != nil {
			return nameErr
		}
	}

	role := entity.Role
	if role == nil {
		return fmt.Errorf("the role entity in the grant entity is nil")
	}
	if roleErr := ValidateRoleName(role.Name); roleErr != nil {
		return roleErr
	}

	return nil
}

func (node *Proxy) SelectGrant(ctx context.Context, req *milvuspb.SelectGrantRequest) (*milvuspb.SelectGrantResponse, error) {
4230 4231 4232 4233 4234 4235 4236
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-SelectGrant")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("SelectGrant",
		zap.Any("req", req))
4237
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4238
		return &milvuspb.SelectGrantResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4239 4240 4241 4242 4243 4244 4245 4246
	}

	if err := node.validGrantParams(req); err != nil {
		return &milvuspb.SelectGrantResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_IllegalArgument,
				Reason:    err.Error(),
			},
4247
		}, nil
4248 4249 4250 4251
	}

	result, err := node.rootCoord.SelectGrant(ctx, req)
	if err != nil {
4252 4253
		log.Error("fail to select grant",
			zap.Error(err))
4254 4255 4256 4257 4258
		return &milvuspb.SelectGrantResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4259
		}, nil
4260 4261 4262 4263 4264
	}
	return result, nil
}

func (node *Proxy) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.RefreshPolicyInfoCacheRequest) (*commonpb.Status, error) {
4265 4266 4267 4268 4269 4270 4271
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-RefreshPolicyInfoCache")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("RefreshPrivilegeInfoCache",
		zap.Any("req", req))
4272 4273 4274 4275 4276 4277 4278 4279 4280 4281
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
		return errorutil.UnhealthyStatus(code), errorutil.UnhealthyError()
	}

	if globalMetaCache != nil {
		err := globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{
			OpType: typeutil.CacheOpType(req.OpType),
			OpKey:  req.OpKey,
		})
		if err != nil {
4282 4283
			log.Error("fail to refresh policy info",
				zap.Error(err))
4284 4285 4286 4287 4288 4289
			return &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_RefreshPolicyInfoCacheFailure,
				Reason:    err.Error(),
			}, err
		}
	}
4290
	log.Debug("RefreshPrivilegeInfoCache success")
4291 4292 4293 4294

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
4295
}
4296 4297 4298 4299 4300 4301 4302 4303 4304 4305 4306 4307 4308 4309 4310 4311 4312

// SetRates limits the rates of requests.
func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesRequest) (*commonpb.Status, error) {
	resp := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	}
	if !node.checkHealthy() {
		resp = unhealthyStatus()
		return resp, nil
	}

	err := node.multiRateLimiter.globalRateLimiter.setRates(request.GetRates())
	// TODO: set multiple rate limiter rates
	if err != nil {
		resp.Reason = err.Error()
		return resp, nil
	}
4313 4314 4315 4316 4317 4318 4319
	node.multiRateLimiter.SetQuotaStates(request.GetStates(), request.GetStateReasons())
	log.Info("current rates in proxy", zap.Int64("proxyNodeID", paramtable.GetNodeID()), zap.Any("rates", request.GetRates()))
	if len(request.GetStates()) != 0 {
		for i := range request.GetStates() {
			log.Warn("Proxy set quota states", zap.String("state", request.GetStates()[i].String()), zap.String("reason", request.GetStateReasons()[i]))
		}
	}
4320 4321 4322
	resp.ErrorCode = commonpb.ErrorCode_Success
	return resp, nil
}
4323 4324 4325 4326

func (node *Proxy) CheckHealth(ctx context.Context, request *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
	if !node.checkHealthy() {
		reason := errorutil.UnHealthReason("proxy", node.session.ServerID, "proxy is unhealthy")
4327 4328 4329 4330
		return &milvuspb.CheckHealthResponse{
			Status:    unhealthyStatus(),
			IsHealthy: false,
			Reasons:   []string{reason}}, nil
4331 4332 4333 4334 4335 4336 4337 4338 4339 4340
	}

	group, ctx := errgroup.WithContext(ctx)
	errReasons := make([]string, 0)

	mu := &sync.Mutex{}
	fn := func(role string, resp *milvuspb.CheckHealthResponse, err error) error {
		mu.Lock()
		defer mu.Unlock()

4341 4342 4343 4344 4345
		sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-RefreshPolicyInfoCache")
		defer sp.Finish()

		log := log.Ctx(ctx).With(zap.String("role", role))

4346
		if err != nil {
4347 4348
			log.Warn("check health fail",
				zap.Error(err))
4349 4350 4351 4352 4353
			errReasons = append(errReasons, fmt.Sprintf("check health fail for %s", role))
			return err
		}

		if !resp.IsHealthy {
4354
			log.Warn("check health fail")
4355 4356 4357 4358 4359 4360 4361 4362 4363 4364 4365 4366 4367 4368 4369 4370 4371 4372 4373 4374 4375 4376 4377
			errReasons = append(errReasons, resp.Reasons...)
		}
		return nil
	}

	group.Go(func() error {
		resp, err := node.rootCoord.CheckHealth(ctx, request)
		return fn("rootcoord", resp, err)
	})

	group.Go(func() error {
		resp, err := node.queryCoord.CheckHealth(ctx, request)
		return fn("querycoord", resp, err)
	})

	group.Go(func() error {
		resp, err := node.dataCoord.CheckHealth(ctx, request)
		return fn("datacoord", resp, err)
	})

	err := group.Wait()
	if err != nil || len(errReasons) != 0 {
		return &milvuspb.CheckHealthResponse{
4378 4379 4380
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_Success,
			},
4381 4382 4383 4384 4385
			IsHealthy: false,
			Reasons:   errReasons,
		}, nil
	}

4386
	states, reasons := node.multiRateLimiter.GetQuotaStates()
4387 4388 4389 4390 4391
	return &milvuspb.CheckHealthResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
4392 4393 4394
		QuotaStates: states,
		Reasons:     reasons,
		IsHealthy:   true,
4395
	}, nil
4396
}