impl.go 134.6 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16

C
Cai Yudong 已提交
17
package proxy
18 19 20

import (
	"context"
21
	"errors"
22
	"fmt"
C
cai.zhang 已提交
23
	"os"
24
	"strconv"
25 26 27
	"sync"

	"golang.org/x/sync/errgroup"
28

29
	"github.com/golang/protobuf/proto"
S
SimFG 已提交
30 31
	"github.com/milvus-io/milvus-proto/go-api/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/milvuspb"
32
	"github.com/milvus-io/milvus/internal/common"
X
Xiangyu Wang 已提交
33
	"github.com/milvus-io/milvus/internal/log"
34
	"github.com/milvus-io/milvus/internal/metrics"
J
jaime 已提交
35
	"github.com/milvus-io/milvus/internal/mq/msgstream"
X
Xiangyu Wang 已提交
36 37 38 39
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
40
	"github.com/milvus-io/milvus/internal/util"
41
	"github.com/milvus-io/milvus/internal/util/commonpbutil"
42
	"github.com/milvus-io/milvus/internal/util/crypto"
43
	"github.com/milvus-io/milvus/internal/util/errorutil"
44
	"github.com/milvus-io/milvus/internal/util/importutil"
45 46
	"github.com/milvus-io/milvus/internal/util/logutil"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
E
Enwei Jiao 已提交
47
	"github.com/milvus-io/milvus/internal/util/paramtable"
48
	"github.com/milvus-io/milvus/internal/util/timerecord"
49
	"github.com/milvus-io/milvus/internal/util/trace"
X
Xiangyu Wang 已提交
50
	"github.com/milvus-io/milvus/internal/util/typeutil"
51
	"go.uber.org/zap"
52 53
)

54 55
// moduleName is the module label this file attaches to request contexts via
// logutil.WithModule.
const moduleName = "Proxy"

56
// UpdateStateCode updates the state code of Proxy.
57
func (node *Proxy) UpdateStateCode(code commonpb.StateCode) {
58
	node.stateCode.Store(code)
Z
zhenshan.cao 已提交
59 60
}

61
// GetComponentStates get state of Proxy.
62 63
func (node *Proxy) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
	stats := &milvuspb.ComponentStates{
G
godchen 已提交
64 65 66 67
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
68
	code, ok := node.stateCode.Load().(commonpb.StateCode)
G
godchen 已提交
69 70 71 72 73 74
	if !ok {
		errMsg := "unexpected error in type assertion"
		stats.Status = &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    errMsg,
		}
G
godchen 已提交
75
		return stats, nil
G
godchen 已提交
76
	}
77 78 79 80
	nodeID := common.NotRegisteredID
	if node.session != nil && node.session.Registered() {
		nodeID = node.session.ServerID
	}
81
	info := &milvuspb.ComponentInfo{
82 83
		// NodeID:    Params.ProxyID, // will race with Proxy.Register()
		NodeID:    nodeID,
C
Cai Yudong 已提交
84
		Role:      typeutil.ProxyRole,
G
godchen 已提交
85 86 87 88 89 90
		StateCode: code,
	}
	stats.State = info
	return stats, nil
}

C
cxytz01 已提交
91
// GetStatisticsChannel gets statistics channel of Proxy.
C
Cai Yudong 已提交
92
func (node *Proxy) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
93 94 95 96 97 98 99 100 101
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: "",
	}, nil
}

102
// InvalidateCollectionMetaCache invalidate the meta cache of specific collection.
C
Cai Yudong 已提交
103
func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
104 105 106
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
107
	ctx = logutil.WithModule(ctx, moduleName)
108 109 110
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-InvalidateCollectionMetaCache")
	defer sp.Finish()
	log := log.Ctx(ctx).With(
111
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
112
		zap.String("db", request.DbName),
113 114
		zap.String("collectionName", request.CollectionName),
		zap.Int64("collectionID", request.CollectionID))
D
dragondriver 已提交
115

116 117
	log.Info("received request to invalidate collection meta cache")

118
	collectionName := request.CollectionName
119
	collectionID := request.CollectionID
X
Xiaofan 已提交
120 121

	var aliasName []string
N
neza2017 已提交
122
	if globalMetaCache != nil {
123 124 125 126
		if collectionName != "" {
			globalMetaCache.RemoveCollection(ctx, collectionName) // no need to return error, though collection may be not cached
		}
		if request.CollectionID != UniqueID(0) {
X
Xiaofan 已提交
127
			aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
128
		}
N
neza2017 已提交
129
	}
130 131
	if request.GetBase().GetMsgType() == commonpb.MsgType_DropCollection {
		// no need to handle error, since this Proxy may not create dml stream for the collection.
132 133
		node.chMgr.removeDMLStream(request.GetCollectionID())
		// clean up collection level metrics
E
Enwei Jiao 已提交
134
		metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), collectionName)
X
Xiaofan 已提交
135
		for _, alias := range aliasName {
E
Enwei Jiao 已提交
136
			metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), alias)
X
Xiaofan 已提交
137
		}
138
	}
139
	log.Info("complete to invalidate collection meta cache")
D
dragondriver 已提交
140

141
	return &commonpb.Status{
142
		ErrorCode: commonpb.ErrorCode_Success,
143 144
		Reason:    "",
	}, nil
145 146
}

147
// CreateCollection create a collection by the schema.
148
// TODO(dragondriver): add more detailed ut for ConsistencyLevel, should we support multiple consistency level in Proxy?
C
Cai Yudong 已提交
149
func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
150 151 152
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
153 154 155

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateCollection")
	defer sp.Finish()
156 157 158
	method := "CreateCollection"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
159
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
160

161
	cct := &createCollectionTask{
S
sunby 已提交
162
		ctx:                     ctx,
163 164
		Condition:               NewTaskCondition(ctx),
		CreateCollectionRequest: request,
165
		rootCoord:               node.rootCoord,
166 167
	}

168 169 170
	// avoid data race
	lenOfSchema := len(request.Schema)

171
	log := log.Ctx(ctx).With(
172
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
173 174
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
175
		zap.Int("len(schema)", lenOfSchema),
176 177
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
178

179 180
	log.Debug(rpcReceived(method))

181 182 183
	if err := node.sched.ddQueue.Enqueue(cct); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
184
			zap.Error(err))
185

E
Enwei Jiao 已提交
186
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
187
		return &commonpb.Status{
188
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
189 190 191 192
			Reason:    err.Error(),
		}, nil
	}

193 194
	log.Debug(
		rpcEnqueued(method),
195 196
		zap.Uint64("BeginTs", cct.BeginTs()),
		zap.Uint64("EndTs", cct.EndTs()),
197
		zap.Uint64("timestamp", request.Base.Timestamp))
198

199 200 201
	if err := cct.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
202
			zap.Error(err),
203
			zap.Uint64("BeginTs", cct.BeginTs()),
204
			zap.Uint64("EndTs", cct.EndTs()))
D
dragondriver 已提交
205

E
Enwei Jiao 已提交
206
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
207
		return &commonpb.Status{
208
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
209 210 211 212
			Reason:    err.Error(),
		}, nil
	}

213 214
	log.Debug(
		rpcDone(method),
215
		zap.Uint64("BeginTs", cct.BeginTs()),
216
		zap.Uint64("EndTs", cct.EndTs()))
217

E
Enwei Jiao 已提交
218 219
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
220 221 222
	return cct.result, nil
}

223
// DropCollection drop a collection.
C
Cai Yudong 已提交
224
func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
225 226 227
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
228 229 230

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropCollection")
	defer sp.Finish()
231 232
	method := "DropCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
233
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
234

235
	dct := &dropCollectionTask{
S
sunby 已提交
236
		ctx:                   ctx,
237 238
		Condition:             NewTaskCondition(ctx),
		DropCollectionRequest: request,
239
		rootCoord:             node.rootCoord,
240
		chMgr:                 node.chMgr,
S
sunby 已提交
241
		chTicker:              node.chTicker,
242 243
	}

244
	log := log.Ctx(ctx).With(
245
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
246 247
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
248

249 250
	log.Debug("DropCollection received")

251 252
	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DropCollection failed to enqueue",
253
			zap.Error(err))
254

E
Enwei Jiao 已提交
255
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
256
		return &commonpb.Status{
257
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
258 259 260 261
			Reason:    err.Error(),
		}, nil
	}

262 263
	log.Debug("DropCollection enqueued",
		zap.Uint64("BeginTs", dct.BeginTs()),
264
		zap.Uint64("EndTs", dct.EndTs()))
265 266 267

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DropCollection failed to WaitToFinish",
D
dragondriver 已提交
268
			zap.Error(err),
269
			zap.Uint64("BeginTs", dct.BeginTs()),
270
			zap.Uint64("EndTs", dct.EndTs()))
D
dragondriver 已提交
271

E
Enwei Jiao 已提交
272
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
273
		return &commonpb.Status{
274
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
275 276 277 278
			Reason:    err.Error(),
		}, nil
	}

279 280
	log.Debug("DropCollection done",
		zap.Uint64("BeginTs", dct.BeginTs()),
281
		zap.Uint64("EndTs", dct.EndTs()))
282

E
Enwei Jiao 已提交
283 284
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
285 286 287
	return dct.result, nil
}

288
// HasCollection check if the specific collection exists in Milvus.
C
Cai Yudong 已提交
289
func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
290 291 292 293 294
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
295 296 297

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasCollection")
	defer sp.Finish()
298 299
	method := "HasCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
300
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
301
		metrics.TotalLabel).Inc()
302

303
	log := log.Ctx(ctx).With(
304
		zap.String("role", typeutil.ProxyRole),
305 306 307
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

308 309
	log.Debug("HasCollection received")

310
	hct := &hasCollectionTask{
S
sunby 已提交
311
		ctx:                  ctx,
312 313
		Condition:            NewTaskCondition(ctx),
		HasCollectionRequest: request,
314
		rootCoord:            node.rootCoord,
315 316
	}

317 318
	if err := node.sched.ddQueue.Enqueue(hct); err != nil {
		log.Warn("HasCollection failed to enqueue",
319
			zap.Error(err))
320

E
Enwei Jiao 已提交
321
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
322
			metrics.AbandonLabel).Inc()
323 324
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
325
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
326 327 328 329 330
				Reason:    err.Error(),
			},
		}, nil
	}

331 332
	log.Debug("HasCollection enqueued",
		zap.Uint64("BeginTS", hct.BeginTs()),
333
		zap.Uint64("EndTS", hct.EndTs()))
334 335 336

	if err := hct.WaitToFinish(); err != nil {
		log.Warn("HasCollection failed to WaitToFinish",
D
dragondriver 已提交
337
			zap.Error(err),
338
			zap.Uint64("BeginTS", hct.BeginTs()),
339
			zap.Uint64("EndTS", hct.EndTs()))
D
dragondriver 已提交
340

E
Enwei Jiao 已提交
341
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
342
			metrics.FailLabel).Inc()
343 344
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
345
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
346 347 348 349 350
				Reason:    err.Error(),
			},
		}, nil
	}

351 352
	log.Debug("HasCollection done",
		zap.Uint64("BeginTS", hct.BeginTs()),
353
		zap.Uint64("EndTS", hct.EndTs()))
354

E
Enwei Jiao 已提交
355
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
356
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
357
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
358 359 360
	return hct.result, nil
}

361
// LoadCollection load a collection into query nodes.
C
Cai Yudong 已提交
362
func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
363 364 365
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
366 367 368

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadCollection")
	defer sp.Finish()
369 370
	method := "LoadCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
371
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
372
		metrics.TotalLabel).Inc()
373
	lct := &loadCollectionTask{
S
sunby 已提交
374
		ctx:                   ctx,
375 376
		Condition:             NewTaskCondition(ctx),
		LoadCollectionRequest: request,
377
		queryCoord:            node.queryCoord,
C
cai.zhang 已提交
378
		indexCoord:            node.indexCoord,
379 380
	}

381
	log := log.Ctx(ctx).With(
382
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
383 384
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
385

386 387
	log.Debug("LoadCollection received")

388 389
	if err := node.sched.ddQueue.Enqueue(lct); err != nil {
		log.Warn("LoadCollection failed to enqueue",
390
			zap.Error(err))
391

E
Enwei Jiao 已提交
392
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
393
			metrics.AbandonLabel).Inc()
394
		return &commonpb.Status{
395
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
396 397 398
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
399

400 401
	log.Debug("LoadCollection enqueued",
		zap.Uint64("BeginTS", lct.BeginTs()),
402
		zap.Uint64("EndTS", lct.EndTs()))
403 404 405

	if err := lct.WaitToFinish(); err != nil {
		log.Warn("LoadCollection failed to WaitToFinish",
D
dragondriver 已提交
406
			zap.Error(err),
407
			zap.Uint64("BeginTS", lct.BeginTs()),
408
			zap.Uint64("EndTS", lct.EndTs()))
E
Enwei Jiao 已提交
409
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
410
			metrics.FailLabel).Inc()
411
		return &commonpb.Status{
412
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
413 414 415 416
			Reason:    err.Error(),
		}, nil
	}

417 418
	log.Debug("LoadCollection done",
		zap.Uint64("BeginTS", lct.BeginTs()),
419
		zap.Uint64("EndTS", lct.EndTs()))
420

E
Enwei Jiao 已提交
421
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
422
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
423
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
424
	return lct.result, nil
425 426
}

427
// ReleaseCollection remove the loaded collection from query nodes.
C
Cai Yudong 已提交
428
func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
429 430 431
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
432

433
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleaseCollection")
434
	defer sp.Finish()
435 436
	method := "ReleaseCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
437
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
438
		metrics.TotalLabel).Inc()
439
	rct := &releaseCollectionTask{
S
sunby 已提交
440
		ctx:                      ctx,
441 442
		Condition:                NewTaskCondition(ctx),
		ReleaseCollectionRequest: request,
443
		queryCoord:               node.queryCoord,
444
		chMgr:                    node.chMgr,
445 446
	}

447
	log := log.Ctx(ctx).With(
448
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
449 450
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
451

452 453
	log.Debug(rpcReceived(method))

454
	if err := node.sched.ddQueue.Enqueue(rct); err != nil {
455 456
		log.Warn(
			rpcFailedToEnqueue(method),
457
			zap.Error(err))
458

E
Enwei Jiao 已提交
459
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
460
			metrics.AbandonLabel).Inc()
461
		return &commonpb.Status{
462
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
463 464 465 466
			Reason:    err.Error(),
		}, nil
	}

467 468
	log.Debug(
		rpcEnqueued(method),
469
		zap.Uint64("BeginTS", rct.BeginTs()),
470
		zap.Uint64("EndTS", rct.EndTs()))
471 472

	if err := rct.WaitToFinish(); err != nil {
473 474
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
475
			zap.Error(err),
476
			zap.Uint64("BeginTS", rct.BeginTs()),
477
			zap.Uint64("EndTS", rct.EndTs()))
D
dragondriver 已提交
478

E
Enwei Jiao 已提交
479
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
480
			metrics.FailLabel).Inc()
481
		return &commonpb.Status{
482
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
483 484 485 486
			Reason:    err.Error(),
		}, nil
	}

487 488
	log.Debug(
		rpcDone(method),
489
		zap.Uint64("BeginTS", rct.BeginTs()),
490
		zap.Uint64("EndTS", rct.EndTs()))
491

E
Enwei Jiao 已提交
492
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
493
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
494
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
495
	return rct.result, nil
496 497
}

498
// DescribeCollection get the meta information of specific collection, such as schema, created timestamp and etc.
C
Cai Yudong 已提交
499
func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
500 501 502 503 504
	if !node.checkHealthy() {
		return &milvuspb.DescribeCollectionResponse{
			Status: unhealthyStatus(),
		}, nil
	}
505

506
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeCollection")
507
	defer sp.Finish()
508 509
	method := "DescribeCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
510
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
511
		metrics.TotalLabel).Inc()
512

513
	dct := &describeCollectionTask{
S
sunby 已提交
514
		ctx:                       ctx,
515 516
		Condition:                 NewTaskCondition(ctx),
		DescribeCollectionRequest: request,
517
		rootCoord:                 node.rootCoord,
518 519
	}

520
	log := log.Ctx(ctx).With(
521
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
522 523
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
524

525 526
	log.Debug("DescribeCollection received")

527 528
	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DescribeCollection failed to enqueue",
529
			zap.Error(err))
530

E
Enwei Jiao 已提交
531
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
532
			metrics.AbandonLabel).Inc()
533 534
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
535
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
536 537 538 539 540
				Reason:    err.Error(),
			},
		}, nil
	}

541 542
	log.Debug("DescribeCollection enqueued",
		zap.Uint64("BeginTS", dct.BeginTs()),
543
		zap.Uint64("EndTS", dct.EndTs()))
544 545 546

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DescribeCollection failed to WaitToFinish",
D
dragondriver 已提交
547
			zap.Error(err),
548
			zap.Uint64("BeginTS", dct.BeginTs()),
549
			zap.Uint64("EndTS", dct.EndTs()))
D
dragondriver 已提交
550

E
Enwei Jiao 已提交
551
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
552
			metrics.FailLabel).Inc()
553

554 555
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
556
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
557 558 559 560 561
				Reason:    err.Error(),
			},
		}, nil
	}

562 563
	log.Debug("DescribeCollection done",
		zap.Uint64("BeginTS", dct.BeginTs()),
564
		zap.Uint64("EndTS", dct.EndTs()))
565

E
Enwei Jiao 已提交
566
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
567
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
568
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
569 570 571
	return dct.result, nil
}

572 573 574 575 576 577 578 579 580
// GetStatistics get the statistics, such as `num_rows`.
// WARNING: It is an experimental API
func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStatisticsRequest) (*milvuspb.GetStatisticsResponse, error) {
	if !node.checkHealthy() {
		return &milvuspb.GetStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}

581
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetStatistics")
582 583 584
	defer sp.Finish()
	method := "GetStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
585
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
586
		metrics.TotalLabel).Inc()
587 588 589 590 591 592 593 594 595 596
	g := &getStatisticsTask{
		request:   request,
		Condition: NewTaskCondition(ctx),
		ctx:       ctx,
		tr:        tr,
		dc:        node.dataCoord,
		qc:        node.queryCoord,
		shardMgr:  node.shardMgr,
	}

597
	log := log.Ctx(ctx).With(
598 599
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
600 601 602 603
		zap.String("collection", request.CollectionName))

	log.Debug(
		rpcReceived(method),
604 605 606 607 608 609 610 611
		zap.Strings("partitions", request.PartitionNames))

	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.Strings("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
612
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636
			metrics.AbandonLabel).Inc()

		return &milvuspb.GetStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.Strings("partitions", request.PartitionNames))

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.Uint64("BeginTS", g.BeginTs()),
			zap.Uint64("EndTS", g.EndTs()),
			zap.Strings("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
637
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
638 639 640 641 642 643 644 645 646 647 648 649 650
			metrics.FailLabel).Inc()

		return &milvuspb.GetStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", g.BeginTs()),
651
		zap.Uint64("EndTS", g.EndTs()))
652

E
Enwei Jiao 已提交
653
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
654
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
655
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
656 657 658
	return g.result, nil
}

659
// GetCollectionStatistics get the collection statistics, such as `num_rows`.
C
Cai Yudong 已提交
660
func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvuspb.GetCollectionStatisticsRequest) (*milvuspb.GetCollectionStatisticsResponse, error) {
661 662 663 664 665
	if !node.checkHealthy() {
		return &milvuspb.GetCollectionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
666 667 668

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetCollectionStatistics")
	defer sp.Finish()
669 670
	method := "GetCollectionStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
671
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
672
		metrics.TotalLabel).Inc()
673
	g := &getCollectionStatisticsTask{
G
godchen 已提交
674 675 676
		ctx:                            ctx,
		Condition:                      NewTaskCondition(ctx),
		GetCollectionStatisticsRequest: request,
677
		dataCoord:                      node.dataCoord,
678 679
	}

680
	log := log.Ctx(ctx).With(
681
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
682 683
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
684

685 686
	log.Debug(rpcReceived(method))

687
	if err := node.sched.ddQueue.Enqueue(g); err != nil {
688 689
		log.Warn(
			rpcFailedToEnqueue(method),
690
			zap.Error(err))
691

E
Enwei Jiao 已提交
692
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
693
			metrics.AbandonLabel).Inc()
694

G
godchen 已提交
695
		return &milvuspb.GetCollectionStatisticsResponse{
696
			Status: &commonpb.Status{
697
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
698 699 700 701 702
				Reason:    err.Error(),
			},
		}, nil
	}

703 704
	log.Debug(
		rpcEnqueued(method),
705
		zap.Uint64("BeginTS", g.BeginTs()),
706
		zap.Uint64("EndTS", g.EndTs()))
707 708

	if err := g.WaitToFinish(); err != nil {
709 710
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
711
			zap.Error(err),
712
			zap.Uint64("BeginTS", g.BeginTs()),
713
			zap.Uint64("EndTS", g.EndTs()))
D
dragondriver 已提交
714

E
Enwei Jiao 已提交
715
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
716
			metrics.FailLabel).Inc()
717

G
godchen 已提交
718
		return &milvuspb.GetCollectionStatisticsResponse{
719
			Status: &commonpb.Status{
720
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
721 722 723 724 725
				Reason:    err.Error(),
			},
		}, nil
	}

726 727
	log.Debug(
		rpcDone(method),
728
		zap.Uint64("BeginTS", g.BeginTs()),
729
		zap.Uint64("EndTS", g.EndTs()))
730

E
Enwei Jiao 已提交
731
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
732
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
733
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
734
	return g.result, nil
735 736
}

737
// ShowCollections list all collections in Milvus.
C
Cai Yudong 已提交
738
func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
739 740 741 742 743
	if !node.checkHealthy() {
		return &milvuspb.ShowCollectionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
744 745
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowCollections")
	defer sp.Finish()
746 747
	method := "ShowCollections"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
748
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
749

750
	sct := &showCollectionsTask{
G
godchen 已提交
751 752 753
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		ShowCollectionsRequest: request,
754
		queryCoord:             node.queryCoord,
755
		rootCoord:              node.rootCoord,
756 757
	}

758
	log := log.Ctx(ctx).With(
759
		zap.String("role", typeutil.ProxyRole),
760 761
		zap.String("DbName", request.DbName),
		zap.Uint64("TimeStamp", request.TimeStamp),
762 763 764 765
		zap.String("ShowType", request.Type.String()))

	log.Debug("ShowCollections received",
		zap.Any("CollectionNames", request.CollectionNames))
766

767
	err := node.sched.ddQueue.Enqueue(sct)
768
	if err != nil {
769 770
		log.Warn("ShowCollections failed to enqueue",
			zap.Error(err),
771
			zap.Any("CollectionNames", request.CollectionNames))
772

E
Enwei Jiao 已提交
773
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
G
godchen 已提交
774
		return &milvuspb.ShowCollectionsResponse{
775
			Status: &commonpb.Status{
776
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
777 778 779 780 781
				Reason:    err.Error(),
			},
		}, nil
	}

782
	log.Debug("ShowCollections enqueued",
783
		zap.Any("CollectionNames", request.CollectionNames))
D
dragondriver 已提交
784

785 786
	err = sct.WaitToFinish()
	if err != nil {
787 788
		log.Warn("ShowCollections failed to WaitToFinish",
			zap.Error(err),
789
			zap.Any("CollectionNames", request.CollectionNames))
790

E
Enwei Jiao 已提交
791
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
792

G
godchen 已提交
793
		return &milvuspb.ShowCollectionsResponse{
794
			Status: &commonpb.Status{
795
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
796 797 798 799 800
				Reason:    err.Error(),
			},
		}, nil
	}

801
	log.Debug("ShowCollections Done",
802 803
		zap.Int("len(CollectionNames)", len(request.CollectionNames)),
		zap.Int("num_collections", len(sct.result.CollectionNames)))
804

E
Enwei Jiao 已提交
805 806
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
807 808 809
	return sct.result, nil
}

J
jaime 已提交
810 811 812 813 814 815 816 817 818 819
func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-AlterCollection")
	defer sp.Finish()
	method := "AlterCollection"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
820
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
J
jaime 已提交
821 822 823 824 825 826 827 828

	act := &alterCollectionTask{
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		AlterCollectionRequest: request,
		rootCoord:              node.rootCoord,
	}

829
	log := log.Ctx(ctx).With(
J
jaime 已提交
830 831 832 833
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

834 835 836
	log.Debug(
		rpcReceived(method))

J
jaime 已提交
837 838 839
	if err := node.sched.ddQueue.Enqueue(act); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
840
			zap.Error(err))
J
jaime 已提交
841

E
Enwei Jiao 已提交
842
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
J
jaime 已提交
843 844 845 846 847 848 849 850 851 852
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", act.BeginTs()),
		zap.Uint64("EndTs", act.EndTs()),
853
		zap.Uint64("timestamp", request.Base.Timestamp))
J
jaime 已提交
854 855 856 857 858 859

	if err := act.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.Uint64("BeginTs", act.BeginTs()),
860
			zap.Uint64("EndTs", act.EndTs()))
J
jaime 已提交
861

E
Enwei Jiao 已提交
862
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
J
jaime 已提交
863 864 865 866 867 868 869 870 871
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", act.BeginTs()),
872
		zap.Uint64("EndTs", act.EndTs()))
J
jaime 已提交
873

E
Enwei Jiao 已提交
874 875
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
J
jaime 已提交
876 877 878
	return act.result, nil
}

879
// CreatePartition create a partition in specific collection.
C
Cai Yudong 已提交
880
func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
881 882 883
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
884

885
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreatePartition")
886
	defer sp.Finish()
887 888
	method := "CreatePartition"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
889
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
890

891
	cpt := &createPartitionTask{
S
sunby 已提交
892
		ctx:                    ctx,
893 894
		Condition:              NewTaskCondition(ctx),
		CreatePartitionRequest: request,
895
		rootCoord:              node.rootCoord,
896 897 898
		result:                 nil,
	}

899
	log := log.Ctx(ctx).With(
900
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
901 902 903
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
904

905 906
	log.Debug(rpcReceived("CreatePartition"))

907 908 909
	if err := node.sched.ddQueue.Enqueue(cpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue("CreatePartition"),
910
			zap.Error(err))
911

E
Enwei Jiao 已提交
912
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
913

914
		return &commonpb.Status{
915
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
916 917 918
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
919

920 921 922
	log.Debug(
		rpcEnqueued("CreatePartition"),
		zap.Uint64("BeginTS", cpt.BeginTs()),
923
		zap.Uint64("EndTS", cpt.EndTs()))
924 925 926 927

	if err := cpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish("CreatePartition"),
D
dragondriver 已提交
928
			zap.Error(err),
929
			zap.Uint64("BeginTS", cpt.BeginTs()),
930
			zap.Uint64("EndTS", cpt.EndTs()))
D
dragondriver 已提交
931

E
Enwei Jiao 已提交
932
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
933

934
		return &commonpb.Status{
935
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
936 937 938
			Reason:    err.Error(),
		}, nil
	}
939 940 941 942

	log.Debug(
		rpcDone("CreatePartition"),
		zap.Uint64("BeginTS", cpt.BeginTs()),
943
		zap.Uint64("EndTS", cpt.EndTs()))
944

E
Enwei Jiao 已提交
945 946
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
947 948 949
	return cpt.result, nil
}

950
// DropPartition drop a partition in specific collection.
C
Cai Yudong 已提交
951
func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
952 953 954
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
955

956
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropPartition")
957
	defer sp.Finish()
958 959
	method := "DropPartition"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
960
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
961

962
	dpt := &dropPartitionTask{
S
sunby 已提交
963
		ctx:                  ctx,
964 965
		Condition:            NewTaskCondition(ctx),
		DropPartitionRequest: request,
966
		rootCoord:            node.rootCoord,
C
cai.zhang 已提交
967
		queryCoord:           node.queryCoord,
968 969 970
		result:               nil,
	}

971
	log := log.Ctx(ctx).With(
972
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
973 974 975
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
976

977 978
	log.Debug(rpcReceived(method))

979 980 981
	if err := node.sched.ddQueue.Enqueue(dpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
982
			zap.Error(err))
983

E
Enwei Jiao 已提交
984
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
985

986
		return &commonpb.Status{
987
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
988 989 990
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
991

992 993 994
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", dpt.BeginTs()),
995
		zap.Uint64("EndTS", dpt.EndTs()))
996 997 998 999

	if err := dpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1000
			zap.Error(err),
1001
			zap.Uint64("BeginTS", dpt.BeginTs()),
1002
			zap.Uint64("EndTS", dpt.EndTs()))
D
dragondriver 已提交
1003

E
Enwei Jiao 已提交
1004
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
1005

1006
		return &commonpb.Status{
1007
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1008 1009 1010
			Reason:    err.Error(),
		}, nil
	}
1011 1012 1013 1014

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", dpt.BeginTs()),
1015
		zap.Uint64("EndTS", dpt.EndTs()))
1016

E
Enwei Jiao 已提交
1017 1018
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1019 1020 1021
	return dpt.result, nil
}

1022
// HasPartition check if partition exist.
C
Cai Yudong 已提交
1023
func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
1024 1025 1026 1027 1028
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
D
dragondriver 已提交
1029

D
dragondriver 已提交
1030
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasPartition")
D
dragondriver 已提交
1031
	defer sp.Finish()
1032 1033 1034
	method := "HasPartition"
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
E
Enwei Jiao 已提交
1035
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1036
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1037

1038
	hpt := &hasPartitionTask{
S
sunby 已提交
1039
		ctx:                 ctx,
1040 1041
		Condition:           NewTaskCondition(ctx),
		HasPartitionRequest: request,
1042
		rootCoord:           node.rootCoord,
1043 1044 1045
		result:              nil,
	}

1046
	log := log.Ctx(ctx).With(
1047
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1048 1049 1050
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
D
dragondriver 已提交
1051

1052 1053
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1054 1055 1056
	if err := node.sched.ddQueue.Enqueue(hpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1057
			zap.Error(err))
D
dragondriver 已提交
1058

E
Enwei Jiao 已提交
1059
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1060
			metrics.AbandonLabel).Inc()
1061

1062 1063
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1064
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1065 1066 1067 1068 1069
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1070

D
dragondriver 已提交
1071 1072 1073
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", hpt.BeginTs()),
1074
		zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1075 1076 1077 1078

	if err := hpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1079
			zap.Error(err),
D
dragondriver 已提交
1080
			zap.Uint64("BeginTS", hpt.BeginTs()),
1081
			zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1082

E
Enwei Jiao 已提交
1083
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1084
			metrics.FailLabel).Inc()
1085

1086 1087
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1088
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1089 1090 1091 1092 1093
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1094 1095 1096 1097

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", hpt.BeginTs()),
1098
		zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1099

E
Enwei Jiao 已提交
1100
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1101
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1102
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1103 1104 1105
	return hpt.result, nil
}

1106
// LoadPartitions load specific partitions into query nodes.
C
Cai Yudong 已提交
1107
func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitionsRequest) (*commonpb.Status, error) {
1108 1109 1110
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1111

D
dragondriver 已提交
1112
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadPartitions")
1113
	defer sp.Finish()
1114 1115
	method := "LoadPartitions"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1116
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1117
		metrics.TotalLabel).Inc()
1118
	lpt := &loadPartitionsTask{
G
godchen 已提交
1119 1120 1121
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		LoadPartitionsRequest: request,
1122
		queryCoord:            node.queryCoord,
C
cai.zhang 已提交
1123
		indexCoord:            node.indexCoord,
1124 1125
	}

1126
	log := log.Ctx(ctx).With(
1127
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1128 1129 1130
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1131

1132 1133
	log.Debug(rpcReceived(method))

1134 1135 1136
	if err := node.sched.ddQueue.Enqueue(lpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1137
			zap.Error(err))
1138

E
Enwei Jiao 已提交
1139
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1140
			metrics.AbandonLabel).Inc()
1141

1142
		return &commonpb.Status{
1143
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1144 1145 1146 1147
			Reason:    err.Error(),
		}, nil
	}

1148 1149 1150
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", lpt.BeginTs()),
1151
		zap.Uint64("EndTS", lpt.EndTs()))
1152 1153 1154 1155

	if err := lpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1156
			zap.Error(err),
1157
			zap.Uint64("BeginTS", lpt.BeginTs()),
1158
			zap.Uint64("EndTS", lpt.EndTs()))
D
dragondriver 已提交
1159

E
Enwei Jiao 已提交
1160
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1161
			metrics.FailLabel).Inc()
1162

1163
		return &commonpb.Status{
1164
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1165 1166 1167 1168
			Reason:    err.Error(),
		}, nil
	}

1169 1170 1171
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", lpt.BeginTs()),
1172
		zap.Uint64("EndTS", lpt.EndTs()))
1173

E
Enwei Jiao 已提交
1174
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1175
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1176
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1177
	return lpt.result, nil
1178 1179
}

1180
// ReleasePartitions release specific partitions from query nodes.
C
Cai Yudong 已提交
1181
func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionsRequest) (*commonpb.Status, error) {
1182 1183 1184
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1185 1186 1187 1188

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleasePartitions")
	defer sp.Finish()

1189
	rpt := &releasePartitionsTask{
G
godchen 已提交
1190 1191 1192
		ctx:                      ctx,
		Condition:                NewTaskCondition(ctx),
		ReleasePartitionsRequest: request,
1193
		queryCoord:               node.queryCoord,
1194 1195
	}

1196
	method := "ReleasePartitions"
1197
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1198
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1199
		metrics.TotalLabel).Inc()
1200 1201

	log := log.Ctx(ctx).With(
1202
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1203 1204 1205
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1206

1207 1208
	log.Debug(rpcReceived(method))

1209 1210 1211
	if err := node.sched.ddQueue.Enqueue(rpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1212
			zap.Error(err))
1213

E
Enwei Jiao 已提交
1214
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1215
			metrics.AbandonLabel).Inc()
1216

1217
		return &commonpb.Status{
1218
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1219 1220 1221 1222
			Reason:    err.Error(),
		}, nil
	}

1223 1224 1225
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", rpt.BeginTs()),
1226
		zap.Uint64("EndTS", rpt.EndTs()))
1227 1228 1229 1230

	if err := rpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1231
			zap.Error(err),
1232
			zap.Uint64("BeginTS", rpt.BeginTs()),
1233
			zap.Uint64("EndTS", rpt.EndTs()))
D
dragondriver 已提交
1234

E
Enwei Jiao 已提交
1235
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1236
			metrics.FailLabel).Inc()
1237

1238
		return &commonpb.Status{
1239
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1240 1241 1242 1243
			Reason:    err.Error(),
		}, nil
	}

1244 1245 1246
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", rpt.BeginTs()),
1247
		zap.Uint64("EndTS", rpt.EndTs()))
1248

E
Enwei Jiao 已提交
1249
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1250
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1251
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1252
	return rpt.result, nil
1253 1254
}

1255
// GetPartitionStatistics get the statistics of partition, such as num_rows.
C
Cai Yudong 已提交
1256
func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb.GetPartitionStatisticsRequest) (*milvuspb.GetPartitionStatisticsResponse, error) {
1257 1258 1259 1260 1261
	if !node.checkHealthy() {
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1262 1263 1264

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetPartitionStatistics")
	defer sp.Finish()
1265 1266
	method := "GetPartitionStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1267
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1268
		metrics.TotalLabel).Inc()
1269

1270
	g := &getPartitionStatisticsTask{
1271 1272 1273
		ctx:                           ctx,
		Condition:                     NewTaskCondition(ctx),
		GetPartitionStatisticsRequest: request,
1274
		dataCoord:                     node.dataCoord,
1275 1276
	}

1277
	log := log.Ctx(ctx).With(
1278
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1279 1280 1281
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1282

1283 1284
	log.Debug(rpcReceived(method))

1285 1286 1287
	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1288
			zap.Error(err))
1289

E
Enwei Jiao 已提交
1290
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1291
			metrics.AbandonLabel).Inc()
1292

1293 1294 1295 1296 1297 1298 1299 1300
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1301 1302 1303
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", g.BeginTs()),
1304
		zap.Uint64("EndTS", g.EndTs()))
1305 1306 1307 1308

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
1309
			zap.Error(err),
1310
			zap.Uint64("BeginTS", g.BeginTs()),
1311
			zap.Uint64("EndTS", g.EndTs()))
1312

E
Enwei Jiao 已提交
1313
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1314
			metrics.FailLabel).Inc()
1315

1316 1317 1318 1319 1320 1321 1322 1323
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1324 1325 1326
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", g.BeginTs()),
1327
		zap.Uint64("EndTS", g.EndTs()))
1328

E
Enwei Jiao 已提交
1329
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1330
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1331
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1332
	return g.result, nil
1333 1334
}

1335
// ShowPartitions list all partitions in the specific collection.
C
Cai Yudong 已提交
1336
func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
1337 1338 1339 1340 1341
	if !node.checkHealthy() {
		return &milvuspb.ShowPartitionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1342 1343 1344 1345

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowPartitions")
	defer sp.Finish()

1346
	spt := &showPartitionsTask{
G
godchen 已提交
1347 1348 1349
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		ShowPartitionsRequest: request,
1350
		rootCoord:             node.rootCoord,
1351
		queryCoord:            node.queryCoord,
G
godchen 已提交
1352
		result:                nil,
1353 1354
	}

1355
	method := "ShowPartitions"
1356 1357
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
E
Enwei Jiao 已提交
1358
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1359
		metrics.TotalLabel).Inc()
1360

1361 1362
	log := log.Ctx(ctx).With(zap.String("role", typeutil.ProxyRole))

1363 1364
	log.Debug(
		rpcReceived(method),
G
godchen 已提交
1365
		zap.Any("request", request))
1366 1367 1368 1369 1370 1371 1372

	if err := node.sched.ddQueue.Enqueue(spt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.Any("request", request))

E
Enwei Jiao 已提交
1373
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1374
			metrics.AbandonLabel).Inc()
1375

G
godchen 已提交
1376
		return &milvuspb.ShowPartitionsResponse{
1377
			Status: &commonpb.Status{
1378
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1379 1380 1381 1382 1383
				Reason:    err.Error(),
			},
		}, nil
	}

1384 1385 1386 1387
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
1388 1389
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
1390 1391 1392 1393 1394
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

	if err := spt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1395
			zap.Error(err),
1396 1397 1398 1399 1400
			zap.Uint64("BeginTS", spt.BeginTs()),
			zap.Uint64("EndTS", spt.EndTs()),
			zap.String("db", spt.ShowPartitionsRequest.DbName),
			zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
			zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))
D
dragondriver 已提交
1401

E
Enwei Jiao 已提交
1402
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1403
			metrics.FailLabel).Inc()
1404

G
godchen 已提交
1405
		return &milvuspb.ShowPartitionsResponse{
1406
			Status: &commonpb.Status{
1407
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1408 1409 1410 1411
				Reason:    err.Error(),
			},
		}, nil
	}
1412 1413 1414 1415 1416 1417 1418 1419 1420

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

E
Enwei Jiao 已提交
1421
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1422
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1423
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1424 1425 1426
	return spt.result, nil
}

S
SimFG 已提交
1427 1428
func (node *Proxy) getCollectionProgress(ctx context.Context, request *milvuspb.GetLoadingProgressRequest, collectionID int64) (int64, error) {
	resp, err := node.queryCoord.ShowCollections(ctx, &querypb.ShowCollectionsRequest{
S
smellthemoon 已提交
1429
		Base: commonpbutil.UpdateMsgBase(
1430 1431 1432
			request.Base,
			commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
		),
S
SimFG 已提交
1433 1434 1435 1436 1437
		CollectionIDs: []int64{collectionID},
	})
	if err != nil {
		return 0, err
	}
X
xige-16 已提交
1438 1439 1440 1441 1442

	if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
		return 0, errors.New(resp.Status.Reason)
	}

S
SimFG 已提交
1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460
	if len(resp.InMemoryPercentages) == 0 {
		return 0, errors.New("fail to show collections from the querycoord, no data")
	}
	return resp.InMemoryPercentages[0], nil
}

func (node *Proxy) getPartitionProgress(ctx context.Context, request *milvuspb.GetLoadingProgressRequest, collectionID int64) (int64, error) {
	IDs2Names := make(map[int64]string)
	partitionIDs := make([]int64, 0)
	for _, partitionName := range request.PartitionNames {
		partitionID, err := globalMetaCache.GetPartitionID(ctx, request.CollectionName, partitionName)
		if err != nil {
			return 0, err
		}
		IDs2Names[partitionID] = partitionName
		partitionIDs = append(partitionIDs, partitionID)
	}
	resp, err := node.queryCoord.ShowPartitions(ctx, &querypb.ShowPartitionsRequest{
S
smellthemoon 已提交
1461
		Base: commonpbutil.UpdateMsgBase(
1462 1463 1464
			request.Base,
			commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions),
		),
S
SimFG 已提交
1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487
		CollectionID: collectionID,
		PartitionIDs: partitionIDs,
	})
	if err != nil {
		return 0, err
	}
	if len(resp.InMemoryPercentages) != len(partitionIDs) {
		return 0, errors.New("fail to show partitions from the querycoord, invalid data num")
	}
	var progress int64
	for _, p := range resp.InMemoryPercentages {
		progress += p
	}
	progress /= int64(len(partitionIDs))
	return progress, nil
}

func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.GetLoadingProgressRequest) (*milvuspb.GetLoadingProgressResponse, error) {
	if !node.checkHealthy() {
		return &milvuspb.GetLoadingProgressResponse{Status: unhealthyStatus()}, nil
	}
	method := "GetLoadingProgress"
	tr := timerecord.NewTimeRecorder(method)
1488
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetLoadingProgress")
S
SimFG 已提交
1489
	defer sp.Finish()
E
Enwei Jiao 已提交
1490
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
1491 1492 1493
	log := log.Ctx(ctx)

	log.Debug(
S
SimFG 已提交
1494 1495 1496 1497
		rpcReceived(method),
		zap.Any("request", request))

	getErrResponse := func(err error) *milvuspb.GetLoadingProgressResponse {
J
Jiquan Long 已提交
1498
		log.Warn("fail to get loading progress",
1499 1500 1501
			zap.Error(err),
			zap.String("collection_name", request.CollectionName),
			zap.Strings("partition_name", request.PartitionNames))
E
Enwei Jiao 已提交
1502
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
S
SimFG 已提交
1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516
		return &milvuspb.GetLoadingProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
	}
	if err := validateCollectionName(request.CollectionName); err != nil {
		return getErrResponse(err), nil
	}
	collectionID, err := globalMetaCache.GetCollectionID(ctx, request.CollectionName)
	if err != nil {
		return getErrResponse(err), nil
	}
1517 1518 1519
	msgBase := commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
1520
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
1521
	)
S
SimFG 已提交
1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540
	if request.Base == nil {
		request.Base = msgBase
	} else {
		request.Base.MsgID = msgBase.MsgID
		request.Base.Timestamp = msgBase.Timestamp
		request.Base.SourceID = msgBase.SourceID
	}

	var progress int64
	if len(request.GetPartitionNames()) == 0 {
		if progress, err = node.getCollectionProgress(ctx, request, collectionID); err != nil {
			return getErrResponse(err), nil
		}
	} else {
		if progress, err = node.getPartitionProgress(ctx, request, collectionID); err != nil {
			return getErrResponse(err), nil
		}
	}

1541
	log.Debug(
S
SimFG 已提交
1542 1543
		rpcDone(method),
		zap.Any("request", request))
E
Enwei Jiao 已提交
1544 1545
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
S
SimFG 已提交
1546 1547 1548 1549 1550 1551 1552 1553
	return &milvuspb.GetLoadingProgressResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		Progress: progress,
	}, nil
}

1554
// CreateIndex create index for collection.
C
Cai Yudong 已提交
1555
func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
1556 1557 1558
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1559

1560
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateIndex")
D
dragondriver 已提交
1561 1562
	defer sp.Finish()

1563
	cit := &createIndexTask{
Z
zhenshan.cao 已提交
1564 1565 1566 1567 1568
		ctx:        ctx,
		Condition:  NewTaskCondition(ctx),
		req:        request,
		rootCoord:  node.rootCoord,
		indexCoord: node.indexCoord,
1569
		queryCoord: node.queryCoord,
1570 1571
	}

D
dragondriver 已提交
1572
	method := "CreateIndex"
1573
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1574
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1575
		metrics.TotalLabel).Inc()
1576 1577

	log := log.Ctx(ctx).With(
1578
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1579 1580 1581 1582
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))
D
dragondriver 已提交
1583

1584 1585
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1586 1587 1588
	if err := node.sched.ddQueue.Enqueue(cit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1589
			zap.Error(err))
D
dragondriver 已提交
1590

E
Enwei Jiao 已提交
1591
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1592
			metrics.AbandonLabel).Inc()
1593

1594
		return &commonpb.Status{
1595
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1596 1597 1598 1599
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1600 1601 1602
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", cit.BeginTs()),
1603
		zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1604 1605 1606 1607

	if err := cit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1608
			zap.Error(err),
D
dragondriver 已提交
1609
			zap.Uint64("BeginTs", cit.BeginTs()),
1610
			zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1611

E
Enwei Jiao 已提交
1612
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1613
			metrics.FailLabel).Inc()
1614

1615
		return &commonpb.Status{
1616
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1617 1618 1619 1620
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1621 1622 1623
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", cit.BeginTs()),
1624
		zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1625

E
Enwei Jiao 已提交
1626
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1627
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1628
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1629 1630 1631
	return cit.result, nil
}

1632
// DescribeIndex get the meta information of index, such as index state, index id and etc.
C
Cai Yudong 已提交
1633
func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
1634 1635 1636 1637 1638
	if !node.checkHealthy() {
		return &milvuspb.DescribeIndexResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1639 1640 1641 1642

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeIndex")
	defer sp.Finish()

1643
	dit := &describeIndexTask{
S
sunby 已提交
1644
		ctx:                  ctx,
1645 1646
		Condition:            NewTaskCondition(ctx),
		DescribeIndexRequest: request,
1647
		indexCoord:           node.indexCoord,
1648 1649
	}

1650 1651
	method := "DescribeIndex"
	// avoid data race
1652
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1653
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1654
		metrics.TotalLabel).Inc()
1655 1656

	log := log.Ctx(ctx).With(
1657
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1658 1659 1660
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
1661 1662 1663
		zap.String("index name", request.IndexName))

	log.Debug(rpcReceived(method))
1664 1665 1666 1667

	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1668
			zap.Error(err))
1669

E
Enwei Jiao 已提交
1670
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1671
			metrics.AbandonLabel).Inc()
1672

1673 1674
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
1675
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1676 1677 1678 1679 1680
				Reason:    err.Error(),
			},
		}, nil
	}

1681 1682 1683
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1684
		zap.Uint64("EndTs", dit.EndTs()))
1685 1686 1687 1688

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1689
			zap.Error(err),
1690
			zap.Uint64("BeginTs", dit.BeginTs()),
1691
			zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1692

Z
zhenshan.cao 已提交
1693 1694 1695 1696
		errCode := commonpb.ErrorCode_UnexpectedError
		if dit.result != nil {
			errCode = dit.result.Status.GetErrorCode()
		}
E
Enwei Jiao 已提交
1697
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1698
			metrics.FailLabel).Inc()
1699

1700 1701
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
Z
zhenshan.cao 已提交
1702
				ErrorCode: errCode,
1703 1704 1705 1706 1707
				Reason:    err.Error(),
			},
		}, nil
	}

1708 1709 1710
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1711
		zap.Uint64("EndTs", dit.EndTs()))
1712

E
Enwei Jiao 已提交
1713
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1714
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1715
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1716 1717 1718
	return dit.result, nil
}

1719
// DropIndex drop the index of collection.
C
Cai Yudong 已提交
1720
func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
1721 1722 1723
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1724 1725 1726 1727

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropIndex")
	defer sp.Finish()

1728
	dit := &dropIndexTask{
S
sunby 已提交
1729
		ctx:              ctx,
B
BossZou 已提交
1730 1731
		Condition:        NewTaskCondition(ctx),
		DropIndexRequest: request,
1732
		indexCoord:       node.indexCoord,
1733
		queryCoord:       node.queryCoord,
B
BossZou 已提交
1734
	}
G
godchen 已提交
1735

D
dragondriver 已提交
1736
	method := "DropIndex"
1737
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1738
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1739
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1740

1741
	log := log.Ctx(ctx).With(
1742
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1743 1744 1745 1746 1747
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

1748 1749
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1750 1751 1752
	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1753
			zap.Error(err))
E
Enwei Jiao 已提交
1754
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1755
			metrics.AbandonLabel).Inc()
D
dragondriver 已提交
1756

B
BossZou 已提交
1757
		return &commonpb.Status{
1758
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1759 1760 1761
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1762

D
dragondriver 已提交
1763 1764 1765
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1766
		zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1767 1768 1769 1770

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1771
			zap.Error(err),
D
dragondriver 已提交
1772
			zap.Uint64("BeginTs", dit.BeginTs()),
1773
			zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1774

E
Enwei Jiao 已提交
1775
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1776
			metrics.FailLabel).Inc()
1777

B
BossZou 已提交
1778
		return &commonpb.Status{
1779
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1780 1781 1782
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1783 1784 1785 1786

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1787
		zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1788

E
Enwei Jiao 已提交
1789
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1790
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1791
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
B
BossZou 已提交
1792 1793 1794
	return dit.result, nil
}

1795 1796
// GetIndexBuildProgress gets index build progress with filed_name and index_name.
// IndexRows is the num of indexed rows. And TotalRows is the total number of segment rows.
1797
// Deprecated: use DescribeIndex instead
C
Cai Yudong 已提交
1798
func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb.GetIndexBuildProgressRequest) (*milvuspb.GetIndexBuildProgressResponse, error) {
1799 1800 1801 1802 1803
	if !node.checkHealthy() {
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1804 1805 1806 1807

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetIndexBuildProgress")
	defer sp.Finish()

1808
	gibpt := &getIndexBuildProgressTask{
1809 1810 1811
		ctx:                          ctx,
		Condition:                    NewTaskCondition(ctx),
		GetIndexBuildProgressRequest: request,
1812 1813
		indexCoord:                   node.indexCoord,
		rootCoord:                    node.rootCoord,
1814
		dataCoord:                    node.dataCoord,
1815 1816
	}

1817
	method := "GetIndexBuildProgress"
1818
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1819
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1820
		metrics.TotalLabel).Inc()
1821 1822

	log := log.Ctx(ctx).With(
1823
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1824 1825 1826 1827
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1828

1829 1830
	log.Debug(rpcReceived(method))

1831 1832 1833
	if err := node.sched.ddQueue.Enqueue(gibpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1834
			zap.Error(err))
E
Enwei Jiao 已提交
1835
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1836
			metrics.AbandonLabel).Inc()
1837

1838 1839 1840 1841 1842 1843 1844 1845
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1846 1847 1848
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
1849
		zap.Uint64("EndTs", gibpt.EndTs()))
1850 1851 1852 1853

	if err := gibpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
1854
			zap.Error(err),
1855
			zap.Uint64("BeginTs", gibpt.BeginTs()),
1856
			zap.Uint64("EndTs", gibpt.EndTs()))
E
Enwei Jiao 已提交
1857
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1858
			metrics.FailLabel).Inc()
1859 1860 1861 1862 1863 1864 1865 1866

		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
1867 1868 1869 1870

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
1871
		zap.Uint64("EndTs", gibpt.EndTs()))
1872

E
Enwei Jiao 已提交
1873
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1874
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1875
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1876
	return gibpt.result, nil
1877 1878
}

1879
// GetIndexState get the build-state of index.
1880
// Deprecated: use DescribeIndex instead
C
Cai Yudong 已提交
1881
func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndexStateRequest) (*milvuspb.GetIndexStateResponse, error) {
1882 1883 1884 1885 1886
	if !node.checkHealthy() {
		return &milvuspb.GetIndexStateResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1887 1888 1889 1890

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()

1891
	dipt := &getIndexStateTask{
G
godchen 已提交
1892 1893 1894
		ctx:                  ctx,
		Condition:            NewTaskCondition(ctx),
		GetIndexStateRequest: request,
1895 1896
		indexCoord:           node.indexCoord,
		rootCoord:            node.rootCoord,
1897 1898
	}

1899
	method := "GetIndexState"
1900
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1901
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1902
		metrics.TotalLabel).Inc()
1903 1904

	log := log.Ctx(ctx).With(
1905
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1906 1907 1908 1909
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1910

1911 1912
	log.Debug(rpcReceived(method))

1913 1914 1915
	if err := node.sched.ddQueue.Enqueue(dipt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1916
			zap.Error(err))
1917

E
Enwei Jiao 已提交
1918
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1919
			metrics.AbandonLabel).Inc()
1920

G
godchen 已提交
1921
		return &milvuspb.GetIndexStateResponse{
1922
			Status: &commonpb.Status{
1923
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1924 1925 1926 1927 1928
				Reason:    err.Error(),
			},
		}, nil
	}

1929 1930 1931
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dipt.BeginTs()),
1932
		zap.Uint64("EndTs", dipt.EndTs()))
1933 1934 1935 1936

	if err := dipt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1937
			zap.Error(err),
1938
			zap.Uint64("BeginTs", dipt.BeginTs()),
1939
			zap.Uint64("EndTs", dipt.EndTs()))
E
Enwei Jiao 已提交
1940
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1941
			metrics.FailLabel).Inc()
1942

G
godchen 已提交
1943
		return &milvuspb.GetIndexStateResponse{
1944
			Status: &commonpb.Status{
1945
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1946 1947 1948 1949 1950
				Reason:    err.Error(),
			},
		}, nil
	}

1951 1952 1953
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dipt.BeginTs()),
1954
		zap.Uint64("EndTs", dipt.EndTs()))
1955

E
Enwei Jiao 已提交
1956
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1957
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1958
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1959 1960 1961
	return dipt.result, nil
}

1962
// Insert insert records into collection.
C
Cai Yudong 已提交
1963
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
X
Xiangyu Wang 已提交
1964 1965
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()
1966 1967 1968
	log := log.Ctx(ctx)
	log.Debug("Start processing insert request in Proxy")
	defer log.Debug("Finish processing insert request in Proxy")
X
Xiangyu Wang 已提交
1969

1970 1971 1972 1973 1974
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}
1975 1976
	method := "Insert"
	tr := timerecord.NewTimeRecorder(method)
1977
	receiveSize := proto.Size(request)
1978
	rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(receiveSize))
E
Enwei Jiao 已提交
1979
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Add(float64(receiveSize))
D
dragondriver 已提交
1980

E
Enwei Jiao 已提交
1981
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
1982
	it := &insertTask{
1983 1984
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
X
xige-16 已提交
1985
		// req:       request,
1986 1987 1988 1989
		BaseInsertTask: BaseInsertTask{
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
1990
			InsertRequest: internalpb.InsertRequest{
1991 1992 1993
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Insert),
					commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
1994
					commonpbutil.WithSourceID(paramtable.GetNodeID()),
1995
				),
1996 1997
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
X
xige-16 已提交
1998 1999 2000
				FieldsData:     request.FieldsData,
				NumRows:        uint64(request.NumRows),
				Version:        internalpb.InsertDataVersion_ColumnBased,
2001
				// RowData: transfer column based request to this
2002 2003
			},
		},
2004
		idAllocator:   node.rowIDAllocator,
2005 2006 2007
		segIDAssigner: node.segAssigner,
		chMgr:         node.chMgr,
		chTicker:      node.chTicker,
2008
	}
2009 2010

	if len(it.PartitionName) <= 0 {
2011
		it.PartitionName = Params.CommonCfg.DefaultPartitionName
2012 2013
	}

X
Xiangyu Wang 已提交
2014
	constructFailedResponse := func(err error) *milvuspb.MutationResult {
X
xige-16 已提交
2015
		numRows := request.NumRows
2016 2017 2018 2019
		errIndex := make([]uint32, numRows)
		for i := uint32(0); i < numRows; i++ {
			errIndex[i] = i
		}
2020

X
Xiangyu Wang 已提交
2021 2022 2023 2024 2025 2026 2027
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			ErrIndex: errIndex,
		}
2028 2029
	}

X
Xiangyu Wang 已提交
2030
	log.Debug("Enqueue insert request in Proxy",
2031
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2032 2033 2034 2035 2036
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.Int("len(FieldsData)", len(request.FieldsData)),
		zap.Int("len(HashKeys)", len(request.HashKeys)),
2037
		zap.Uint32("NumRows", request.NumRows))
D
dragondriver 已提交
2038

X
Xiangyu Wang 已提交
2039
	if err := node.sched.dmQueue.Enqueue(it); err != nil {
J
Jiquan Long 已提交
2040
		log.Warn("Failed to enqueue insert task: " + err.Error())
E
Enwei Jiao 已提交
2041
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2042
			metrics.AbandonLabel).Inc()
X
Xiangyu Wang 已提交
2043
		return constructFailedResponse(err), nil
2044
	}
D
dragondriver 已提交
2045

X
Xiangyu Wang 已提交
2046
	log.Debug("Detail of insert request in Proxy",
2047
		zap.String("role", typeutil.ProxyRole),
X
Xiangyu Wang 已提交
2048
		zap.Int64("msgID", it.Base.MsgID),
D
dragondriver 已提交
2049 2050 2051 2052 2053
		zap.Uint64("BeginTS", it.BeginTs()),
		zap.Uint64("EndTS", it.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
2054
		zap.Uint32("NumRows", request.NumRows))
X
Xiangyu Wang 已提交
2055 2056

	if err := it.WaitToFinish(); err != nil {
2057
		log.Warn("Failed to execute insert task in task scheduler: " + err.Error())
E
Enwei Jiao 已提交
2058
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2059
			metrics.FailLabel).Inc()
X
Xiangyu Wang 已提交
2060 2061 2062 2063 2064
		return constructFailedResponse(err), nil
	}

	if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
		setErrorIndex := func() {
X
xige-16 已提交
2065
			numRows := request.NumRows
X
Xiangyu Wang 已提交
2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076
			errIndex := make([]uint32, numRows)
			for i := uint32(0); i < numRows; i++ {
				errIndex[i] = i
			}
			it.result.ErrIndex = errIndex
		}

		setErrorIndex()
	}

	// InsertCnt always equals to the number of entities in the request
X
xige-16 已提交
2077
	it.result.InsertCnt = int64(request.NumRows)
D
dragondriver 已提交
2078

E
Enwei Jiao 已提交
2079
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2080
		metrics.SuccessLabel).Inc()
2081
	successCnt := it.result.InsertCnt - int64(len(it.result.ErrIndex))
E
Enwei Jiao 已提交
2082 2083 2084
	metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(successCnt))
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
2085 2086 2087
	return it.result, nil
}

2088
// Delete delete records from collection, then these records cannot be searched.
G
groot 已提交
2089
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {
2090 2091
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete")
	defer sp.Finish()
2092 2093 2094
	log := log.Ctx(ctx)
	log.Debug("Start processing delete request in Proxy")
	defer log.Debug("Finish processing delete request in Proxy")
2095

2096
	receiveSize := proto.Size(request)
2097
	rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(receiveSize))
E
Enwei Jiao 已提交
2098
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Add(float64(receiveSize))
2099

G
groot 已提交
2100 2101 2102 2103 2104 2105
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}

2106 2107 2108
	method := "Delete"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
2109
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2110
		metrics.TotalLabel).Inc()
2111
	dt := &deleteTask{
X
xige-16 已提交
2112 2113 2114
		ctx:        ctx,
		Condition:  NewTaskCondition(ctx),
		deleteExpr: request.Expr,
G
godchen 已提交
2115
		BaseDeleteTask: BaseDeleteTask{
G
godchen 已提交
2116 2117 2118
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
2119
			DeleteRequest: internalpb.DeleteRequest{
2120 2121 2122 2123
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Delete),
					commonpbutil.WithMsgID(0),
				),
X
xige-16 已提交
2124
				DbName:         request.DbName,
G
godchen 已提交
2125 2126 2127
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
				// RowData: transfer column based request to this
C
Cai Yudong 已提交
2128 2129 2130 2131
			},
		},
		chMgr:    node.chMgr,
		chTicker: node.chTicker,
G
groot 已提交
2132 2133
	}

2134
	log.Debug("Enqueue delete request in Proxy",
2135
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2136 2137 2138 2139
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.String("expr", request.Expr))
2140 2141 2142

	// MsgID will be set by Enqueue()
	if err := node.sched.dmQueue.Enqueue(dt); err != nil {
2143
		log.Error("Failed to enqueue delete task: " + err.Error())
E
Enwei Jiao 已提交
2144
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2145
			metrics.AbandonLabel).Inc()
2146

G
groot 已提交
2147 2148 2149 2150 2151 2152 2153 2154
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

2155
	log.Debug("Detail of delete request in Proxy",
2156
		zap.String("role", typeutil.ProxyRole),
G
groot 已提交
2157 2158 2159 2160
		zap.Uint64("timestamp", dt.Base.Timestamp),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
2161
		zap.String("expr", request.Expr))
G
groot 已提交
2162

2163
	if err := dt.WaitToFinish(); err != nil {
2164
		log.Error("Failed to execute delete task in task scheduler: " + err.Error())
E
Enwei Jiao 已提交
2165
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2166
			metrics.FailLabel).Inc()
G
groot 已提交
2167 2168 2169 2170 2171 2172 2173 2174
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

E
Enwei Jiao 已提交
2175
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2176
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2177 2178
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
G
groot 已提交
2179 2180 2181
	return dt.result, nil
}

2182
// Search search the most similar records of requests.
C
Cai Yudong 已提交
2183
func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
2184
	receiveSize := proto.Size(request)
E
Enwei Jiao 已提交
2185
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel).Add(float64(receiveSize))
2186 2187 2188

	rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(request.GetNq()))

2189 2190 2191 2192 2193
	if !node.checkHealthy() {
		return &milvuspb.SearchResults{
			Status: unhealthyStatus(),
		}, nil
	}
2194 2195
	method := "Search"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2196
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2197
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
2198

C
cai.zhang 已提交
2199 2200
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Search")
	defer sp.Finish()
D
dragondriver 已提交
2201

2202
	qt := &searchTask{
S
sunby 已提交
2203
		ctx:       ctx,
2204
		Condition: NewTaskCondition(ctx),
G
godchen 已提交
2205
		SearchRequest: &internalpb.SearchRequest{
2206 2207
			Base: commonpbutil.NewMsgBase(
				commonpbutil.WithMsgType(commonpb.MsgType_Search),
E
Enwei Jiao 已提交
2208
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
2209
			),
E
Enwei Jiao 已提交
2210
			ReqID: paramtable.GetNodeID(),
2211
		},
2212 2213 2214 2215
		request:  request,
		qc:       node.queryCoord,
		tr:       timerecord.NewTimeRecorder("search"),
		shardMgr: node.shardMgr,
2216 2217
	}

2218 2219 2220
	travelTs := request.TravelTimestamp
	guaranteeTs := request.GuaranteeTimestamp

2221
	log := log.Ctx(ctx).With(
2222
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2223 2224 2225 2226 2227
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2228 2229 2230 2231
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2232

2233 2234 2235
	log.Debug(
		rpcReceived(method))

2236
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
2237
		log.Warn(
2238
			rpcFailedToEnqueue(method),
2239
			zap.Error(err))
D
dragondriver 已提交
2240

E
Enwei Jiao 已提交
2241
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2242
			metrics.AbandonLabel).Inc()
2243

2244 2245
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2246
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2247 2248 2249 2250
				Reason:    err.Error(),
			},
		}, nil
	}
Z
Zach 已提交
2251
	tr.CtxRecord(ctx, "search request enqueue")
2252

2253
	log.Debug(
2254
		rpcEnqueued(method),
2255
		zap.Uint64("timestamp", qt.Base.Timestamp))
D
dragondriver 已提交
2256

2257
	if err := qt.WaitToFinish(); err != nil {
2258
		log.Warn(
2259
			rpcFailedToWaitToFinish(method),
2260
			zap.Error(err))
2261

E
Enwei Jiao 已提交
2262
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2263
			metrics.FailLabel).Inc()
2264

2265 2266
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2267
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2268 2269 2270 2271 2272
				Reason:    err.Error(),
			},
		}, nil
	}

Z
Zach 已提交
2273
	span := tr.CtxRecord(ctx, "wait search result")
E
Enwei Jiao 已提交
2274
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2275
		metrics.SearchLabel).Observe(float64(span.Milliseconds()))
2276
	tr.CtxRecord(ctx, "wait search result")
2277
	log.Debug(rpcDone(method))
D
dragondriver 已提交
2278

E
Enwei Jiao 已提交
2279
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2280
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2281
	metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(qt.result.GetResults().GetNumQueries()))
C
cai.zhang 已提交
2282
	searchDur := tr.ElapseSpan().Milliseconds()
E
Enwei Jiao 已提交
2283
	metrics.ProxySQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2284
		metrics.SearchLabel).Observe(float64(searchDur))
E
Enwei Jiao 已提交
2285
	metrics.ProxyCollectionSQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2286
		metrics.SearchLabel, request.CollectionName).Observe(float64(searchDur))
2287 2288
	if qt.result != nil {
		sentSize := proto.Size(qt.result)
E
Enwei Jiao 已提交
2289
		metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
2290
		rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
2291
	}
2292 2293 2294
	return qt.result, nil
}

2295
// Flush notify data nodes to persist the data of collection.
2296 2297 2298 2299 2300 2301 2302
func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*milvuspb.FlushResponse, error) {
	resp := &milvuspb.FlushResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    "",
		},
	}
2303
	if !node.checkHealthy() {
2304 2305
		resp.Status.Reason = "proxy is not healthy"
		return resp, nil
2306
	}
D
dragondriver 已提交
2307 2308 2309 2310

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Flush")
	defer sp.Finish()

2311
	ft := &flushTask{
T
ThreadDao 已提交
2312 2313 2314
		ctx:          ctx,
		Condition:    NewTaskCondition(ctx),
		FlushRequest: request,
2315
		dataCoord:    node.dataCoord,
2316 2317
	}

D
dragondriver 已提交
2318
	method := "Flush"
2319
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2320
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2321

2322
	log := log.Ctx(ctx).With(
2323
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2324 2325
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))
D
dragondriver 已提交
2326

2327 2328
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2329 2330 2331
	if err := node.sched.ddQueue.Enqueue(ft); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2332
			zap.Error(err))
D
dragondriver 已提交
2333

E
Enwei Jiao 已提交
2334
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
2335

2336 2337
		resp.Status.Reason = err.Error()
		return resp, nil
2338 2339
	}

D
dragondriver 已提交
2340 2341 2342
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", ft.BeginTs()),
2343
		zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2344 2345 2346 2347

	if err := ft.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2348
			zap.Error(err),
D
dragondriver 已提交
2349
			zap.Uint64("BeginTs", ft.BeginTs()),
2350
			zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2351

E
Enwei Jiao 已提交
2352
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2353

D
dragondriver 已提交
2354
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
2355 2356
		resp.Status.Reason = err.Error()
		return resp, nil
2357 2358
	}

D
dragondriver 已提交
2359 2360 2361
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", ft.BeginTs()),
2362
		zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2363

E
Enwei Jiao 已提交
2364 2365
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2366
	return ft.result, nil
2367 2368
}

2369
// Query get the records by primary keys.
C
Cai Yudong 已提交
2370
func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*milvuspb.QueryResults, error) {
2371
	receiveSize := proto.Size(request)
E
Enwei Jiao 已提交
2372
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.QueryLabel).Add(float64(receiveSize))
2373 2374 2375

	rateCol.Add(internalpb.RateType_DQLQuery.String(), 1)

2376 2377 2378 2379 2380
	if !node.checkHealthy() {
		return &milvuspb.QueryResults{
			Status: unhealthyStatus(),
		}, nil
	}
2381

D
dragondriver 已提交
2382 2383
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Query")
	defer sp.Finish()
2384
	tr := timerecord.NewTimeRecorder("Query")
D
dragondriver 已提交
2385

2386
	qt := &queryTask{
2387 2388 2389
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
		RetrieveRequest: &internalpb.RetrieveRequest{
2390 2391
			Base: commonpbutil.NewMsgBase(
				commonpbutil.WithMsgType(commonpb.MsgType_Retrieve),
E
Enwei Jiao 已提交
2392
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
2393
			),
E
Enwei Jiao 已提交
2394
			ReqID: paramtable.GetNodeID(),
2395
		},
2396 2397
		request:          request,
		qc:               node.queryCoord,
2398
		queryShardPolicy: mergeRoundRobinPolicy,
2399
		shardMgr:         node.shardMgr,
2400 2401
	}

D
dragondriver 已提交
2402 2403
	method := "Query"

E
Enwei Jiao 已提交
2404
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2405 2406
		metrics.TotalLabel).Inc()

2407
	log := log.Ctx(ctx).With(
2408
		zap.String("role", typeutil.ProxyRole),
2409 2410
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
2411 2412 2413 2414
		zap.Strings("partitions", request.PartitionNames))

	log.Debug(
		rpcReceived(method),
2415 2416 2417 2418
		zap.String("expr", request.Expr),
		zap.Strings("OutputFields", request.OutputFields),
		zap.Uint64("travel_timestamp", request.TravelTimestamp),
		zap.Uint64("guarantee_timestamp", request.GuaranteeTimestamp))
G
godchen 已提交
2419

D
dragondriver 已提交
2420
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
2421
		log.Warn(
D
dragondriver 已提交
2422
			rpcFailedToEnqueue(method),
2423
			zap.Error(err))
D
dragondriver 已提交
2424

E
Enwei Jiao 已提交
2425
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2426
			metrics.AbandonLabel).Inc()
2427

2428 2429 2430 2431 2432 2433
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
2434
	}
Z
Zach 已提交
2435
	tr.CtxRecord(ctx, "query request enqueue")
2436

2437
	log.Debug(rpcEnqueued(method))
D
dragondriver 已提交
2438 2439

	if err := qt.WaitToFinish(); err != nil {
2440
		log.Warn(
D
dragondriver 已提交
2441
			rpcFailedToWaitToFinish(method),
2442
			zap.Error(err))
2443

E
Enwei Jiao 已提交
2444
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2445
			metrics.FailLabel).Inc()
2446

2447 2448 2449 2450 2451 2452 2453
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
Z
Zach 已提交
2454
	span := tr.CtxRecord(ctx, "wait query result")
E
Enwei Jiao 已提交
2455
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2456
		metrics.QueryLabel).Observe(float64(span.Milliseconds()))
2457

2458
	log.Debug(rpcDone(method))
D
dragondriver 已提交
2459

E
Enwei Jiao 已提交
2460
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2461 2462
		metrics.SuccessLabel).Inc()

E
Enwei Jiao 已提交
2463
	metrics.ProxySQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2464
		metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
E
Enwei Jiao 已提交
2465
	metrics.ProxyCollectionSQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2466
		metrics.QueryLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
2467 2468

	ret := &milvuspb.QueryResults{
2469 2470
		Status:     qt.result.Status,
		FieldsData: qt.result.FieldsData,
2471 2472
	}
	sentSize := proto.Size(qt.result)
2473
	rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
E
Enwei Jiao 已提交
2474
	metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
2475
	return ret, nil
2476
}
2477

2478
// CreateAlias create alias for collection, then you can search the collection with alias.
Y
Yusup 已提交
2479 2480 2481 2482
func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2483 2484 2485 2486

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateAlias")
	defer sp.Finish()

Y
Yusup 已提交
2487 2488 2489 2490 2491 2492 2493
	cat := &CreateAliasTask{
		ctx:                ctx,
		Condition:          NewTaskCondition(ctx),
		CreateAliasRequest: request,
		rootCoord:          node.rootCoord,
	}

D
dragondriver 已提交
2494
	method := "CreateAlias"
2495
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2496
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2497

2498
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2499 2500 2501 2502 2503
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

2504 2505
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2506 2507 2508
	if err := node.sched.ddQueue.Enqueue(cat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2509
			zap.Error(err))
D
dragondriver 已提交
2510

E
Enwei Jiao 已提交
2511
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
2512

Y
Yusup 已提交
2513 2514 2515 2516 2517 2518
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2519 2520 2521
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", cat.BeginTs()),
2522
		zap.Uint64("EndTs", cat.EndTs()))
D
dragondriver 已提交
2523 2524 2525 2526

	if err := cat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2527
			zap.Error(err),
D
dragondriver 已提交
2528
			zap.Uint64("BeginTs", cat.BeginTs()),
2529
			zap.Uint64("EndTs", cat.EndTs()))
E
Enwei Jiao 已提交
2530
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
Y
Yusup 已提交
2531 2532 2533 2534 2535 2536 2537

		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2538 2539 2540
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", cat.BeginTs()),
2541
		zap.Uint64("EndTs", cat.EndTs()))
D
dragondriver 已提交
2542

E
Enwei Jiao 已提交
2543 2544
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2545 2546 2547
	return cat.result, nil
}

2548
// DropAlias alter the alias of collection.
Y
Yusup 已提交
2549 2550 2551 2552
func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2553 2554 2555 2556

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropAlias")
	defer sp.Finish()

Y
Yusup 已提交
2557 2558 2559 2560 2561 2562 2563
	dat := &DropAliasTask{
		ctx:              ctx,
		Condition:        NewTaskCondition(ctx),
		DropAliasRequest: request,
		rootCoord:        node.rootCoord,
	}

D
dragondriver 已提交
2564
	method := "DropAlias"
2565
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2566
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2567

2568
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2569 2570 2571 2572
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias))

2573 2574
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2575 2576 2577
	if err := node.sched.ddQueue.Enqueue(dat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2578
			zap.Error(err))
E
Enwei Jiao 已提交
2579
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2580

Y
Yusup 已提交
2581 2582 2583 2584 2585 2586
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2587 2588 2589
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dat.BeginTs()),
2590
		zap.Uint64("EndTs", dat.EndTs()))
D
dragondriver 已提交
2591 2592 2593 2594

	if err := dat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2595
			zap.Error(err),
D
dragondriver 已提交
2596
			zap.Uint64("BeginTs", dat.BeginTs()),
2597
			zap.Uint64("EndTs", dat.EndTs()))
Y
Yusup 已提交
2598

E
Enwei Jiao 已提交
2599
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2600

Y
Yusup 已提交
2601 2602 2603 2604 2605 2606
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2607 2608 2609
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dat.BeginTs()),
2610
		zap.Uint64("EndTs", dat.EndTs()))
D
dragondriver 已提交
2611

E
Enwei Jiao 已提交
2612 2613
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2614 2615 2616
	return dat.result, nil
}

2617
// AlterAlias alter alias of collection.
Y
Yusup 已提交
2618 2619 2620 2621
func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2622 2623 2624 2625

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-AlterAlias")
	defer sp.Finish()

Y
Yusup 已提交
2626 2627 2628 2629 2630 2631 2632
	aat := &AlterAliasTask{
		ctx:               ctx,
		Condition:         NewTaskCondition(ctx),
		AlterAliasRequest: request,
		rootCoord:         node.rootCoord,
	}

D
dragondriver 已提交
2633
	method := "AlterAlias"
2634
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2635
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2636

2637
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2638 2639 2640 2641 2642
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

2643 2644
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2645 2646 2647
	if err := node.sched.ddQueue.Enqueue(aat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2648
			zap.Error(err))
E
Enwei Jiao 已提交
2649
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2650

Y
Yusup 已提交
2651 2652 2653 2654 2655 2656
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2657 2658 2659
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", aat.BeginTs()),
2660
		zap.Uint64("EndTs", aat.EndTs()))
D
dragondriver 已提交
2661 2662 2663 2664

	if err := aat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2665
			zap.Error(err),
D
dragondriver 已提交
2666
			zap.Uint64("BeginTs", aat.BeginTs()),
2667
			zap.Uint64("EndTs", aat.EndTs()))
Y
Yusup 已提交
2668

E
Enwei Jiao 已提交
2669
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2670

Y
Yusup 已提交
2671 2672 2673 2674 2675 2676
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2677 2678 2679
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", aat.BeginTs()),
2680
		zap.Uint64("EndTs", aat.EndTs()))
D
dragondriver 已提交
2681

E
Enwei Jiao 已提交
2682 2683
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2684 2685 2686
	return aat.result, nil
}

2687
// CalcDistance calculates the distances between vectors.
2688
func (node *Proxy) CalcDistance(ctx context.Context, request *milvuspb.CalcDistanceRequest) (*milvuspb.CalcDistanceResults, error) {
2689 2690 2691 2692 2693
	if !node.checkHealthy() {
		return &milvuspb.CalcDistanceResults{
			Status: unhealthyStatus(),
		}, nil
	}
2694

2695 2696 2697 2698
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CalcDistance")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

2699 2700
	query := func(ids *milvuspb.VectorIDs) (*milvuspb.QueryResults, error) {
		outputFields := []string{ids.FieldName}
2701

2702 2703 2704 2705 2706
		queryRequest := &milvuspb.QueryRequest{
			DbName:         "",
			CollectionName: ids.CollectionName,
			PartitionNames: ids.PartitionNames,
			OutputFields:   outputFields,
2707 2708
		}

2709
		qt := &queryTask{
2710 2711 2712
			ctx:       ctx,
			Condition: NewTaskCondition(ctx),
			RetrieveRequest: &internalpb.RetrieveRequest{
2713 2714
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Retrieve),
E
Enwei Jiao 已提交
2715
					commonpbutil.WithSourceID(paramtable.GetNodeID()),
2716
				),
E
Enwei Jiao 已提交
2717
				ReqID: paramtable.GetNodeID(),
2718
			},
2719 2720 2721 2722
			request: queryRequest,
			qc:      node.queryCoord,
			ids:     ids.IdArray,

2723
			queryShardPolicy: mergeRoundRobinPolicy,
2724
			shardMgr:         node.shardMgr,
2725 2726
		}

2727
		log := log.Ctx(ctx).With(
G
groot 已提交
2728 2729
			zap.String("collection", queryRequest.CollectionName),
			zap.Any("partitions", queryRequest.PartitionNames),
2730
			zap.Any("OutputFields", queryRequest.OutputFields))
G
groot 已提交
2731

2732
		err := node.sched.dqQueue.Enqueue(qt)
2733
		if err != nil {
2734 2735
			log.Error("CalcDistance queryTask failed to enqueue",
				zap.Error(err))
2736

2737 2738 2739 2740 2741
			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
2742
			}, err
2743
		}
2744

2745
		log.Debug("CalcDistance queryTask enqueued")
2746 2747 2748

		err = qt.WaitToFinish()
		if err != nil {
2749 2750
			log.Error("CalcDistance queryTask failed to WaitToFinish",
				zap.Error(err))
2751 2752 2753 2754 2755 2756

			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
2757
			}, err
2758
		}
2759

2760
		log.Debug("CalcDistance queryTask Done")
2761 2762

		return &milvuspb.QueryResults{
2763 2764
			Status:     qt.result.Status,
			FieldsData: qt.result.FieldsData,
2765 2766 2767
		}, nil
	}

G
groot 已提交
2768 2769 2770 2771
	// calcDistanceTask is not a standard task, no need to enqueue
	task := &calcDistanceTask{
		traceID:   traceID,
		queryFunc: query,
2772 2773
	}

G
groot 已提交
2774
	return task.Execute(ctx, request)
2775 2776
}

2777
// GetDdChannel returns the used channel for dd operations.
C
Cai Yudong 已提交
2778
func (node *Proxy) GetDdChannel(ctx context.Context, request *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) {
2779 2780
	panic("implement me")
}
X
XuanYang-cn 已提交
2781

2782
// GetPersistentSegmentInfo get the information of sealed segment.
C
Cai Yudong 已提交
2783
func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.GetPersistentSegmentInfoRequest) (*milvuspb.GetPersistentSegmentInfoResponse, error) {
2784 2785 2786 2787 2788
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetPersistentSegmentInfo")
	defer sp.Finish()

	log := log.Ctx(ctx)

D
dragondriver 已提交
2789
	log.Debug("GetPersistentSegmentInfo",
2790
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2791 2792 2793
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
2794
	resp := &milvuspb.GetPersistentSegmentInfoResponse{
X
XuanYang-cn 已提交
2795
		Status: &commonpb.Status{
2796
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
2797 2798
		},
	}
2799 2800 2801 2802
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
2803 2804
	method := "GetPersistentSegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2805
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2806
		metrics.TotalLabel).Inc()
2807 2808 2809

	// list segments
	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
X
XuanYang-cn 已提交
2810
	if err != nil {
E
Enwei Jiao 已提交
2811
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2812 2813 2814 2815 2816 2817 2818 2819 2820 2821 2822
		resp.Status.Reason = fmt.Errorf("getCollectionID failed, err:%w", err).Error()
		return resp, nil
	}

	getSegmentsByStatesResponse, err := node.dataCoord.GetSegmentsByStates(ctx, &datapb.GetSegmentsByStatesRequest{
		CollectionID: collectionID,
		// -1 means list all partition segemnts
		PartitionID: -1,
		States:      []commonpb.SegmentState{commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed, commonpb.SegmentState_Sealed},
	})
	if err != nil {
E
Enwei Jiao 已提交
2823
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2824
		resp.Status.Reason = fmt.Errorf("getSegmentsOfCollection, err:%w", err).Error()
X
XuanYang-cn 已提交
2825 2826
		return resp, nil
	}
2827 2828

	// get Segment info
2829
	infoResp, err := node.dataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
2830 2831 2832
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
2833
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
2834
		),
2835
		SegmentIDs: getSegmentsByStatesResponse.Segments,
X
XuanYang-cn 已提交
2836 2837
	})
	if err != nil {
E
Enwei Jiao 已提交
2838
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2839
			metrics.FailLabel).Inc()
2840 2841
		log.Warn("GetPersistentSegmentInfo fail",
			zap.Error(err))
2842
		resp.Status.Reason = fmt.Errorf("dataCoord:GetSegmentInfo, err:%w", err).Error()
X
XuanYang-cn 已提交
2843 2844
		return resp, nil
	}
2845 2846 2847
	log.Debug("GetPersistentSegmentInfo",
		zap.Int("len(infos)", len(infoResp.Infos)),
		zap.Any("status", infoResp.Status))
2848
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
E
Enwei Jiao 已提交
2849
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2850
			metrics.FailLabel).Inc()
X
XuanYang-cn 已提交
2851 2852 2853 2854 2855 2856
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	persistentInfos := make([]*milvuspb.PersistentSegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		persistentInfos[i] = &milvuspb.PersistentSegmentInfo{
S
sunby 已提交
2857
			SegmentID:    info.ID,
X
XuanYang-cn 已提交
2858 2859
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
S
sunby 已提交
2860
			NumRows:      info.NumOfRows,
X
XuanYang-cn 已提交
2861 2862 2863
			State:        info.State,
		}
	}
E
Enwei Jiao 已提交
2864
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2865
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2866
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2867
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
X
XuanYang-cn 已提交
2868 2869 2870 2871
	resp.Infos = persistentInfos
	return resp, nil
}

J
jingkl 已提交
2872
// GetQuerySegmentInfo gets segment information from QueryCoord.
C
Cai Yudong 已提交
2873
func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQuerySegmentInfoRequest) (*milvuspb.GetQuerySegmentInfoResponse, error) {
2874 2875 2876 2877 2878
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetQuerySegmentInfo")
	defer sp.Finish()

	log := log.Ctx(ctx)

D
dragondriver 已提交
2879
	log.Debug("GetQuerySegmentInfo",
2880
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2881 2882 2883
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
2884
	resp := &milvuspb.GetQuerySegmentInfoResponse{
Z
zhenshan.cao 已提交
2885
		Status: &commonpb.Status{
2886
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
Z
zhenshan.cao 已提交
2887 2888
		},
	}
2889 2890 2891 2892
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
2893

2894 2895
	method := "GetQuerySegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2896
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2897 2898
		metrics.TotalLabel).Inc()

2899 2900
	collID, err := globalMetaCache.GetCollectionID(ctx, req.CollectionName)
	if err != nil {
E
Enwei Jiao 已提交
2901
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2902 2903 2904
		resp.Status.Reason = err.Error()
		return resp, nil
	}
2905
	infoResp, err := node.queryCoord.GetSegmentInfo(ctx, &querypb.GetSegmentInfoRequest{
2906 2907 2908
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
2909
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
2910
		),
2911
		CollectionID: collID,
Z
zhenshan.cao 已提交
2912 2913
	})
	if err != nil {
E
Enwei Jiao 已提交
2914
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2915 2916
		log.Error("Failed to get segment info from QueryCoord",
			zap.Error(err))
Z
zhenshan.cao 已提交
2917 2918 2919
		resp.Status.Reason = err.Error()
		return resp, nil
	}
2920 2921 2922
	log.Debug("GetQuerySegmentInfo",
		zap.Any("infos", infoResp.Infos),
		zap.Any("status", infoResp.Status))
2923
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
E
Enwei Jiao 已提交
2924
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2925 2926
		log.Error("Failed to get segment info from QueryCoord",
			zap.String("errMsg", infoResp.Status.Reason))
Z
zhenshan.cao 已提交
2927 2928 2929 2930 2931 2932 2933 2934 2935 2936 2937 2938 2939
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	queryInfos := make([]*milvuspb.QuerySegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		queryInfos[i] = &milvuspb.QuerySegmentInfo{
			SegmentID:    info.SegmentID,
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
			NumRows:      info.NumRows,
			MemSize:      info.MemSize,
			IndexName:    info.IndexName,
			IndexID:      info.IndexID,
X
xige-16 已提交
2940
			State:        info.SegmentState,
2941
			NodeIds:      info.NodeIds,
Z
zhenshan.cao 已提交
2942 2943
		}
	}
2944

E
Enwei Jiao 已提交
2945 2946
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2947
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
Z
zhenshan.cao 已提交
2948 2949 2950 2951
	resp.Infos = queryInfos
	return resp, nil
}

J
jingkl 已提交
2952
// Dummy handles dummy request
C
Cai Yudong 已提交
2953
func (node *Proxy) Dummy(ctx context.Context, req *milvuspb.DummyRequest) (*milvuspb.DummyResponse, error) {
2954 2955 2956 2957 2958 2959
	failedResponse := &milvuspb.DummyResponse{
		Response: `{"status": "fail"}`,
	}

	// TODO(wxyu): change name RequestType to Request
	drt, err := parseDummyRequestType(req.RequestType)
2960 2961 2962 2963 2964 2965

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Dummy")
	defer sp.Finish()

	log := log.Ctx(ctx)

2966
	if err != nil {
2967 2968
		log.Warn("Failed to parse dummy request type",
			zap.Error(err))
2969 2970 2971
		return failedResponse, nil
	}

2972 2973
	if drt.RequestType == "query" {
		drr, err := parseDummyQueryRequest(req.RequestType)
2974
		if err != nil {
2975 2976
			log.Warn("Failed to parse dummy query request",
				zap.Error(err))
2977 2978 2979
			return failedResponse, nil
		}

2980
		request := &milvuspb.QueryRequest{
2981 2982 2983
			DbName:         drr.DbName,
			CollectionName: drr.CollectionName,
			PartitionNames: drr.PartitionNames,
2984
			OutputFields:   drr.OutputFields,
X
Xiangyu Wang 已提交
2985 2986
		}

2987
		_, err = node.Query(ctx, request)
2988
		if err != nil {
2989 2990
			log.Warn("Failed to execute dummy query",
				zap.Error(err))
2991 2992
			return failedResponse, err
		}
X
Xiangyu Wang 已提交
2993 2994 2995 2996 2997 2998

		return &milvuspb.DummyResponse{
			Response: `{"status": "success"}`,
		}, nil
	}

2999 3000
	log.Debug("cannot find specify dummy request type")
	return failedResponse, nil
X
Xiangyu Wang 已提交
3001 3002
}

J
jingkl 已提交
3003
// RegisterLink registers a link
C
Cai Yudong 已提交
3004
func (node *Proxy) RegisterLink(ctx context.Context, req *milvuspb.RegisterLinkRequest) (*milvuspb.RegisterLinkResponse, error) {
3005
	code := node.stateCode.Load().(commonpb.StateCode)
3006 3007 3008 3009 3010

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-RegisterLink")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
3011
		zap.String("role", typeutil.ProxyRole),
C
Cai Yudong 已提交
3012
		zap.Any("state code of proxy", code))
D
dragondriver 已提交
3013

3014 3015
	log.Debug("RegisterLink")

3016
	if code != commonpb.StateCode_Healthy {
3017 3018 3019
		return &milvuspb.RegisterLinkResponse{
			Address: nil,
			Status: &commonpb.Status{
3020
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3021
				Reason:    "proxy not healthy",
3022 3023 3024
			},
		}, nil
	}
E
Enwei Jiao 已提交
3025
	//metrics.ProxyLinkedSDKs.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Inc()
3026 3027 3028
	return &milvuspb.RegisterLinkResponse{
		Address: nil,
		Status: &commonpb.Status{
3029
			ErrorCode: commonpb.ErrorCode_Success,
3030
			Reason:    os.Getenv(metricsinfo.DeployModeEnvKey),
3031 3032 3033
		},
	}, nil
}
3034

3035
// GetMetrics gets the metrics of proxy
3036 3037
// TODO(dragondriver): cache the Metrics and set a retention to the cache
func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
3038 3039 3040 3041 3042
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetMetrics")
	defer sp.Finish()

	log := log.Ctx(ctx)

3043
	log.Debug("Proxy.GetMetrics",
E
Enwei Jiao 已提交
3044
		zap.Int64("node_id", paramtable.GetNodeID()),
3045 3046 3047 3048
		zap.String("req", req.Request))

	if !node.checkHealthy() {
		log.Warn("Proxy.GetMetrics failed",
E
Enwei Jiao 已提交
3049
			zap.Int64("node_id", paramtable.GetNodeID()),
3050
			zap.String("req", req.Request),
E
Enwei Jiao 已提交
3051
			zap.Error(errProxyIsUnhealthy(paramtable.GetNodeID())))
3052 3053 3054 3055

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
E
Enwei Jiao 已提交
3056
				Reason:    msgProxyIsUnhealthy(paramtable.GetNodeID()),
3057 3058 3059 3060 3061 3062 3063 3064
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetMetrics failed to parse metric type",
E
Enwei Jiao 已提交
3065
			zap.Int64("node_id", paramtable.GetNodeID()),
3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

	log.Debug("Proxy.GetMetrics",
		zap.String("metric_type", metricType))

3081 3082 3083
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3084
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
3085
	)
3086
	if metricType == metricsinfo.SystemInfoMetrics {
3087 3088 3089 3090 3091 3092 3093
		ret, err := node.metricsCacheManager.GetSystemInfoMetrics()
		if err == nil && ret != nil {
			return ret, nil
		}
		log.Debug("failed to get system info metrics from cache, recompute instead",
			zap.Error(err))

3094
		metrics, err := getSystemInfoMetrics(ctx, req, node)
3095 3096

		log.Debug("Proxy.GetMetrics",
E
Enwei Jiao 已提交
3097
			zap.Int64("node_id", paramtable.GetNodeID()),
3098 3099 3100 3101 3102
			zap.String("req", req.Request),
			zap.String("metric_type", metricType),
			zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
			zap.Error(err))

3103 3104
		node.metricsCacheManager.UpdateSystemInfoMetrics(metrics)

G
godchen 已提交
3105
		return metrics, nil
3106 3107
	}

J
Jiquan Long 已提交
3108
	log.Warn("Proxy.GetMetrics failed, request metric type is not implemented yet",
E
Enwei Jiao 已提交
3109
		zap.Int64("node_id", paramtable.GetNodeID()),
3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121
		zap.String("req", req.Request),
		zap.String("metric_type", metricType))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}

3122 3123 3124
// GetProxyMetrics gets the metrics of proxy, it's an internal interface which is different from GetMetrics interface,
// because it only obtains the metrics of Proxy, not including the topological metrics of Query cluster and Data cluster.
func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
3125 3126 3127 3128 3129 3130 3131
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetProxyMetrics")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.Int64("node_id", paramtable.GetNodeID()),
		zap.String("req", req.Request))

3132 3133
	if !node.checkHealthy() {
		log.Warn("Proxy.GetProxyMetrics failed",
E
Enwei Jiao 已提交
3134
			zap.Error(errProxyIsUnhealthy(paramtable.GetNodeID())))
3135 3136 3137 3138

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
E
Enwei Jiao 已提交
3139
				Reason:    msgProxyIsUnhealthy(paramtable.GetNodeID()),
3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156
			},
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetProxyMetrics failed to parse metric type",
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

3157 3158 3159
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3160
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
3161
	)
3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177

	if metricType == metricsinfo.SystemInfoMetrics {
		proxyMetrics, err := getProxyMetrics(ctx, req, node)
		if err != nil {
			log.Warn("Proxy.GetProxyMetrics failed to getProxyMetrics",
				zap.Error(err))

			return &milvuspb.GetMetricsResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

		log.Debug("Proxy.GetProxyMetrics",
3178
			zap.String("metric_type", metricType))
3179 3180 3181 3182

		return proxyMetrics, nil
	}

J
Jiquan Long 已提交
3183
	log.Warn("Proxy.GetProxyMetrics failed, request metric type is not implemented yet",
3184 3185 3186 3187 3188 3189 3190 3191 3192 3193
		zap.String("metric_type", metricType))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
	}, nil
}

B
bigsheeper 已提交
3194 3195
// LoadBalance would do a load balancing operation between query nodes
func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceRequest) (*commonpb.Status, error) {
3196 3197 3198 3199 3200
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadBalance")
	defer sp.Finish()

	log := log.Ctx(ctx)

B
bigsheeper 已提交
3201
	log.Debug("Proxy.LoadBalance",
E
Enwei Jiao 已提交
3202
		zap.Int64("proxy_id", paramtable.GetNodeID()),
B
bigsheeper 已提交
3203 3204 3205 3206 3207 3208 3209 3210 3211
		zap.Any("req", req))

	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	}
3212 3213 3214

	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
	if err != nil {
J
Jiquan Long 已提交
3215
		log.Warn("failed to get collection id",
3216 3217
			zap.String("collection name", req.GetCollectionName()),
			zap.Error(err))
3218 3219 3220
		status.Reason = err.Error()
		return status, nil
	}
B
bigsheeper 已提交
3221
	infoResp, err := node.queryCoord.LoadBalance(ctx, &querypb.LoadBalanceRequest{
3222 3223 3224
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_LoadBalanceSegments),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3225
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3226
		),
B
bigsheeper 已提交
3227 3228
		SourceNodeIDs:    []int64{req.SrcNodeID},
		DstNodeIDs:       req.DstNodeIDs,
X
xige-16 已提交
3229
		BalanceReason:    querypb.TriggerCondition_GrpcRequest,
B
bigsheeper 已提交
3230
		SealedSegmentIDs: req.SealedSegmentIDs,
3231
		CollectionID:     collectionID,
B
bigsheeper 已提交
3232 3233
	})
	if err != nil {
J
Jiquan Long 已提交
3234
		log.Warn("Failed to LoadBalance from Query Coordinator",
3235 3236
			zap.Any("req", req),
			zap.Error(err))
B
bigsheeper 已提交
3237 3238 3239 3240
		status.Reason = err.Error()
		return status, nil
	}
	if infoResp.ErrorCode != commonpb.ErrorCode_Success {
J
Jiquan Long 已提交
3241
		log.Warn("Failed to LoadBalance from Query Coordinator",
3242
			zap.String("errMsg", infoResp.Reason))
B
bigsheeper 已提交
3243 3244 3245
		status.Reason = infoResp.Reason
		return status, nil
	}
3246 3247 3248
	log.Debug("LoadBalance Done",
		zap.Any("req", req),
		zap.Any("status", infoResp))
B
bigsheeper 已提交
3249 3250 3251 3252
	status.ErrorCode = commonpb.ErrorCode_Success
	return status, nil
}

3253 3254
// GetReplicas gets replica info
func (node *Proxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
3255 3256 3257 3258 3259 3260 3261 3262
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetReplicas")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("received get replicas request",
		zap.Int64("collection", req.GetCollectionID()),
		zap.Bool("with shard nodes", req.GetWithShardNodes()))
3263 3264 3265 3266 3267 3268
	resp := &milvuspb.GetReplicasResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

S
smellthemoon 已提交
3269 3270
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_GetReplicas),
E
Enwei Jiao 已提交
3271
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
S
smellthemoon 已提交
3272
	)
3273 3274 3275

	resp, err := node.queryCoord.GetReplicas(ctx, req)
	if err != nil {
3276 3277
		log.Error("Failed to get replicas from Query Coordinator",
			zap.Error(err))
3278 3279 3280 3281
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3282 3283 3284
	log.Debug("received get replicas response",
		zap.Any("resp", resp),
		zap.Error(err))
3285 3286 3287
	return resp, nil
}

3288
// GetCompactionState gets the compaction state of multiple segments
3289
func (node *Proxy) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
3290 3291 3292 3293 3294 3295 3296
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetCompactionState")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.Int64("compactionID", req.GetCompactionID()))

	log.Debug("received GetCompactionState request")
3297 3298 3299 3300 3301 3302 3303
	resp := &milvuspb.GetCompactionStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionState(ctx, req)
3304 3305 3306
	log.Debug("received GetCompactionState response",
		zap.Any("resp", resp),
		zap.Error(err))
3307 3308 3309
	return resp, err
}

3310
// ManualCompaction invokes compaction on specified collection
3311
func (node *Proxy) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
3312 3313 3314 3315 3316 3317 3318
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ManualCompaction")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", req.GetCollectionID()))

	log.Info("received ManualCompaction request")
3319 3320 3321 3322 3323 3324 3325
	resp := &milvuspb.ManualCompactionResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.ManualCompaction(ctx, req)
3326 3327 3328
	log.Info("received ManualCompaction response",
		zap.Any("resp", resp),
		zap.Error(err))
3329 3330 3331
	return resp, err
}

3332
// GetCompactionStateWithPlans returns the compactions states with the given plan ID
3333
func (node *Proxy) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
3334 3335 3336 3337 3338 3339 3340
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetCompactionStateWithPlans")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.Int64("compactionID", req.GetCompactionID()))

	log.Debug("received GetCompactionStateWithPlans request")
3341 3342 3343 3344 3345 3346 3347
	resp := &milvuspb.GetCompactionPlansResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionStateWithPlans(ctx, req)
3348 3349 3350
	log.Debug("received GetCompactionStateWithPlans response",
		zap.Any("resp", resp),
		zap.Error(err))
3351 3352 3353
	return resp, err
}

B
Bingyi Sun 已提交
3354 3355
// GetFlushState gets the flush state of multiple segments
func (node *Proxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
3356 3357 3358 3359 3360 3361 3362
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetFlushState")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("received get flush state request",
		zap.Any("request", req))
3363
	var err error
B
Bingyi Sun 已提交
3364 3365 3366
	resp := &milvuspb.GetFlushStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
J
Jiquan Long 已提交
3367
		log.Warn("unable to get flush state because of closed server")
B
Bingyi Sun 已提交
3368 3369 3370
		return resp, nil
	}

3371
	resp, err = node.dataCoord.GetFlushState(ctx, req)
X
Xiaofan 已提交
3372
	if err != nil {
3373 3374
		log.Warn("failed to get flush state response",
			zap.Error(err))
X
Xiaofan 已提交
3375 3376
		return nil, err
	}
3377 3378
	log.Debug("received get flush state response",
		zap.Any("response", resp))
B
Bingyi Sun 已提交
3379 3380 3381
	return resp, err
}

C
Cai Yudong 已提交
3382 3383
// checkHealthy checks proxy state is Healthy
func (node *Proxy) checkHealthy() bool {
3384 3385
	code := node.stateCode.Load().(commonpb.StateCode)
	return code == commonpb.StateCode_Healthy
3386 3387
}

3388 3389 3390
func (node *Proxy) checkHealthyAndReturnCode() (commonpb.StateCode, bool) {
	code := node.stateCode.Load().(commonpb.StateCode)
	return code, code == commonpb.StateCode_Healthy
3391 3392
}

3393
// unhealthyStatus returns the proxy not healthy status
3394 3395 3396
func unhealthyStatus() *commonpb.Status {
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3397
		Reason:    "proxy not healthy",
3398 3399
	}
}
G
groot 已提交
3400 3401 3402

// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
3403 3404 3405 3406 3407
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Import")
	defer sp.Finish()

	log := log.Ctx(ctx)

3408 3409
	log.Info("received import request",
		zap.String("collection name", req.GetCollectionName()),
G
groot 已提交
3410 3411
		zap.String("partition name", req.GetPartitionName()),
		zap.Strings("files", req.GetFiles()))
3412 3413 3414 3415 3416 3417
	resp := &milvuspb.ImportResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
	}
G
groot 已提交
3418 3419 3420 3421
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3422

3423 3424
	err := importutil.ValidateOptions(req.GetOptions())
	if err != nil {
3425 3426
		log.Error("failed to execute import request",
			zap.Error(err))
3427 3428 3429 3430 3431
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = "request options is not illegal    \n" + err.Error() + "    \nIllegal option format    \n" + importutil.OptionFormat
		return resp, nil
	}

3432 3433
	method := "Import"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3434
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3435 3436
		metrics.TotalLabel).Inc()

3437
	// Call rootCoord to finish import.
3438 3439
	respFromRC, err := node.rootCoord.Import(ctx, req)
	if err != nil {
E
Enwei Jiao 已提交
3440
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3441 3442
		log.Error("failed to execute bulk insert request",
			zap.Error(err))
3443 3444 3445 3446
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3447

E
Enwei Jiao 已提交
3448 3449
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3450
	return respFromRC, nil
G
groot 已提交
3451 3452
}

3453
// GetImportState checks import task state from RootCoord.
G
groot 已提交
3454
func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
3455 3456 3457 3458 3459 3460 3461
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetImportState")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("received get import state request",
		zap.Int64("taskID", req.GetTask()))
G
groot 已提交
3462 3463 3464 3465 3466
	resp := &milvuspb.GetImportStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3467 3468
	method := "GetImportState"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3469
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3470
		metrics.TotalLabel).Inc()
G
groot 已提交
3471 3472

	resp, err := node.rootCoord.GetImportState(ctx, req)
3473
	if err != nil {
E
Enwei Jiao 已提交
3474
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3475 3476
		log.Error("failed to execute get import state",
			zap.Error(err))
3477 3478 3479 3480 3481
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}

3482 3483 3484
	log.Debug("successfully received get import state response",
		zap.Int64("taskID", req.GetTask()),
		zap.Any("resp", resp), zap.Error(err))
E
Enwei Jiao 已提交
3485 3486
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3487
	return resp, nil
G
groot 已提交
3488 3489 3490 3491
}

// ListImportTasks get id array of all import tasks from rootcoord
func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
3492 3493 3494 3495 3496
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ListImportTasks")
	defer sp.Finish()

	log := log.Ctx(ctx)

J
Jiquan Long 已提交
3497
	log.Debug("received list import tasks request")
G
groot 已提交
3498 3499 3500 3501 3502
	resp := &milvuspb.ListImportTasksResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3503 3504
	method := "ListImportTasks"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3505
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3506
		metrics.TotalLabel).Inc()
G
groot 已提交
3507
	resp, err := node.rootCoord.ListImportTasks(ctx, req)
3508
	if err != nil {
E
Enwei Jiao 已提交
3509
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3510 3511
		log.Error("failed to execute list import tasks",
			zap.Error(err))
3512 3513
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
X
XuanYang-cn 已提交
3514 3515 3516
		return resp, nil
	}

3517 3518 3519
	log.Debug("successfully received list import tasks response",
		zap.String("collection", req.CollectionName),
		zap.Any("tasks", resp.Tasks))
E
Enwei Jiao 已提交
3520 3521
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
X
XuanYang-cn 已提交
3522 3523 3524
	return resp, err
}

3525 3526 3527
// InvalidateCredentialCache invalidate the credential cache of specified username.
func (node *Proxy) InvalidateCredentialCache(ctx context.Context, request *proxypb.InvalidateCredCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
3528 3529 3530 3531 3532

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-InvalidateCredentialCache")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
3533 3534
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))
3535 3536

	log.Debug("received request to invalidate credential cache")
3537
	if !node.checkHealthy() {
3538
		return unhealthyStatus(), nil
3539
	}
3540 3541 3542 3543 3544

	username := request.Username
	if globalMetaCache != nil {
		globalMetaCache.RemoveCredential(username) // no need to return error, though credential may be not cached
	}
3545
	log.Debug("complete to invalidate credential cache")
3546 3547 3548 3549 3550 3551 3552 3553 3554 3555

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

// UpdateCredentialCache update the credential cache of specified username.
func (node *Proxy) UpdateCredentialCache(ctx context.Context, request *proxypb.UpdateCredCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
3556 3557 3558 3559 3560

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-UpdateCredentialCache")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
3561 3562
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))
3563 3564

	log.Debug("received request to update credential cache")
3565
	if !node.checkHealthy() {
3566
		return unhealthyStatus(), nil
3567
	}
3568 3569

	credInfo := &internalpb.CredentialInfo{
3570 3571
		Username:       request.Username,
		Sha256Password: request.Password,
3572 3573 3574 3575
	}
	if globalMetaCache != nil {
		globalMetaCache.UpdateCredential(credInfo) // no need to return error, though credential may be not cached
	}
3576
	log.Debug("complete to update credential cache")
3577 3578 3579 3580 3581 3582 3583 3584

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

func (node *Proxy) CreateCredential(ctx context.Context, req *milvuspb.CreateCredentialRequest) (*commonpb.Status, error) {
3585 3586 3587 3588 3589 3590 3591 3592
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateCredential")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("CreateCredential",
		zap.String("role", typeutil.ProxyRole))
3593
	if !node.checkHealthy() {
3594
		return unhealthyStatus(), nil
3595
	}
3596 3597 3598 3599 3600 3601 3602 3603 3604 3605
	// validate params
	username := req.Username
	if err := ValidateUsername(username); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	rawPassword, err := crypto.Base64Decode(req.Password)
	if err != nil {
3606 3607
		log.Error("decode password fail",
			zap.Error(err))
3608 3609 3610 3611 3612 3613
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "decode password fail key:" + req.Username,
		}, nil
	}
	if err = ValidatePassword(rawPassword); err != nil {
3614 3615
		log.Error("illegal password",
			zap.Error(err))
3616 3617 3618 3619 3620 3621 3622
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	encryptedPassword, err := crypto.PasswordEncrypt(rawPassword)
	if err != nil {
3623 3624
		log.Error("encrypt password fail",
			zap.Error(err))
3625 3626 3627 3628 3629
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "encrypt password fail key:" + req.Username,
		}, nil
	}
3630

3631 3632 3633
	credInfo := &internalpb.CredentialInfo{
		Username:          req.Username,
		EncryptedPassword: encryptedPassword,
3634
		Sha256Password:    crypto.SHA256(rawPassword, req.Username),
3635 3636 3637
	}
	result, err := node.rootCoord.CreateCredential(ctx, credInfo)
	if err != nil { // for error like conntext timeout etc.
3638 3639
		log.Error("create credential fail",
			zap.Error(err))
3640 3641 3642 3643 3644 3645 3646 3647
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

C
codeman 已提交
3648
func (node *Proxy) UpdateCredential(ctx context.Context, req *milvuspb.UpdateCredentialRequest) (*commonpb.Status, error) {
3649 3650 3651 3652 3653 3654 3655 3656
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-UpdateCredential")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("UpdateCredential",
		zap.String("role", typeutil.ProxyRole))
3657
	if !node.checkHealthy() {
3658
		return unhealthyStatus(), nil
3659
	}
C
codeman 已提交
3660 3661
	rawOldPassword, err := crypto.Base64Decode(req.OldPassword)
	if err != nil {
3662 3663
		log.Error("decode old password fail",
			zap.Error(err))
C
codeman 已提交
3664 3665 3666 3667 3668 3669
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode old password fail when updating:" + req.Username,
		}, nil
	}
	rawNewPassword, err := crypto.Base64Decode(req.NewPassword)
3670
	if err != nil {
3671 3672
		log.Error("decode password fail",
			zap.Error(err))
3673 3674 3675 3676 3677
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
3678 3679
	// valid new password
	if err = ValidatePassword(rawNewPassword); err != nil {
3680 3681
		log.Error("illegal password",
			zap.Error(err))
3682 3683 3684 3685 3686
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
3687 3688

	if !passwordVerify(ctx, req.Username, rawOldPassword, globalMetaCache) {
C
codeman 已提交
3689 3690 3691 3692 3693 3694 3695
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "old password is not correct:" + req.Username,
		}, nil
	}
	// update meta data
	encryptedPassword, err := crypto.PasswordEncrypt(rawNewPassword)
3696
	if err != nil {
3697 3698
		log.Error("encrypt password fail",
			zap.Error(err))
3699 3700 3701 3702 3703
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "encrypt password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
3704
	updateCredReq := &internalpb.CredentialInfo{
3705
		Username:          req.Username,
3706
		Sha256Password:    crypto.SHA256(rawNewPassword, req.Username),
3707 3708
		EncryptedPassword: encryptedPassword,
	}
C
codeman 已提交
3709
	result, err := node.rootCoord.UpdateCredential(ctx, updateCredReq)
3710
	if err != nil { // for error like conntext timeout etc.
3711 3712
		log.Error("update credential fail",
			zap.Error(err))
3713 3714 3715 3716 3717 3718 3719 3720 3721
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
3722 3723 3724 3725 3726 3727 3728 3729
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DeleteCredential")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("DeleteCredential",
		zap.String("role", typeutil.ProxyRole))
3730
	if !node.checkHealthy() {
3731
		return unhealthyStatus(), nil
3732 3733
	}

3734 3735 3736 3737 3738 3739
	if req.Username == util.UserRoot {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_DeleteCredentialFailure,
			Reason:    "user root cannot be deleted",
		}, nil
	}
3740 3741
	result, err := node.rootCoord.DeleteCredential(ctx, req)
	if err != nil { // for error like conntext timeout etc.
3742 3743
		log.Error("delete credential fail",
			zap.Error(err))
3744 3745 3746 3747 3748 3749 3750 3751 3752
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
3753 3754 3755 3756 3757 3758 3759
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ListCredUsers")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole))

	log.Debug("ListCredUsers")
3760
	if !node.checkHealthy() {
3761
		return &milvuspb.ListCredUsersResponse{Status: unhealthyStatus()}, nil
3762
	}
3763
	rootCoordReq := &milvuspb.ListCredUsersRequest{
3764 3765 3766
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_ListCredUsernames),
		),
3767 3768
	}
	resp, err := node.rootCoord.ListCredUsers(ctx, rootCoordReq)
3769 3770 3771 3772 3773 3774 3775 3776 3777 3778 3779 3780
	if err != nil {
		return &milvuspb.ListCredUsersResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
	return &milvuspb.ListCredUsersResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
3781
		Usernames: resp.Usernames,
3782 3783
	}, nil
}
3784

3785
func (node *Proxy) CreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest) (*commonpb.Status, error) {
3786 3787 3788 3789 3790 3791 3792
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateRole")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("CreateRole",
		zap.Any("req", req))
3793
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
3794
		return errorutil.UnhealthyStatus(code), nil
3795 3796 3797 3798 3799 3800 3801 3802 3803 3804
	}

	var roleName string
	if req.Entity != nil {
		roleName = req.Entity.Name
	}
	if err := ValidateRoleName(roleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
3805
		}, nil
3806 3807 3808 3809
	}

	result, err := node.rootCoord.CreateRole(ctx, req)
	if err != nil {
3810 3811
		log.Error("fail to create role",
			zap.Error(err))
3812 3813 3814
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
3815
		}, nil
3816 3817
	}
	return result, nil
3818 3819
}

3820
func (node *Proxy) DropRole(ctx context.Context, req *milvuspb.DropRoleRequest) (*commonpb.Status, error) {
3821 3822 3823 3824 3825 3826 3827
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropRole")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("DropRole",
		zap.Any("req", req))
3828
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
3829
		return errorutil.UnhealthyStatus(code), nil
3830 3831 3832 3833 3834
	}
	if err := ValidateRoleName(req.RoleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
3835
		}, nil
3836
	}
3837 3838 3839 3840 3841
	if IsDefaultRole(req.RoleName) {
		errMsg := fmt.Sprintf("the role[%s] is a default role, which can't be droped", req.RoleName)
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    errMsg,
3842
		}, nil
3843
	}
3844 3845
	result, err := node.rootCoord.DropRole(ctx, req)
	if err != nil {
3846 3847 3848
		log.Error("fail to drop role",
			zap.String("role_name", req.RoleName),
			zap.Error(err))
3849 3850 3851
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
3852
		}, nil
3853 3854
	}
	return result, nil
3855 3856
}

3857
func (node *Proxy) OperateUserRole(ctx context.Context, req *milvuspb.OperateUserRoleRequest) (*commonpb.Status, error) {
3858 3859 3860 3861 3862 3863 3864
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-OperateUserRole")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("OperateUserRole",
		zap.Any("req", req))
3865
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
3866
		return errorutil.UnhealthyStatus(code), nil
3867 3868 3869 3870 3871
	}
	if err := ValidateUsername(req.Username); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
3872
		}, nil
3873 3874 3875 3876 3877
	}
	if err := ValidateRoleName(req.RoleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
3878
		}, nil
3879 3880 3881 3882
	}

	result, err := node.rootCoord.OperateUserRole(ctx, req)
	if err != nil {
3883 3884
		logger.Error("fail to operate user role",
			zap.Error(err))
3885 3886 3887
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
3888
		}, nil
3889 3890
	}
	return result, nil
3891 3892
}

3893
func (node *Proxy) SelectRole(ctx context.Context, req *milvuspb.SelectRoleRequest) (*milvuspb.SelectRoleResponse, error) {
3894 3895 3896 3897 3898 3899
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-SelectRole")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("SelectRole", zap.Any("req", req))
3900
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
3901
		return &milvuspb.SelectRoleResponse{Status: errorutil.UnhealthyStatus(code)}, nil
3902 3903 3904 3905 3906 3907 3908 3909 3910
	}

	if req.Role != nil {
		if err := ValidateRoleName(req.Role.Name); err != nil {
			return &milvuspb.SelectRoleResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_IllegalArgument,
					Reason:    err.Error(),
				},
3911
			}, nil
3912 3913 3914 3915 3916
		}
	}

	result, err := node.rootCoord.SelectRole(ctx, req)
	if err != nil {
3917 3918
		log.Error("fail to select role",
			zap.Error(err))
3919 3920 3921 3922 3923
		return &milvuspb.SelectRoleResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
3924
		}, nil
3925 3926
	}
	return result, nil
3927 3928
}

3929
func (node *Proxy) SelectUser(ctx context.Context, req *milvuspb.SelectUserRequest) (*milvuspb.SelectUserResponse, error) {
3930 3931 3932 3933 3934 3935 3936
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-SelectUser")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("SelectUser",
		zap.Any("req", req))
3937
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
3938
		return &milvuspb.SelectUserResponse{Status: errorutil.UnhealthyStatus(code)}, nil
3939 3940 3941 3942 3943 3944 3945 3946 3947
	}

	if req.User != nil {
		if err := ValidateUsername(req.User.Name); err != nil {
			return &milvuspb.SelectUserResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_IllegalArgument,
					Reason:    err.Error(),
				},
3948
			}, nil
3949 3950 3951 3952 3953
		}
	}

	result, err := node.rootCoord.SelectUser(ctx, req)
	if err != nil {
3954 3955
		log.Error("fail to select user",
			zap.Error(err))
3956 3957 3958 3959 3960
		return &milvuspb.SelectUserResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
3961
		}, nil
3962 3963
	}
	return result, nil
3964 3965
}

3966 3967 3968 3969 3970 3971 3972 3973 3974 3975 3976 3977 3978 3979 3980 3981 3982 3983 3984 3985 3986 3987 3988 3989 3990 3991 3992 3993 3994 3995
func (node *Proxy) validPrivilegeParams(req *milvuspb.OperatePrivilegeRequest) error {
	if req.Entity == nil {
		return fmt.Errorf("the entity in the request is nil")
	}
	if req.Entity.Grantor == nil {
		return fmt.Errorf("the grantor entity in the grant entity is nil")
	}
	if req.Entity.Grantor.Privilege == nil {
		return fmt.Errorf("the privilege entity in the grantor entity is nil")
	}
	if err := ValidatePrivilege(req.Entity.Grantor.Privilege.Name); err != nil {
		return err
	}
	if req.Entity.Object == nil {
		return fmt.Errorf("the resource entity in the grant entity is nil")
	}
	if err := ValidateObjectType(req.Entity.Object.Name); err != nil {
		return err
	}
	if err := ValidateObjectName(req.Entity.ObjectName); err != nil {
		return err
	}
	if req.Entity.Role == nil {
		return fmt.Errorf("the object entity in the grant entity is nil")
	}
	if err := ValidateRoleName(req.Entity.Role.Name); err != nil {
		return err
	}

	return nil
3996 3997
}

3998
func (node *Proxy) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) {
3999 4000 4001 4002 4003 4004 4005
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-OperatePrivilege")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("OperatePrivilege",
		zap.Any("req", req))
4006
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4007
		return errorutil.UnhealthyStatus(code), nil
4008 4009 4010 4011 4012
	}
	if err := node.validPrivilegeParams(req); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4013
		}, nil
4014 4015 4016 4017 4018 4019
	}
	curUser, err := GetCurUserFromContext(ctx)
	if err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4020
		}, nil
4021 4022 4023 4024
	}
	req.Entity.Grantor.User = &milvuspb.UserEntity{Name: curUser}
	result, err := node.rootCoord.OperatePrivilege(ctx, req)
	if err != nil {
4025 4026
		log.Error("fail to operate privilege",
			zap.Error(err))
4027 4028 4029
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4030
		}, nil
4031 4032
	}
	return result, nil
4033 4034
}

4035 4036 4037 4038 4039 4040 4041 4042 4043 4044 4045 4046 4047 4048 4049 4050 4051 4052 4053 4054 4055 4056 4057 4058 4059 4060 4061
// validGrantParams checks the parameters of a SelectGrantRequest.
//
// The grant entity and its role are mandatory, and the role name must pass
// ValidateRoleName. The object is optional: only when it is present are the
// object type and the object name validated. The first failing check is
// returned; nil means the request is acceptable.
func (node *Proxy) validGrantParams(req *milvuspb.SelectGrantRequest) error {
	entity := req.Entity
	if entity == nil {
		return fmt.Errorf("the grant entity in the request is nil")
	}

	if object := entity.Object; object != nil {
		if typeErr := ValidateObjectType(object.Name); typeErr != nil {
			return typeErr
		}

		if nameErr := ValidateObjectName(entity.ObjectName); nameErr != nil {
			return nameErr
		}
	}

	role := entity.Role
	if role == nil {
		return fmt.Errorf("the role entity in the grant entity is nil")
	}

	if roleErr := ValidateRoleName(role.Name); roleErr != nil {
		return roleErr
	}

	return nil
}

func (node *Proxy) SelectGrant(ctx context.Context, req *milvuspb.SelectGrantRequest) (*milvuspb.SelectGrantResponse, error) {
4062 4063 4064 4065 4066 4067 4068
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-SelectGrant")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("SelectGrant",
		zap.Any("req", req))
4069
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4070
		return &milvuspb.SelectGrantResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4071 4072 4073 4074 4075 4076 4077 4078
	}

	if err := node.validGrantParams(req); err != nil {
		return &milvuspb.SelectGrantResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_IllegalArgument,
				Reason:    err.Error(),
			},
4079
		}, nil
4080 4081 4082 4083
	}

	result, err := node.rootCoord.SelectGrant(ctx, req)
	if err != nil {
4084 4085
		log.Error("fail to select grant",
			zap.Error(err))
4086 4087 4088 4089 4090
		return &milvuspb.SelectGrantResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4091
		}, nil
4092 4093 4094 4095 4096
	}
	return result, nil
}

func (node *Proxy) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.RefreshPolicyInfoCacheRequest) (*commonpb.Status, error) {
4097 4098 4099 4100 4101 4102 4103
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-RefreshPolicyInfoCache")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("RefreshPrivilegeInfoCache",
		zap.Any("req", req))
4104 4105 4106 4107 4108 4109 4110 4111 4112 4113
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
		return errorutil.UnhealthyStatus(code), errorutil.UnhealthyError()
	}

	if globalMetaCache != nil {
		err := globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{
			OpType: typeutil.CacheOpType(req.OpType),
			OpKey:  req.OpKey,
		})
		if err != nil {
4114 4115
			log.Error("fail to refresh policy info",
				zap.Error(err))
4116 4117 4118 4119 4120 4121
			return &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_RefreshPolicyInfoCacheFailure,
				Reason:    err.Error(),
			}, err
		}
	}
4122
	log.Debug("RefreshPrivilegeInfoCache success")
4123 4124 4125 4126

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
4127
}
4128 4129 4130 4131 4132 4133 4134 4135 4136 4137 4138 4139 4140 4141 4142 4143 4144 4145 4146 4147

// SetRates applies the rates carried by the request to the proxy's global
// rate limiter.
//
// The Go error is always nil; the outcome is reported through the status:
// the unhealthy status when the proxy is not healthy,
// ErrorCode_UnexpectedError with the failure reason when the limiter rejects
// the rates, and ErrorCode_Success otherwise.
func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	// TODO: set multiple rate limiter rates
	if setErr := node.multiRateLimiter.globalRateLimiter.setRates(request.GetRates()); setErr != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    setErr.Error(),
		}, nil
	}

	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}
4148 4149 4150 4151

func (node *Proxy) CheckHealth(ctx context.Context, request *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
	if !node.checkHealthy() {
		reason := errorutil.UnHealthReason("proxy", node.session.ServerID, "proxy is unhealthy")
4152 4153 4154 4155
		return &milvuspb.CheckHealthResponse{
			Status:    unhealthyStatus(),
			IsHealthy: false,
			Reasons:   []string{reason}}, nil
4156 4157 4158 4159 4160 4161 4162 4163 4164 4165
	}

	group, ctx := errgroup.WithContext(ctx)
	errReasons := make([]string, 0)

	mu := &sync.Mutex{}
	fn := func(role string, resp *milvuspb.CheckHealthResponse, err error) error {
		mu.Lock()
		defer mu.Unlock()

4166 4167 4168 4169 4170
		sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-RefreshPolicyInfoCache")
		defer sp.Finish()

		log := log.Ctx(ctx).With(zap.String("role", role))

4171
		if err != nil {
4172 4173
			log.Warn("check health fail",
				zap.Error(err))
4174 4175 4176 4177 4178
			errReasons = append(errReasons, fmt.Sprintf("check health fail for %s", role))
			return err
		}

		if !resp.IsHealthy {
4179
			log.Warn("check health fail")
4180 4181 4182 4183 4184 4185 4186 4187 4188 4189 4190 4191 4192 4193 4194 4195 4196 4197 4198 4199 4200 4201 4202 4203 4204 4205 4206 4207 4208 4209 4210 4211 4212
			errReasons = append(errReasons, resp.Reasons...)
		}
		return nil
	}

	group.Go(func() error {
		resp, err := node.rootCoord.CheckHealth(ctx, request)
		return fn("rootcoord", resp, err)
	})

	group.Go(func() error {
		resp, err := node.queryCoord.CheckHealth(ctx, request)
		return fn("querycoord", resp, err)
	})

	group.Go(func() error {
		resp, err := node.dataCoord.CheckHealth(ctx, request)
		return fn("datacoord", resp, err)
	})

	group.Go(func() error {
		resp, err := node.indexCoord.CheckHealth(ctx, request)
		return fn("indexcoord", resp, err)
	})

	err := group.Wait()
	if err != nil || len(errReasons) != 0 {
		return &milvuspb.CheckHealthResponse{
			IsHealthy: false,
			Reasons:   errReasons,
		}, nil
	}

4213 4214 4215 4216 4217 4218 4219
	return &milvuspb.CheckHealthResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		IsHealthy: true,
	}, nil
4220
}