// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proxy
18 19 20

import (
	"context"
21
	"errors"
22
	"fmt"
C
cai.zhang 已提交
23
	"os"
24
	"strconv"
25 26 27
	"sync"

	"golang.org/x/sync/errgroup"
28

29
	"github.com/golang/protobuf/proto"
S
SimFG 已提交
30 31
	"github.com/milvus-io/milvus-proto/go-api/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/milvuspb"
S
smellthemoon 已提交
32
	"github.com/milvus-io/milvus-proto/go-api/schemapb"
33
	"github.com/milvus-io/milvus/internal/common"
X
Xiangyu Wang 已提交
34
	"github.com/milvus-io/milvus/internal/log"
35
	"github.com/milvus-io/milvus/internal/metrics"
J
jaime 已提交
36
	"github.com/milvus-io/milvus/internal/mq/msgstream"
X
Xiangyu Wang 已提交
37 38 39 40
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
41
	"github.com/milvus-io/milvus/internal/util"
42
	"github.com/milvus-io/milvus/internal/util/commonpbutil"
43
	"github.com/milvus-io/milvus/internal/util/crypto"
44
	"github.com/milvus-io/milvus/internal/util/errorutil"
45
	"github.com/milvus-io/milvus/internal/util/importutil"
46 47
	"github.com/milvus-io/milvus/internal/util/logutil"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
E
Enwei Jiao 已提交
48
	"github.com/milvus-io/milvus/internal/util/paramtable"
49
	"github.com/milvus-io/milvus/internal/util/timerecord"
50
	"github.com/milvus-io/milvus/internal/util/trace"
X
Xiangyu Wang 已提交
51
	"github.com/milvus-io/milvus/internal/util/typeutil"
52
	"go.uber.org/zap"
53 54
)

55 56
// moduleName is the module tag attached to the request context via
// logutil.WithModule in this file's handlers.
const moduleName = "Proxy"

57
// UpdateStateCode updates the state code of Proxy.
58
func (node *Proxy) UpdateStateCode(code commonpb.StateCode) {
59
	node.stateCode.Store(code)
Z
zhenshan.cao 已提交
60 61
}

62
// GetComponentStates get state of Proxy.
63 64
func (node *Proxy) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
	stats := &milvuspb.ComponentStates{
G
godchen 已提交
65 66 67 68
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
69
	code, ok := node.stateCode.Load().(commonpb.StateCode)
G
godchen 已提交
70 71 72 73 74 75
	if !ok {
		errMsg := "unexpected error in type assertion"
		stats.Status = &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    errMsg,
		}
G
godchen 已提交
76
		return stats, nil
G
godchen 已提交
77
	}
78 79 80 81
	nodeID := common.NotRegisteredID
	if node.session != nil && node.session.Registered() {
		nodeID = node.session.ServerID
	}
82
	info := &milvuspb.ComponentInfo{
83 84
		// NodeID:    Params.ProxyID, // will race with Proxy.Register()
		NodeID:    nodeID,
C
Cai Yudong 已提交
85
		Role:      typeutil.ProxyRole,
G
godchen 已提交
86 87 88 89 90 91
		StateCode: code,
	}
	stats.State = info
	return stats, nil
}

C
cxytz01 已提交
92
// GetStatisticsChannel gets statistics channel of Proxy.
C
Cai Yudong 已提交
93
func (node *Proxy) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
94 95 96 97 98 99 100 101 102
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: "",
	}, nil
}

103
// InvalidateCollectionMetaCache invalidate the meta cache of specific collection.
C
Cai Yudong 已提交
104
func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
105 106 107
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
108
	ctx = logutil.WithModule(ctx, moduleName)
109 110 111
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-InvalidateCollectionMetaCache")
	defer sp.Finish()
	log := log.Ctx(ctx).With(
112
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
113
		zap.String("db", request.DbName),
114 115
		zap.String("collectionName", request.CollectionName),
		zap.Int64("collectionID", request.CollectionID))
D
dragondriver 已提交
116

117 118
	log.Info("received request to invalidate collection meta cache")

119
	collectionName := request.CollectionName
120
	collectionID := request.CollectionID
X
Xiaofan 已提交
121 122

	var aliasName []string
N
neza2017 已提交
123
	if globalMetaCache != nil {
124 125 126 127
		if collectionName != "" {
			globalMetaCache.RemoveCollection(ctx, collectionName) // no need to return error, though collection may be not cached
		}
		if request.CollectionID != UniqueID(0) {
X
Xiaofan 已提交
128
			aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
129
		}
N
neza2017 已提交
130
	}
131 132
	if request.GetBase().GetMsgType() == commonpb.MsgType_DropCollection {
		// no need to handle error, since this Proxy may not create dml stream for the collection.
133 134
		node.chMgr.removeDMLStream(request.GetCollectionID())
		// clean up collection level metrics
E
Enwei Jiao 已提交
135
		metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), collectionName)
X
Xiaofan 已提交
136
		for _, alias := range aliasName {
E
Enwei Jiao 已提交
137
			metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), alias)
X
Xiaofan 已提交
138
		}
139
	}
140
	log.Info("complete to invalidate collection meta cache")
D
dragondriver 已提交
141

142
	return &commonpb.Status{
143
		ErrorCode: commonpb.ErrorCode_Success,
144 145
		Reason:    "",
	}, nil
146 147
}

148
// CreateCollection create a collection by the schema.
149
// TODO(dragondriver): add more detailed ut for ConsistencyLevel, should we support multiple consistency level in Proxy?
C
Cai Yudong 已提交
150
func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
151 152 153
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
154 155 156

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateCollection")
	defer sp.Finish()
157 158 159
	method := "CreateCollection"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
160
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
161

162
	cct := &createCollectionTask{
S
sunby 已提交
163
		ctx:                     ctx,
164 165
		Condition:               NewTaskCondition(ctx),
		CreateCollectionRequest: request,
166
		rootCoord:               node.rootCoord,
167 168
	}

169 170 171
	// avoid data race
	lenOfSchema := len(request.Schema)

172
	log := log.Ctx(ctx).With(
173
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
174 175
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
176
		zap.Int("len(schema)", lenOfSchema),
177 178
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
179

180 181
	log.Debug(rpcReceived(method))

182 183 184
	if err := node.sched.ddQueue.Enqueue(cct); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
185
			zap.Error(err))
186

E
Enwei Jiao 已提交
187
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
188
		return &commonpb.Status{
189
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
190 191 192 193
			Reason:    err.Error(),
		}, nil
	}

194 195
	log.Debug(
		rpcEnqueued(method),
196 197
		zap.Uint64("BeginTs", cct.BeginTs()),
		zap.Uint64("EndTs", cct.EndTs()),
198
		zap.Uint64("timestamp", request.Base.Timestamp))
199

200 201 202
	if err := cct.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
203
			zap.Error(err),
204
			zap.Uint64("BeginTs", cct.BeginTs()),
205
			zap.Uint64("EndTs", cct.EndTs()))
D
dragondriver 已提交
206

E
Enwei Jiao 已提交
207
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
208
		return &commonpb.Status{
209
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
210 211 212 213
			Reason:    err.Error(),
		}, nil
	}

214 215
	log.Debug(
		rpcDone(method),
216
		zap.Uint64("BeginTs", cct.BeginTs()),
217
		zap.Uint64("EndTs", cct.EndTs()))
218

E
Enwei Jiao 已提交
219 220
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
221 222 223
	return cct.result, nil
}

224
// DropCollection drop a collection.
C
Cai Yudong 已提交
225
func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
226 227 228
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
229 230 231

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropCollection")
	defer sp.Finish()
232 233
	method := "DropCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
234
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
235

236
	dct := &dropCollectionTask{
S
sunby 已提交
237
		ctx:                   ctx,
238 239
		Condition:             NewTaskCondition(ctx),
		DropCollectionRequest: request,
240
		rootCoord:             node.rootCoord,
241
		chMgr:                 node.chMgr,
S
sunby 已提交
242
		chTicker:              node.chTicker,
243 244
	}

245
	log := log.Ctx(ctx).With(
246
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
247 248
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
249

250 251
	log.Debug("DropCollection received")

252 253
	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DropCollection failed to enqueue",
254
			zap.Error(err))
255

E
Enwei Jiao 已提交
256
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
257
		return &commonpb.Status{
258
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
259 260 261 262
			Reason:    err.Error(),
		}, nil
	}

263 264
	log.Debug("DropCollection enqueued",
		zap.Uint64("BeginTs", dct.BeginTs()),
265
		zap.Uint64("EndTs", dct.EndTs()))
266 267 268

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DropCollection failed to WaitToFinish",
D
dragondriver 已提交
269
			zap.Error(err),
270
			zap.Uint64("BeginTs", dct.BeginTs()),
271
			zap.Uint64("EndTs", dct.EndTs()))
D
dragondriver 已提交
272

E
Enwei Jiao 已提交
273
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
274
		return &commonpb.Status{
275
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
276 277 278 279
			Reason:    err.Error(),
		}, nil
	}

280 281
	log.Debug("DropCollection done",
		zap.Uint64("BeginTs", dct.BeginTs()),
282
		zap.Uint64("EndTs", dct.EndTs()))
283

E
Enwei Jiao 已提交
284 285
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
286 287 288
	return dct.result, nil
}

289
// HasCollection check if the specific collection exists in Milvus.
C
Cai Yudong 已提交
290
func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
291 292 293 294 295
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
296 297 298

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasCollection")
	defer sp.Finish()
299 300
	method := "HasCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
301
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
302
		metrics.TotalLabel).Inc()
303

304
	log := log.Ctx(ctx).With(
305
		zap.String("role", typeutil.ProxyRole),
306 307 308
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

309 310
	log.Debug("HasCollection received")

311
	hct := &hasCollectionTask{
S
sunby 已提交
312
		ctx:                  ctx,
313 314
		Condition:            NewTaskCondition(ctx),
		HasCollectionRequest: request,
315
		rootCoord:            node.rootCoord,
316 317
	}

318 319
	if err := node.sched.ddQueue.Enqueue(hct); err != nil {
		log.Warn("HasCollection failed to enqueue",
320
			zap.Error(err))
321

E
Enwei Jiao 已提交
322
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
323
			metrics.AbandonLabel).Inc()
324 325
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
326
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
327 328 329 330 331
				Reason:    err.Error(),
			},
		}, nil
	}

332 333
	log.Debug("HasCollection enqueued",
		zap.Uint64("BeginTS", hct.BeginTs()),
334
		zap.Uint64("EndTS", hct.EndTs()))
335 336 337

	if err := hct.WaitToFinish(); err != nil {
		log.Warn("HasCollection failed to WaitToFinish",
D
dragondriver 已提交
338
			zap.Error(err),
339
			zap.Uint64("BeginTS", hct.BeginTs()),
340
			zap.Uint64("EndTS", hct.EndTs()))
D
dragondriver 已提交
341

E
Enwei Jiao 已提交
342
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
343
			metrics.FailLabel).Inc()
344 345
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
346
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
347 348 349 350 351
				Reason:    err.Error(),
			},
		}, nil
	}

352 353
	log.Debug("HasCollection done",
		zap.Uint64("BeginTS", hct.BeginTs()),
354
		zap.Uint64("EndTS", hct.EndTs()))
355

E
Enwei Jiao 已提交
356
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
357
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
358
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
359 360 361
	return hct.result, nil
}

362
// LoadCollection load a collection into query nodes.
C
Cai Yudong 已提交
363
func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
364 365 366
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
367 368 369

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadCollection")
	defer sp.Finish()
370 371
	method := "LoadCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
372
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
373
		metrics.TotalLabel).Inc()
374
	lct := &loadCollectionTask{
S
sunby 已提交
375
		ctx:                   ctx,
376 377
		Condition:             NewTaskCondition(ctx),
		LoadCollectionRequest: request,
378
		queryCoord:            node.queryCoord,
379
		datacoord:             node.dataCoord,
380 381
	}

382
	log := log.Ctx(ctx).With(
383
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
384 385
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
386

387 388
	log.Debug("LoadCollection received")

389 390
	if err := node.sched.ddQueue.Enqueue(lct); err != nil {
		log.Warn("LoadCollection failed to enqueue",
391
			zap.Error(err))
392

E
Enwei Jiao 已提交
393
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
394
			metrics.AbandonLabel).Inc()
395
		return &commonpb.Status{
396
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
397 398 399
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
400

401 402
	log.Debug("LoadCollection enqueued",
		zap.Uint64("BeginTS", lct.BeginTs()),
403
		zap.Uint64("EndTS", lct.EndTs()))
404 405 406

	if err := lct.WaitToFinish(); err != nil {
		log.Warn("LoadCollection failed to WaitToFinish",
D
dragondriver 已提交
407
			zap.Error(err),
408
			zap.Uint64("BeginTS", lct.BeginTs()),
409
			zap.Uint64("EndTS", lct.EndTs()))
E
Enwei Jiao 已提交
410
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
411
			metrics.FailLabel).Inc()
412
		return &commonpb.Status{
413
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
414 415 416 417
			Reason:    err.Error(),
		}, nil
	}

418 419
	log.Debug("LoadCollection done",
		zap.Uint64("BeginTS", lct.BeginTs()),
420
		zap.Uint64("EndTS", lct.EndTs()))
421

E
Enwei Jiao 已提交
422
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
423
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
424
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
425
	return lct.result, nil
426 427
}

428
// ReleaseCollection remove the loaded collection from query nodes.
C
Cai Yudong 已提交
429
func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
430 431 432
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
433

434
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleaseCollection")
435
	defer sp.Finish()
436 437
	method := "ReleaseCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
438
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
439
		metrics.TotalLabel).Inc()
440
	rct := &releaseCollectionTask{
S
sunby 已提交
441
		ctx:                      ctx,
442 443
		Condition:                NewTaskCondition(ctx),
		ReleaseCollectionRequest: request,
444
		queryCoord:               node.queryCoord,
445
		chMgr:                    node.chMgr,
446 447
	}

448
	log := log.Ctx(ctx).With(
449
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
450 451
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
452

453 454
	log.Debug(rpcReceived(method))

455
	if err := node.sched.ddQueue.Enqueue(rct); err != nil {
456 457
		log.Warn(
			rpcFailedToEnqueue(method),
458
			zap.Error(err))
459

E
Enwei Jiao 已提交
460
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
461
			metrics.AbandonLabel).Inc()
462
		return &commonpb.Status{
463
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
464 465 466 467
			Reason:    err.Error(),
		}, nil
	}

468 469
	log.Debug(
		rpcEnqueued(method),
470
		zap.Uint64("BeginTS", rct.BeginTs()),
471
		zap.Uint64("EndTS", rct.EndTs()))
472 473

	if err := rct.WaitToFinish(); err != nil {
474 475
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
476
			zap.Error(err),
477
			zap.Uint64("BeginTS", rct.BeginTs()),
478
			zap.Uint64("EndTS", rct.EndTs()))
D
dragondriver 已提交
479

E
Enwei Jiao 已提交
480
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
481
			metrics.FailLabel).Inc()
482
		return &commonpb.Status{
483
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
484 485 486 487
			Reason:    err.Error(),
		}, nil
	}

488 489
	log.Debug(
		rpcDone(method),
490
		zap.Uint64("BeginTS", rct.BeginTs()),
491
		zap.Uint64("EndTS", rct.EndTs()))
492

E
Enwei Jiao 已提交
493
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
494
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
495
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
496
	return rct.result, nil
497 498
}

499
// DescribeCollection get the meta information of specific collection, such as schema, created timestamp and etc.
C
Cai Yudong 已提交
500
func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
501 502 503 504 505
	if !node.checkHealthy() {
		return &milvuspb.DescribeCollectionResponse{
			Status: unhealthyStatus(),
		}, nil
	}
506

507
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeCollection")
508
	defer sp.Finish()
509 510
	method := "DescribeCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
511
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
512
		metrics.TotalLabel).Inc()
513

514
	dct := &describeCollectionTask{
S
sunby 已提交
515
		ctx:                       ctx,
516 517
		Condition:                 NewTaskCondition(ctx),
		DescribeCollectionRequest: request,
518
		rootCoord:                 node.rootCoord,
519 520
	}

521
	log := log.Ctx(ctx).With(
522
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
523 524
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
525

526 527
	log.Debug("DescribeCollection received")

528 529
	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DescribeCollection failed to enqueue",
530
			zap.Error(err))
531

E
Enwei Jiao 已提交
532
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
533
			metrics.AbandonLabel).Inc()
534 535
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
536
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
537 538 539 540 541
				Reason:    err.Error(),
			},
		}, nil
	}

542 543
	log.Debug("DescribeCollection enqueued",
		zap.Uint64("BeginTS", dct.BeginTs()),
544
		zap.Uint64("EndTS", dct.EndTs()))
545 546 547

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DescribeCollection failed to WaitToFinish",
D
dragondriver 已提交
548
			zap.Error(err),
549
			zap.Uint64("BeginTS", dct.BeginTs()),
550
			zap.Uint64("EndTS", dct.EndTs()))
D
dragondriver 已提交
551

E
Enwei Jiao 已提交
552
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
553
			metrics.FailLabel).Inc()
554

555 556
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
557
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
558 559 560 561 562
				Reason:    err.Error(),
			},
		}, nil
	}

563 564
	log.Debug("DescribeCollection done",
		zap.Uint64("BeginTS", dct.BeginTs()),
565
		zap.Uint64("EndTS", dct.EndTs()))
566

E
Enwei Jiao 已提交
567
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
568
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
569
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
570 571 572
	return dct.result, nil
}

573 574 575 576 577 578 579 580 581
// GetStatistics get the statistics, such as `num_rows`.
// WARNING: It is an experimental API
func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStatisticsRequest) (*milvuspb.GetStatisticsResponse, error) {
	if !node.checkHealthy() {
		return &milvuspb.GetStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}

582
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetStatistics")
583 584 585
	defer sp.Finish()
	method := "GetStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
586
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
587
		metrics.TotalLabel).Inc()
588 589 590 591 592 593 594 595 596 597
	g := &getStatisticsTask{
		request:   request,
		Condition: NewTaskCondition(ctx),
		ctx:       ctx,
		tr:        tr,
		dc:        node.dataCoord,
		qc:        node.queryCoord,
		shardMgr:  node.shardMgr,
	}

598
	log := log.Ctx(ctx).With(
599 600
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
601 602 603 604
		zap.String("collection", request.CollectionName))

	log.Debug(
		rpcReceived(method),
605 606 607 608 609 610 611 612
		zap.Strings("partitions", request.PartitionNames))

	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.Strings("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
613
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637
			metrics.AbandonLabel).Inc()

		return &milvuspb.GetStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.Strings("partitions", request.PartitionNames))

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.Uint64("BeginTS", g.BeginTs()),
			zap.Uint64("EndTS", g.EndTs()),
			zap.Strings("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
638
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
639 640 641 642 643 644 645 646 647 648 649 650 651
			metrics.FailLabel).Inc()

		return &milvuspb.GetStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", g.BeginTs()),
652
		zap.Uint64("EndTS", g.EndTs()))
653

E
Enwei Jiao 已提交
654
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
655
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
656
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
657 658 659
	return g.result, nil
}

660
// GetCollectionStatistics get the collection statistics, such as `num_rows`.
C
Cai Yudong 已提交
661
func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvuspb.GetCollectionStatisticsRequest) (*milvuspb.GetCollectionStatisticsResponse, error) {
662 663 664 665 666
	if !node.checkHealthy() {
		return &milvuspb.GetCollectionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
667 668 669

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetCollectionStatistics")
	defer sp.Finish()
670 671
	method := "GetCollectionStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
672
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
673
		metrics.TotalLabel).Inc()
674
	g := &getCollectionStatisticsTask{
G
godchen 已提交
675 676 677
		ctx:                            ctx,
		Condition:                      NewTaskCondition(ctx),
		GetCollectionStatisticsRequest: request,
678
		dataCoord:                      node.dataCoord,
679 680
	}

681
	log := log.Ctx(ctx).With(
682
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
683 684
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
685

686 687
	log.Debug(rpcReceived(method))

688
	if err := node.sched.ddQueue.Enqueue(g); err != nil {
689 690
		log.Warn(
			rpcFailedToEnqueue(method),
691
			zap.Error(err))
692

E
Enwei Jiao 已提交
693
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
694
			metrics.AbandonLabel).Inc()
695

G
godchen 已提交
696
		return &milvuspb.GetCollectionStatisticsResponse{
697
			Status: &commonpb.Status{
698
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
699 700 701 702 703
				Reason:    err.Error(),
			},
		}, nil
	}

704 705
	log.Debug(
		rpcEnqueued(method),
706
		zap.Uint64("BeginTS", g.BeginTs()),
707
		zap.Uint64("EndTS", g.EndTs()))
708 709

	if err := g.WaitToFinish(); err != nil {
710 711
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
712
			zap.Error(err),
713
			zap.Uint64("BeginTS", g.BeginTs()),
714
			zap.Uint64("EndTS", g.EndTs()))
D
dragondriver 已提交
715

E
Enwei Jiao 已提交
716
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
717
			metrics.FailLabel).Inc()
718

G
godchen 已提交
719
		return &milvuspb.GetCollectionStatisticsResponse{
720
			Status: &commonpb.Status{
721
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
722 723 724 725 726
				Reason:    err.Error(),
			},
		}, nil
	}

727 728
	log.Debug(
		rpcDone(method),
729
		zap.Uint64("BeginTS", g.BeginTs()),
730
		zap.Uint64("EndTS", g.EndTs()))
731

E
Enwei Jiao 已提交
732
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
733
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
734
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
735
	return g.result, nil
736 737
}

738
// ShowCollections list all collections in Milvus.
C
Cai Yudong 已提交
739
func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
740 741 742 743 744
	if !node.checkHealthy() {
		return &milvuspb.ShowCollectionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
745 746
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowCollections")
	defer sp.Finish()
747 748
	method := "ShowCollections"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
749
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
750

751
	sct := &showCollectionsTask{
G
godchen 已提交
752 753 754
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		ShowCollectionsRequest: request,
755
		queryCoord:             node.queryCoord,
756
		rootCoord:              node.rootCoord,
757 758
	}

759
	log := log.Ctx(ctx).With(
760
		zap.String("role", typeutil.ProxyRole),
761 762
		zap.String("DbName", request.DbName),
		zap.Uint64("TimeStamp", request.TimeStamp),
763 764 765 766
		zap.String("ShowType", request.Type.String()))

	log.Debug("ShowCollections received",
		zap.Any("CollectionNames", request.CollectionNames))
767

768
	err := node.sched.ddQueue.Enqueue(sct)
769
	if err != nil {
770 771
		log.Warn("ShowCollections failed to enqueue",
			zap.Error(err),
772
			zap.Any("CollectionNames", request.CollectionNames))
773

E
Enwei Jiao 已提交
774
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
G
godchen 已提交
775
		return &milvuspb.ShowCollectionsResponse{
776
			Status: &commonpb.Status{
777
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
778 779 780 781 782
				Reason:    err.Error(),
			},
		}, nil
	}

783
	log.Debug("ShowCollections enqueued",
784
		zap.Any("CollectionNames", request.CollectionNames))
D
dragondriver 已提交
785

786 787
	err = sct.WaitToFinish()
	if err != nil {
788 789
		log.Warn("ShowCollections failed to WaitToFinish",
			zap.Error(err),
790
			zap.Any("CollectionNames", request.CollectionNames))
791

E
Enwei Jiao 已提交
792
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
793

G
godchen 已提交
794
		return &milvuspb.ShowCollectionsResponse{
795
			Status: &commonpb.Status{
796
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
797 798 799 800 801
				Reason:    err.Error(),
			},
		}, nil
	}

802
	log.Debug("ShowCollections Done",
803 804
		zap.Int("len(CollectionNames)", len(request.CollectionNames)),
		zap.Int("num_collections", len(sct.result.CollectionNames)))
805

E
Enwei Jiao 已提交
806 807
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
808 809 810
	return sct.result, nil
}

J
jaime 已提交
811 812 813 814 815 816 817 818 819 820
func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-AlterCollection")
	defer sp.Finish()
	method := "AlterCollection"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
821
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
J
jaime 已提交
822 823 824 825 826 827 828 829

	act := &alterCollectionTask{
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		AlterCollectionRequest: request,
		rootCoord:              node.rootCoord,
	}

830
	log := log.Ctx(ctx).With(
J
jaime 已提交
831 832 833 834
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

835 836 837
	log.Debug(
		rpcReceived(method))

J
jaime 已提交
838 839 840
	if err := node.sched.ddQueue.Enqueue(act); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
841
			zap.Error(err))
J
jaime 已提交
842

E
Enwei Jiao 已提交
843
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
J
jaime 已提交
844 845 846 847 848 849 850 851 852 853
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", act.BeginTs()),
		zap.Uint64("EndTs", act.EndTs()),
854
		zap.Uint64("timestamp", request.Base.Timestamp))
J
jaime 已提交
855 856 857 858 859 860

	if err := act.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.Uint64("BeginTs", act.BeginTs()),
861
			zap.Uint64("EndTs", act.EndTs()))
J
jaime 已提交
862

E
Enwei Jiao 已提交
863
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
J
jaime 已提交
864 865 866 867 868 869 870 871 872
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", act.BeginTs()),
873
		zap.Uint64("EndTs", act.EndTs()))
J
jaime 已提交
874

E
Enwei Jiao 已提交
875 876
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
J
jaime 已提交
877 878 879
	return act.result, nil
}

880
// CreatePartition create a partition in specific collection.
C
Cai Yudong 已提交
881
func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
882 883 884
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
885

886
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreatePartition")
887
	defer sp.Finish()
888 889
	method := "CreatePartition"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
890
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
891

892
	cpt := &createPartitionTask{
S
sunby 已提交
893
		ctx:                    ctx,
894 895
		Condition:              NewTaskCondition(ctx),
		CreatePartitionRequest: request,
896
		rootCoord:              node.rootCoord,
897 898 899
		result:                 nil,
	}

900
	log := log.Ctx(ctx).With(
901
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
902 903 904
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
905

906 907
	log.Debug(rpcReceived("CreatePartition"))

908 909 910
	if err := node.sched.ddQueue.Enqueue(cpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue("CreatePartition"),
911
			zap.Error(err))
912

E
Enwei Jiao 已提交
913
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
914

915
		return &commonpb.Status{
916
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
917 918 919
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
920

921 922 923
	log.Debug(
		rpcEnqueued("CreatePartition"),
		zap.Uint64("BeginTS", cpt.BeginTs()),
924
		zap.Uint64("EndTS", cpt.EndTs()))
925 926 927 928

	if err := cpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish("CreatePartition"),
D
dragondriver 已提交
929
			zap.Error(err),
930
			zap.Uint64("BeginTS", cpt.BeginTs()),
931
			zap.Uint64("EndTS", cpt.EndTs()))
D
dragondriver 已提交
932

E
Enwei Jiao 已提交
933
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
934

935
		return &commonpb.Status{
936
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
937 938 939
			Reason:    err.Error(),
		}, nil
	}
940 941 942 943

	log.Debug(
		rpcDone("CreatePartition"),
		zap.Uint64("BeginTS", cpt.BeginTs()),
944
		zap.Uint64("EndTS", cpt.EndTs()))
945

E
Enwei Jiao 已提交
946 947
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
948 949 950
	return cpt.result, nil
}

951
// DropPartition drop a partition in specific collection.
C
Cai Yudong 已提交
952
func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
953 954 955
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
956

957
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropPartition")
958
	defer sp.Finish()
959 960
	method := "DropPartition"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
961
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
962

963
	dpt := &dropPartitionTask{
S
sunby 已提交
964
		ctx:                  ctx,
965 966
		Condition:            NewTaskCondition(ctx),
		DropPartitionRequest: request,
967
		rootCoord:            node.rootCoord,
C
cai.zhang 已提交
968
		queryCoord:           node.queryCoord,
969 970 971
		result:               nil,
	}

972
	log := log.Ctx(ctx).With(
973
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
974 975 976
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
977

978 979
	log.Debug(rpcReceived(method))

980 981 982
	if err := node.sched.ddQueue.Enqueue(dpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
983
			zap.Error(err))
984

E
Enwei Jiao 已提交
985
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
986

987
		return &commonpb.Status{
988
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
989 990 991
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
992

993 994 995
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", dpt.BeginTs()),
996
		zap.Uint64("EndTS", dpt.EndTs()))
997 998 999 1000

	if err := dpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1001
			zap.Error(err),
1002
			zap.Uint64("BeginTS", dpt.BeginTs()),
1003
			zap.Uint64("EndTS", dpt.EndTs()))
D
dragondriver 已提交
1004

E
Enwei Jiao 已提交
1005
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
1006

1007
		return &commonpb.Status{
1008
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1009 1010 1011
			Reason:    err.Error(),
		}, nil
	}
1012 1013 1014 1015

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", dpt.BeginTs()),
1016
		zap.Uint64("EndTS", dpt.EndTs()))
1017

E
Enwei Jiao 已提交
1018 1019
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1020 1021 1022
	return dpt.result, nil
}

1023
// HasPartition check if partition exist.
C
Cai Yudong 已提交
1024
func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
1025 1026 1027 1028 1029
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
D
dragondriver 已提交
1030

D
dragondriver 已提交
1031
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasPartition")
D
dragondriver 已提交
1032
	defer sp.Finish()
1033 1034 1035
	method := "HasPartition"
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
E
Enwei Jiao 已提交
1036
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1037
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1038

1039
	hpt := &hasPartitionTask{
S
sunby 已提交
1040
		ctx:                 ctx,
1041 1042
		Condition:           NewTaskCondition(ctx),
		HasPartitionRequest: request,
1043
		rootCoord:           node.rootCoord,
1044 1045 1046
		result:              nil,
	}

1047
	log := log.Ctx(ctx).With(
1048
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1049 1050 1051
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
D
dragondriver 已提交
1052

1053 1054
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1055 1056 1057
	if err := node.sched.ddQueue.Enqueue(hpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1058
			zap.Error(err))
D
dragondriver 已提交
1059

E
Enwei Jiao 已提交
1060
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1061
			metrics.AbandonLabel).Inc()
1062

1063 1064
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1065
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1066 1067 1068 1069 1070
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1071

D
dragondriver 已提交
1072 1073 1074
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", hpt.BeginTs()),
1075
		zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1076 1077 1078 1079

	if err := hpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1080
			zap.Error(err),
D
dragondriver 已提交
1081
			zap.Uint64("BeginTS", hpt.BeginTs()),
1082
			zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1083

E
Enwei Jiao 已提交
1084
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1085
			metrics.FailLabel).Inc()
1086

1087 1088
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1089
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1090 1091 1092 1093 1094
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1095 1096 1097 1098

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", hpt.BeginTs()),
1099
		zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1100

E
Enwei Jiao 已提交
1101
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1102
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1103
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1104 1105 1106
	return hpt.result, nil
}

1107
// LoadPartitions load specific partitions into query nodes.
C
Cai Yudong 已提交
1108
func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitionsRequest) (*commonpb.Status, error) {
1109 1110 1111
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1112

D
dragondriver 已提交
1113
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadPartitions")
1114
	defer sp.Finish()
1115 1116
	method := "LoadPartitions"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1117
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1118
		metrics.TotalLabel).Inc()
1119
	lpt := &loadPartitionsTask{
G
godchen 已提交
1120 1121 1122
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		LoadPartitionsRequest: request,
1123
		queryCoord:            node.queryCoord,
1124
		datacoord:             node.dataCoord,
1125 1126
	}

1127
	log := log.Ctx(ctx).With(
1128
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1129 1130 1131
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1132

1133 1134
	log.Debug(rpcReceived(method))

1135 1136 1137
	if err := node.sched.ddQueue.Enqueue(lpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1138
			zap.Error(err))
1139

E
Enwei Jiao 已提交
1140
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1141
			metrics.AbandonLabel).Inc()
1142

1143
		return &commonpb.Status{
1144
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1145 1146 1147 1148
			Reason:    err.Error(),
		}, nil
	}

1149 1150 1151
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", lpt.BeginTs()),
1152
		zap.Uint64("EndTS", lpt.EndTs()))
1153 1154 1155 1156

	if err := lpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1157
			zap.Error(err),
1158
			zap.Uint64("BeginTS", lpt.BeginTs()),
1159
			zap.Uint64("EndTS", lpt.EndTs()))
D
dragondriver 已提交
1160

E
Enwei Jiao 已提交
1161
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1162
			metrics.FailLabel).Inc()
1163

1164
		return &commonpb.Status{
1165
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1166 1167 1168 1169
			Reason:    err.Error(),
		}, nil
	}

1170 1171 1172
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", lpt.BeginTs()),
1173
		zap.Uint64("EndTS", lpt.EndTs()))
1174

E
Enwei Jiao 已提交
1175
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1176
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1177
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1178
	return lpt.result, nil
1179 1180
}

1181
// ReleasePartitions release specific partitions from query nodes.
C
Cai Yudong 已提交
1182
func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionsRequest) (*commonpb.Status, error) {
1183 1184 1185
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1186 1187 1188 1189

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleasePartitions")
	defer sp.Finish()

1190
	rpt := &releasePartitionsTask{
G
godchen 已提交
1191 1192 1193
		ctx:                      ctx,
		Condition:                NewTaskCondition(ctx),
		ReleasePartitionsRequest: request,
1194
		queryCoord:               node.queryCoord,
1195 1196
	}

1197
	method := "ReleasePartitions"
1198
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1199
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1200
		metrics.TotalLabel).Inc()
1201 1202

	log := log.Ctx(ctx).With(
1203
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1204 1205 1206
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1207

1208 1209
	log.Debug(rpcReceived(method))

1210 1211 1212
	if err := node.sched.ddQueue.Enqueue(rpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1213
			zap.Error(err))
1214

E
Enwei Jiao 已提交
1215
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1216
			metrics.AbandonLabel).Inc()
1217

1218
		return &commonpb.Status{
1219
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1220 1221 1222 1223
			Reason:    err.Error(),
		}, nil
	}

1224 1225 1226
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", rpt.BeginTs()),
1227
		zap.Uint64("EndTS", rpt.EndTs()))
1228 1229 1230 1231

	if err := rpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1232
			zap.Error(err),
1233
			zap.Uint64("BeginTS", rpt.BeginTs()),
1234
			zap.Uint64("EndTS", rpt.EndTs()))
D
dragondriver 已提交
1235

E
Enwei Jiao 已提交
1236
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1237
			metrics.FailLabel).Inc()
1238

1239
		return &commonpb.Status{
1240
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1241 1242 1243 1244
			Reason:    err.Error(),
		}, nil
	}

1245 1246 1247
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", rpt.BeginTs()),
1248
		zap.Uint64("EndTS", rpt.EndTs()))
1249

E
Enwei Jiao 已提交
1250
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1251
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1252
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1253
	return rpt.result, nil
1254 1255
}

1256
// GetPartitionStatistics get the statistics of partition, such as num_rows.
C
Cai Yudong 已提交
1257
func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb.GetPartitionStatisticsRequest) (*milvuspb.GetPartitionStatisticsResponse, error) {
1258 1259 1260 1261 1262
	if !node.checkHealthy() {
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1263 1264 1265

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetPartitionStatistics")
	defer sp.Finish()
1266 1267
	method := "GetPartitionStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1268
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1269
		metrics.TotalLabel).Inc()
1270

1271
	g := &getPartitionStatisticsTask{
1272 1273 1274
		ctx:                           ctx,
		Condition:                     NewTaskCondition(ctx),
		GetPartitionStatisticsRequest: request,
1275
		dataCoord:                     node.dataCoord,
1276 1277
	}

1278
	log := log.Ctx(ctx).With(
1279
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1280 1281 1282
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1283

1284 1285
	log.Debug(rpcReceived(method))

1286 1287 1288
	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1289
			zap.Error(err))
1290

E
Enwei Jiao 已提交
1291
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1292
			metrics.AbandonLabel).Inc()
1293

1294 1295 1296 1297 1298 1299 1300 1301
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1302 1303 1304
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", g.BeginTs()),
1305
		zap.Uint64("EndTS", g.EndTs()))
1306 1307 1308 1309

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
1310
			zap.Error(err),
1311
			zap.Uint64("BeginTS", g.BeginTs()),
1312
			zap.Uint64("EndTS", g.EndTs()))
1313

E
Enwei Jiao 已提交
1314
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1315
			metrics.FailLabel).Inc()
1316

1317 1318 1319 1320 1321 1322 1323 1324
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1325 1326 1327
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", g.BeginTs()),
1328
		zap.Uint64("EndTS", g.EndTs()))
1329

E
Enwei Jiao 已提交
1330
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1331
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1332
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1333
	return g.result, nil
1334 1335
}

1336
// ShowPartitions list all partitions in the specific collection.
C
Cai Yudong 已提交
1337
func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
1338 1339 1340 1341 1342
	if !node.checkHealthy() {
		return &milvuspb.ShowPartitionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1343 1344 1345 1346

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowPartitions")
	defer sp.Finish()

1347
	spt := &showPartitionsTask{
G
godchen 已提交
1348 1349 1350
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		ShowPartitionsRequest: request,
1351
		rootCoord:             node.rootCoord,
1352
		queryCoord:            node.queryCoord,
G
godchen 已提交
1353
		result:                nil,
1354 1355
	}

1356
	method := "ShowPartitions"
1357 1358
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
E
Enwei Jiao 已提交
1359
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1360
		metrics.TotalLabel).Inc()
1361

1362 1363
	log := log.Ctx(ctx).With(zap.String("role", typeutil.ProxyRole))

1364 1365
	log.Debug(
		rpcReceived(method),
G
godchen 已提交
1366
		zap.Any("request", request))
1367 1368 1369 1370 1371 1372 1373

	if err := node.sched.ddQueue.Enqueue(spt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.Any("request", request))

E
Enwei Jiao 已提交
1374
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1375
			metrics.AbandonLabel).Inc()
1376

G
godchen 已提交
1377
		return &milvuspb.ShowPartitionsResponse{
1378
			Status: &commonpb.Status{
1379
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1380 1381 1382 1383 1384
				Reason:    err.Error(),
			},
		}, nil
	}

1385 1386 1387 1388
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
1389 1390
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
1391 1392 1393 1394 1395
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

	if err := spt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1396
			zap.Error(err),
1397 1398 1399 1400 1401
			zap.Uint64("BeginTS", spt.BeginTs()),
			zap.Uint64("EndTS", spt.EndTs()),
			zap.String("db", spt.ShowPartitionsRequest.DbName),
			zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
			zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))
D
dragondriver 已提交
1402

E
Enwei Jiao 已提交
1403
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1404
			metrics.FailLabel).Inc()
1405

G
godchen 已提交
1406
		return &milvuspb.ShowPartitionsResponse{
1407
			Status: &commonpb.Status{
1408
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1409 1410 1411 1412
				Reason:    err.Error(),
			},
		}, nil
	}
1413 1414 1415 1416 1417 1418 1419 1420 1421

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

E
Enwei Jiao 已提交
1422
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1423
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1424
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1425 1426 1427
	return spt.result, nil
}

S
SimFG 已提交
1428 1429 1430 1431 1432 1433
func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.GetLoadingProgressRequest) (*milvuspb.GetLoadingProgressResponse, error) {
	if !node.checkHealthy() {
		return &milvuspb.GetLoadingProgressResponse{Status: unhealthyStatus()}, nil
	}
	method := "GetLoadingProgress"
	tr := timerecord.NewTimeRecorder(method)
1434
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetLoadingProgress")
S
SimFG 已提交
1435
	defer sp.Finish()
E
Enwei Jiao 已提交
1436
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
1437 1438 1439
	log := log.Ctx(ctx)

	log.Debug(
S
SimFG 已提交
1440 1441 1442 1443
		rpcReceived(method),
		zap.Any("request", request))

	getErrResponse := func(err error) *milvuspb.GetLoadingProgressResponse {
J
Jiquan Long 已提交
1444
		log.Warn("fail to get loading progress",
1445
			zap.String("collection_name", request.CollectionName),
S
SimFG 已提交
1446 1447
			zap.Strings("partition_name", request.PartitionNames),
			zap.Error(err))
E
Enwei Jiao 已提交
1448
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
1449 1450 1451 1452 1453
		if errors.Is(err, ErrInsufficientMemory) {
			return &milvuspb.GetLoadingProgressResponse{
				Status: InSufficientMemoryStatus(request.GetCollectionName()),
			}
		}
S
SimFG 已提交
1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467
		return &milvuspb.GetLoadingProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
	}
	if err := validateCollectionName(request.CollectionName); err != nil {
		return getErrResponse(err), nil
	}
	collectionID, err := globalMetaCache.GetCollectionID(ctx, request.CollectionName)
	if err != nil {
		return getErrResponse(err), nil
	}
S
SimFG 已提交
1468

1469 1470 1471
	msgBase := commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
1472
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
1473
	)
S
SimFG 已提交
1474 1475 1476 1477 1478 1479 1480 1481 1482 1483
	if request.Base == nil {
		request.Base = msgBase
	} else {
		request.Base.MsgID = msgBase.MsgID
		request.Base.Timestamp = msgBase.Timestamp
		request.Base.SourceID = msgBase.SourceID
	}

	var progress int64
	if len(request.GetPartitionNames()) == 0 {
S
SimFG 已提交
1484
		if progress, err = getCollectionProgress(ctx, node.queryCoord, request.GetBase(), collectionID); err != nil {
S
SimFG 已提交
1485 1486 1487
			return getErrResponse(err), nil
		}
	} else {
S
SimFG 已提交
1488 1489
		if progress, err = getPartitionProgress(ctx, node.queryCoord, request.GetBase(),
			request.GetPartitionNames(), request.GetCollectionName(), collectionID); err != nil {
S
SimFG 已提交
1490 1491 1492 1493
			return getErrResponse(err), nil
		}
	}

1494
	log.Debug(
S
SimFG 已提交
1495 1496
		rpcDone(method),
		zap.Any("request", request))
E
Enwei Jiao 已提交
1497 1498
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
S
SimFG 已提交
1499 1500 1501 1502 1503 1504 1505 1506
	return &milvuspb.GetLoadingProgressResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		Progress: progress,
	}, nil
}

1507
func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadStateRequest) (*milvuspb.GetLoadStateResponse, error) {
S
SimFG 已提交
1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539
	if !node.checkHealthy() {
		return &milvuspb.GetLoadStateResponse{Status: unhealthyStatus()}, nil
	}
	method := "GetLoadState"
	tr := timerecord.NewTimeRecorder(method)
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetLoadState")
	defer sp.Finish()
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
	log := log.Ctx(ctx)

	log.Debug(
		rpcReceived(method),
		zap.Any("request", request))

	getErrResponse := func(err error) *milvuspb.GetLoadStateResponse {
		log.Warn("fail to get load state",
			zap.String("collection_name", request.CollectionName),
			zap.Strings("partition_name", request.PartitionNames),
			zap.Error(err))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
		return &milvuspb.GetLoadStateResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
	}

	if err := validateCollectionName(request.CollectionName); err != nil {
		return getErrResponse(err), nil
	}

1540 1541
	// TODO(longjiquan): https://github.com/milvus-io/milvus/issues/21485, Remove `GetComponentStates` after error code
	// 	is ready to distinguish case whether the querycoord is not healthy or the collection is not even loaded.
S
SimFG 已提交
1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582
	if statesResp, err := node.queryCoord.GetComponentStates(ctx); err != nil {
		return getErrResponse(err), nil
	} else if statesResp.State == nil || statesResp.State.StateCode != commonpb.StateCode_Healthy {
		return getErrResponse(fmt.Errorf("the querycoord server isn't healthy, state: %v", statesResp.State)), nil
	}

	successResponse := &milvuspb.GetLoadStateResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
	defer func() {
		log.Debug(
			rpcDone(method),
			zap.Any("request", request))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
		metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	}()

	collectionID, err := globalMetaCache.GetCollectionID(ctx, request.CollectionName)
	if err != nil {
		successResponse.State = commonpb.LoadState_LoadStateNotExist
		return successResponse, nil
	}

	msgBase := commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
	)
	if request.Base == nil {
		request.Base = msgBase
	} else {
		request.Base.MsgID = msgBase.MsgID
		request.Base.Timestamp = msgBase.Timestamp
		request.Base.SourceID = msgBase.SourceID
	}

	var progress int64
	if len(request.GetPartitionNames()) == 0 {
		if progress, err = getCollectionProgress(ctx, node.queryCoord, request.GetBase(), collectionID); err != nil {
1583 1584 1585 1586 1587
			if errors.Is(err, ErrInsufficientMemory) {
				return &milvuspb.GetLoadStateResponse{
					Status: InSufficientMemoryStatus(request.GetCollectionName()),
				}, nil
			}
S
SimFG 已提交
1588 1589 1590 1591 1592 1593
			successResponse.State = commonpb.LoadState_LoadStateNotLoad
			return successResponse, nil
		}
	} else {
		if progress, err = getPartitionProgress(ctx, node.queryCoord, request.GetBase(),
			request.GetPartitionNames(), request.GetCollectionName(), collectionID); err != nil {
1594 1595 1596 1597 1598
			if errors.Is(err, ErrInsufficientMemory) {
				return &milvuspb.GetLoadStateResponse{
					Status: InSufficientMemoryStatus(request.GetCollectionName()),
				}, nil
			}
S
SimFG 已提交
1599 1600 1601 1602 1603 1604 1605 1606 1607 1608
			successResponse.State = commonpb.LoadState_LoadStateNotLoad
			return successResponse, nil
		}
	}
	if progress >= 100 {
		successResponse.State = commonpb.LoadState_LoadStateLoaded
	} else {
		successResponse.State = commonpb.LoadState_LoadStateLoading
	}
	return successResponse, nil
1609 1610
}

1611
// CreateIndex create index for collection.
C
Cai Yudong 已提交
1612
func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
1613 1614 1615
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1616

1617
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateIndex")
D
dragondriver 已提交
1618 1619
	defer sp.Finish()

1620
	cit := &createIndexTask{
Z
zhenshan.cao 已提交
1621 1622 1623 1624
		ctx:        ctx,
		Condition:  NewTaskCondition(ctx),
		req:        request,
		rootCoord:  node.rootCoord,
1625
		datacoord:  node.dataCoord,
1626
		queryCoord: node.queryCoord,
1627 1628
	}

D
dragondriver 已提交
1629
	method := "CreateIndex"
1630
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1631
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1632
		metrics.TotalLabel).Inc()
1633 1634

	log := log.Ctx(ctx).With(
1635
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1636 1637 1638 1639
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))
D
dragondriver 已提交
1640

1641 1642
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1643 1644 1645
	if err := node.sched.ddQueue.Enqueue(cit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1646
			zap.Error(err))
D
dragondriver 已提交
1647

E
Enwei Jiao 已提交
1648
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1649
			metrics.AbandonLabel).Inc()
1650

1651
		return &commonpb.Status{
1652
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1653 1654 1655 1656
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1657 1658 1659
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", cit.BeginTs()),
1660
		zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1661 1662 1663 1664

	if err := cit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1665
			zap.Error(err),
D
dragondriver 已提交
1666
			zap.Uint64("BeginTs", cit.BeginTs()),
1667
			zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1668

E
Enwei Jiao 已提交
1669
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1670
			metrics.FailLabel).Inc()
1671

1672
		return &commonpb.Status{
1673
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1674 1675 1676 1677
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1678 1679 1680
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", cit.BeginTs()),
1681
		zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1682

E
Enwei Jiao 已提交
1683
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1684
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1685
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1686 1687 1688
	return cit.result, nil
}

1689
// DescribeIndex get the meta information of index, such as index state, index id and etc.
C
Cai Yudong 已提交
1690
func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
1691 1692 1693 1694 1695
	if !node.checkHealthy() {
		return &milvuspb.DescribeIndexResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1696 1697 1698 1699

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeIndex")
	defer sp.Finish()

1700
	dit := &describeIndexTask{
S
sunby 已提交
1701
		ctx:                  ctx,
1702 1703
		Condition:            NewTaskCondition(ctx),
		DescribeIndexRequest: request,
1704
		datacoord:            node.dataCoord,
1705 1706
	}

1707 1708
	method := "DescribeIndex"
	// avoid data race
1709
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1710
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1711
		metrics.TotalLabel).Inc()
1712 1713

	log := log.Ctx(ctx).With(
1714
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1715 1716 1717
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
1718 1719 1720
		zap.String("index name", request.IndexName))

	log.Debug(rpcReceived(method))
1721 1722 1723 1724

	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1725
			zap.Error(err))
1726

E
Enwei Jiao 已提交
1727
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1728
			metrics.AbandonLabel).Inc()
1729

1730 1731
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
1732
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1733 1734 1735 1736 1737
				Reason:    err.Error(),
			},
		}, nil
	}

1738 1739 1740
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1741
		zap.Uint64("EndTs", dit.EndTs()))
1742 1743 1744 1745

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1746
			zap.Error(err),
1747
			zap.Uint64("BeginTs", dit.BeginTs()),
1748
			zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1749

Z
zhenshan.cao 已提交
1750 1751 1752 1753
		errCode := commonpb.ErrorCode_UnexpectedError
		if dit.result != nil {
			errCode = dit.result.Status.GetErrorCode()
		}
E
Enwei Jiao 已提交
1754
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1755
			metrics.FailLabel).Inc()
1756

1757 1758
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
Z
zhenshan.cao 已提交
1759
				ErrorCode: errCode,
1760 1761 1762 1763 1764
				Reason:    err.Error(),
			},
		}, nil
	}

1765 1766 1767
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1768
		zap.Uint64("EndTs", dit.EndTs()))
1769

E
Enwei Jiao 已提交
1770
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1771
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1772
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1773 1774 1775
	return dit.result, nil
}

1776
// DropIndex drop the index of collection.
C
Cai Yudong 已提交
1777
func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
1778 1779 1780
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1781 1782 1783 1784

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropIndex")
	defer sp.Finish()

1785
	dit := &dropIndexTask{
S
sunby 已提交
1786
		ctx:              ctx,
B
BossZou 已提交
1787 1788
		Condition:        NewTaskCondition(ctx),
		DropIndexRequest: request,
1789
		dataCoord:        node.dataCoord,
1790
		queryCoord:       node.queryCoord,
B
BossZou 已提交
1791
	}
G
godchen 已提交
1792

D
dragondriver 已提交
1793
	method := "DropIndex"
1794
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1795
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1796
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1797

1798
	log := log.Ctx(ctx).With(
1799
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1800 1801 1802 1803 1804
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

1805 1806
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1807 1808 1809
	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1810
			zap.Error(err))
E
Enwei Jiao 已提交
1811
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1812
			metrics.AbandonLabel).Inc()
D
dragondriver 已提交
1813

B
BossZou 已提交
1814
		return &commonpb.Status{
1815
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1816 1817 1818
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1819

D
dragondriver 已提交
1820 1821 1822
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1823
		zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1824 1825 1826 1827

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1828
			zap.Error(err),
D
dragondriver 已提交
1829
			zap.Uint64("BeginTs", dit.BeginTs()),
1830
			zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1831

E
Enwei Jiao 已提交
1832
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1833
			metrics.FailLabel).Inc()
1834

B
BossZou 已提交
1835
		return &commonpb.Status{
1836
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1837 1838 1839
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1840 1841 1842 1843

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1844
		zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1845

E
Enwei Jiao 已提交
1846
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1847
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1848
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
B
BossZou 已提交
1849 1850 1851
	return dit.result, nil
}

1852 1853
// GetIndexBuildProgress gets index build progress with filed_name and index_name.
// IndexRows is the num of indexed rows. And TotalRows is the total number of segment rows.
1854
// Deprecated: use DescribeIndex instead
C
Cai Yudong 已提交
1855
func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb.GetIndexBuildProgressRequest) (*milvuspb.GetIndexBuildProgressResponse, error) {
1856 1857 1858 1859 1860
	if !node.checkHealthy() {
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1861 1862 1863 1864

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetIndexBuildProgress")
	defer sp.Finish()

1865
	gibpt := &getIndexBuildProgressTask{
1866 1867 1868
		ctx:                          ctx,
		Condition:                    NewTaskCondition(ctx),
		GetIndexBuildProgressRequest: request,
1869
		rootCoord:                    node.rootCoord,
1870
		dataCoord:                    node.dataCoord,
1871 1872
	}

1873
	method := "GetIndexBuildProgress"
1874
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1875
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1876
		metrics.TotalLabel).Inc()
1877 1878

	log := log.Ctx(ctx).With(
1879
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1880 1881 1882 1883
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1884

1885 1886
	log.Debug(rpcReceived(method))

1887 1888 1889
	if err := node.sched.ddQueue.Enqueue(gibpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1890
			zap.Error(err))
E
Enwei Jiao 已提交
1891
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1892
			metrics.AbandonLabel).Inc()
1893

1894 1895 1896 1897 1898 1899 1900 1901
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1902 1903 1904
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
1905
		zap.Uint64("EndTs", gibpt.EndTs()))
1906 1907 1908 1909

	if err := gibpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
1910
			zap.Error(err),
1911
			zap.Uint64("BeginTs", gibpt.BeginTs()),
1912
			zap.Uint64("EndTs", gibpt.EndTs()))
E
Enwei Jiao 已提交
1913
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1914
			metrics.FailLabel).Inc()
1915 1916 1917 1918 1919 1920 1921 1922

		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
1923 1924 1925 1926

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
1927
		zap.Uint64("EndTs", gibpt.EndTs()))
1928

E
Enwei Jiao 已提交
1929
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1930
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1931
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1932
	return gibpt.result, nil
1933 1934
}

1935
// GetIndexState get the build-state of index.
1936
// Deprecated: use DescribeIndex instead
C
Cai Yudong 已提交
1937
func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndexStateRequest) (*milvuspb.GetIndexStateResponse, error) {
1938 1939 1940 1941 1942
	if !node.checkHealthy() {
		return &milvuspb.GetIndexStateResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1943 1944 1945 1946

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()

1947
	dipt := &getIndexStateTask{
G
godchen 已提交
1948 1949 1950
		ctx:                  ctx,
		Condition:            NewTaskCondition(ctx),
		GetIndexStateRequest: request,
1951
		dataCoord:            node.dataCoord,
1952
		rootCoord:            node.rootCoord,
1953 1954
	}

1955
	method := "GetIndexState"
1956
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1957
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1958
		metrics.TotalLabel).Inc()
1959 1960

	log := log.Ctx(ctx).With(
1961
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1962 1963 1964 1965
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1966

1967 1968
	log.Debug(rpcReceived(method))

1969 1970 1971
	if err := node.sched.ddQueue.Enqueue(dipt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1972
			zap.Error(err))
1973

E
Enwei Jiao 已提交
1974
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1975
			metrics.AbandonLabel).Inc()
1976

G
godchen 已提交
1977
		return &milvuspb.GetIndexStateResponse{
1978
			Status: &commonpb.Status{
1979
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1980 1981 1982 1983 1984
				Reason:    err.Error(),
			},
		}, nil
	}

1985 1986 1987
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dipt.BeginTs()),
1988
		zap.Uint64("EndTs", dipt.EndTs()))
1989 1990 1991 1992

	if err := dipt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1993
			zap.Error(err),
1994
			zap.Uint64("BeginTs", dipt.BeginTs()),
1995
			zap.Uint64("EndTs", dipt.EndTs()))
E
Enwei Jiao 已提交
1996
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1997
			metrics.FailLabel).Inc()
1998

G
godchen 已提交
1999
		return &milvuspb.GetIndexStateResponse{
2000
			Status: &commonpb.Status{
2001
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2002 2003 2004 2005 2006
				Reason:    err.Error(),
			},
		}, nil
	}

2007 2008 2009
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dipt.BeginTs()),
2010
		zap.Uint64("EndTs", dipt.EndTs()))
2011

E
Enwei Jiao 已提交
2012
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2013
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2014
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2015 2016 2017
	return dipt.result, nil
}

2018
// Insert insert records into collection.
C
Cai Yudong 已提交
2019
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
X
Xiangyu Wang 已提交
2020 2021
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()
2022 2023 2024
	log := log.Ctx(ctx)
	log.Debug("Start processing insert request in Proxy")
	defer log.Debug("Finish processing insert request in Proxy")
X
Xiangyu Wang 已提交
2025

2026 2027 2028 2029 2030
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}
2031 2032
	method := "Insert"
	tr := timerecord.NewTimeRecorder(method)
S
smellthemoon 已提交
2033
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Add(float64(proto.Size(request)))
E
Enwei Jiao 已提交
2034
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
S
smellthemoon 已提交
2035

2036
	it := &insertTask{
2037 2038
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
X
xige-16 已提交
2039
		// req:       request,
2040
		insertMsg: &msgstream.InsertMsg{
2041 2042 2043
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
2044
			InsertRequest: internalpb.InsertRequest{
2045 2046 2047
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Insert),
					commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
2048
					commonpbutil.WithSourceID(paramtable.GetNodeID()),
2049
				),
2050 2051
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
X
xige-16 已提交
2052 2053 2054
				FieldsData:     request.FieldsData,
				NumRows:        uint64(request.NumRows),
				Version:        internalpb.InsertDataVersion_ColumnBased,
2055
				// RowData: transfer column based request to this
2056 2057
			},
		},
2058
		idAllocator:   node.rowIDAllocator,
2059 2060 2061
		segIDAssigner: node.segAssigner,
		chMgr:         node.chMgr,
		chTicker:      node.chTicker,
2062
	}
2063

2064 2065
	if len(it.insertMsg.PartitionName) <= 0 {
		it.insertMsg.PartitionName = Params.CommonCfg.DefaultPartitionName.GetValue()
2066 2067
	}

X
Xiangyu Wang 已提交
2068
	constructFailedResponse := func(err error) *milvuspb.MutationResult {
X
xige-16 已提交
2069
		numRows := request.NumRows
2070 2071 2072 2073
		errIndex := make([]uint32, numRows)
		for i := uint32(0); i < numRows; i++ {
			errIndex[i] = i
		}
2074

X
Xiangyu Wang 已提交
2075 2076 2077 2078 2079 2080 2081
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			ErrIndex: errIndex,
		}
2082 2083
	}

X
Xiangyu Wang 已提交
2084
	log.Debug("Enqueue insert request in Proxy",
2085
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2086 2087 2088 2089 2090
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.Int("len(FieldsData)", len(request.FieldsData)),
		zap.Int("len(HashKeys)", len(request.HashKeys)),
2091
		zap.Uint32("NumRows", request.NumRows))
D
dragondriver 已提交
2092

X
Xiangyu Wang 已提交
2093
	if err := node.sched.dmQueue.Enqueue(it); err != nil {
J
Jiquan Long 已提交
2094
		log.Warn("Failed to enqueue insert task: " + err.Error())
E
Enwei Jiao 已提交
2095
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2096
			metrics.AbandonLabel).Inc()
X
Xiangyu Wang 已提交
2097
		return constructFailedResponse(err), nil
2098
	}
D
dragondriver 已提交
2099

X
Xiangyu Wang 已提交
2100
	log.Debug("Detail of insert request in Proxy",
2101
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2102 2103 2104 2105 2106
		zap.Uint64("BeginTS", it.BeginTs()),
		zap.Uint64("EndTS", it.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
2107
		zap.Uint32("NumRows", request.NumRows))
X
Xiangyu Wang 已提交
2108 2109

	if err := it.WaitToFinish(); err != nil {
2110
		log.Warn("Failed to execute insert task in task scheduler: " + err.Error())
E
Enwei Jiao 已提交
2111
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2112
			metrics.FailLabel).Inc()
X
Xiangyu Wang 已提交
2113 2114 2115 2116 2117
		return constructFailedResponse(err), nil
	}

	if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
		setErrorIndex := func() {
X
xige-16 已提交
2118
			numRows := request.NumRows
X
Xiangyu Wang 已提交
2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129
			errIndex := make([]uint32, numRows)
			for i := uint32(0); i < numRows; i++ {
				errIndex[i] = i
			}
			it.result.ErrIndex = errIndex
		}

		setErrorIndex()
	}

	// InsertCnt always equals to the number of entities in the request
X
xige-16 已提交
2130
	it.result.InsertCnt = int64(request.NumRows)
D
dragondriver 已提交
2131

S
smellthemoon 已提交
2132 2133 2134
	receiveSize := proto.Size(it.insertMsg)
	rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(receiveSize))

E
Enwei Jiao 已提交
2135
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2136
		metrics.SuccessLabel).Inc()
2137
	successCnt := it.result.InsertCnt - int64(len(it.result.ErrIndex))
E
Enwei Jiao 已提交
2138 2139 2140
	metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(successCnt))
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
2141 2142 2143
	return it.result, nil
}

2144
// Delete delete records from collection, then these records cannot be searched.
G
groot 已提交
2145
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {
2146 2147
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete")
	defer sp.Finish()
2148 2149 2150
	log := log.Ctx(ctx)
	log.Debug("Start processing delete request in Proxy")
	defer log.Debug("Finish processing delete request in Proxy")
2151

S
smellthemoon 已提交
2152
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Add(float64(proto.Size(request)))
2153

G
groot 已提交
2154 2155 2156 2157 2158 2159
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}

2160 2161 2162
	method := "Delete"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
2163
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2164
		metrics.TotalLabel).Inc()
2165
	dt := &deleteTask{
X
xige-16 已提交
2166 2167 2168
		ctx:        ctx,
		Condition:  NewTaskCondition(ctx),
		deleteExpr: request.Expr,
2169
		deleteMsg: &BaseDeleteTask{
G
godchen 已提交
2170 2171 2172
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
2173
			DeleteRequest: internalpb.DeleteRequest{
2174 2175 2176 2177
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Delete),
					commonpbutil.WithMsgID(0),
				),
X
xige-16 已提交
2178
				DbName:         request.DbName,
G
godchen 已提交
2179 2180 2181
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
				// RowData: transfer column based request to this
C
Cai Yudong 已提交
2182 2183 2184 2185
			},
		},
		chMgr:    node.chMgr,
		chTicker: node.chTicker,
G
groot 已提交
2186 2187
	}

2188
	log.Debug("Enqueue delete request in Proxy",
2189
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2190 2191 2192 2193
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.String("expr", request.Expr))
2194 2195 2196

	// MsgID will be set by Enqueue()
	if err := node.sched.dmQueue.Enqueue(dt); err != nil {
2197
		log.Error("Failed to enqueue delete task: " + err.Error())
E
Enwei Jiao 已提交
2198
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2199
			metrics.AbandonLabel).Inc()
2200

G
groot 已提交
2201 2202 2203 2204 2205 2206 2207 2208
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

2209
	log.Debug("Detail of delete request in Proxy",
2210
		zap.String("role", typeutil.ProxyRole),
2211
		zap.Uint64("timestamp", dt.deleteMsg.Base.Timestamp),
G
groot 已提交
2212 2213 2214
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
2215
		zap.String("expr", request.Expr))
G
groot 已提交
2216

2217
	if err := dt.WaitToFinish(); err != nil {
2218
		log.Error("Failed to execute delete task in task scheduler: " + err.Error())
E
Enwei Jiao 已提交
2219
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2220
			metrics.FailLabel).Inc()
G
groot 已提交
2221 2222 2223 2224 2225 2226 2227 2228
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

S
smellthemoon 已提交
2229 2230 2231
	receiveSize := proto.Size(dt.deleteMsg)
	rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(receiveSize))

E
Enwei Jiao 已提交
2232
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2233
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2234 2235
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
G
groot 已提交
2236 2237 2238
	return dt.result, nil
}

S
smellthemoon 已提交
2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372
// Upsert handles the Upsert RPC, which upserts records into a collection.
//
// The request is copied into an upsertTask, pushed onto the proxy's DM queue
// and awaited. When the partition name is empty the configured default
// partition name is used. Failures are reported through the Status field of
// the returned result; the error return value is always nil. When the task
// fails, the result additionally lists every row of the request in ErrIndex.
func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Upsert")
	defer sp.Finish()

	logger := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.Uint32("NumRows", request.NumRows),
	)
	logger.Debug("Start processing upsert request in Proxy")

	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}

	const method = "Upsert"
	timer := timerecord.NewTimeRecorder(method)

	// nodeLabel renders this proxy's node ID as a metric label value.
	nodeLabel := func() string {
		return strconv.FormatInt(paramtable.GetNodeID(), 10)
	}
	// countCall bumps the per-method call counter for the given outcome label.
	countCall := func(outcome string) {
		metrics.ProxyFunctionCall.WithLabelValues(nodeLabel(), method, outcome).Inc()
	}
	// allRowIndexes returns [0, 1, ..., NumRows-1], i.e. every row of the
	// request flagged as failed.
	allRowIndexes := func() []uint32 {
		indexes := make([]uint32, request.NumRows)
		for i := range indexes {
			indexes[i] = uint32(i)
		}
		return indexes
	}
	// failedResponse reports err with the given code, affecting every row.
	failedResponse := func(err error, errCode commonpb.ErrorCode) *milvuspb.MutationResult {
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: errCode,
				Reason:    err.Error(),
			},
			ErrIndex: allRowIndexes(),
		}
	}

	metrics.ProxyReceiveBytes.WithLabelValues(nodeLabel(), metrics.UpsertLabel).Add(float64(proto.Size(request)))
	countCall(metrics.TotalLabel)

	it := &upsertTask{
		baseMsg: msgstream.BaseMsg{
			HashValues: request.HashKeys,
		},
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),

		req: &milvuspb.UpsertRequest{
			Base: commonpbutil.NewMsgBase(
				commonpbutil.WithMsgType(commonpb.MsgType_Upsert),
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
			),
			CollectionName: request.CollectionName,
			PartitionName:  request.PartitionName,
			FieldsData:     request.FieldsData,
			NumRows:        request.NumRows,
		},

		// The result starts out as Success; the task is expected to overwrite
		// the status when it fails.
		result: &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_Success,
			},
			IDs: &schemapb.IDs{
				IdField: nil,
			},
		},

		idAllocator:   node.rowIDAllocator,
		segIDAssigner: node.segAssigner,
		chMgr:         node.chMgr,
		chTicker:      node.chTicker,
	}

	// Fall back to the configured default partition when none was given.
	if it.req.PartitionName == "" {
		it.req.PartitionName = Params.CommonCfg.DefaultPartitionName.GetValue()
	}

	logger.Debug("Enqueue upsert request in Proxy",
		zap.Int("len(FieldsData)", len(request.FieldsData)),
		zap.Int("len(HashKeys)", len(request.HashKeys)))

	if err := node.sched.dmQueue.Enqueue(it); err != nil {
		logger.Info("Failed to enqueue upsert task",
			zap.Error(err))
		countCall(metrics.AbandonLabel)
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	logger.Debug("Detail of upsert request in Proxy",
		zap.Uint64("BeginTS", it.BeginTs()),
		zap.Uint64("EndTS", it.EndTs()))

	if err := it.WaitToFinish(); err != nil {
		logger.Info("Failed to execute upsert task in task scheduler",
			zap.Error(err))
		countCall(metrics.FailLabel)
		// it.result was initialised with ErrorCode_Success above. If the task
		// failed without overwriting that status, forwarding it would report
		// a failed upsert as successful, so fall back to UnexpectedError.
		errCode := it.result.Status.ErrorCode
		if errCode == commonpb.ErrorCode_Success {
			errCode = commonpb.ErrorCode_UnexpectedError
		}
		return failedResponse(err, errCode), nil
	}

	// The task finished but reported a non-success status: flag every row.
	if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
		it.result.ErrIndex = allRowIndexes()
	}

	insertReceiveSize := proto.Size(it.upsertMsg.InsertMsg)
	deleteReceiveSize := proto.Size(it.upsertMsg.DeleteMsg)

	// An upsert counts against both the delete and the insert rate.
	rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(deleteReceiveSize))
	rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(insertReceiveSize))

	countCall(metrics.SuccessLabel)
	metrics.ProxyMutationLatency.WithLabelValues(nodeLabel(), metrics.UpsertLabel).Observe(float64(timer.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(nodeLabel(), metrics.UpsertLabel, request.CollectionName).Observe(float64(timer.ElapseSpan().Milliseconds()))

	logger.Debug("Finish processing upsert request in Proxy")
	return it.result, nil
}

2373
// Search search the most similar records of requests.
C
Cai Yudong 已提交
2374
func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
2375
	receiveSize := proto.Size(request)
E
Enwei Jiao 已提交
2376
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel).Add(float64(receiveSize))
2377 2378 2379

	rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(request.GetNq()))

2380 2381 2382 2383 2384
	if !node.checkHealthy() {
		return &milvuspb.SearchResults{
			Status: unhealthyStatus(),
		}, nil
	}
2385 2386
	method := "Search"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2387
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2388
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
2389

C
cai.zhang 已提交
2390 2391
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Search")
	defer sp.Finish()
D
dragondriver 已提交
2392

2393
	qt := &searchTask{
S
sunby 已提交
2394
		ctx:       ctx,
2395
		Condition: NewTaskCondition(ctx),
G
godchen 已提交
2396
		SearchRequest: &internalpb.SearchRequest{
2397 2398
			Base: commonpbutil.NewMsgBase(
				commonpbutil.WithMsgType(commonpb.MsgType_Search),
E
Enwei Jiao 已提交
2399
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
2400
			),
E
Enwei Jiao 已提交
2401
			ReqID: paramtable.GetNodeID(),
2402
		},
2403 2404 2405 2406
		request:  request,
		qc:       node.queryCoord,
		tr:       timerecord.NewTimeRecorder("search"),
		shardMgr: node.shardMgr,
2407 2408
	}

2409 2410 2411
	travelTs := request.TravelTimestamp
	guaranteeTs := request.GuaranteeTimestamp

2412
	log := log.Ctx(ctx).With(
2413
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2414 2415 2416 2417 2418
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2419 2420 2421 2422
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2423

2424 2425 2426
	log.Debug(
		rpcReceived(method))

2427
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
2428
		log.Warn(
2429
			rpcFailedToEnqueue(method),
2430
			zap.Error(err))
D
dragondriver 已提交
2431

E
Enwei Jiao 已提交
2432
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2433
			metrics.AbandonLabel).Inc()
2434

2435 2436
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2437
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2438 2439 2440 2441
				Reason:    err.Error(),
			},
		}, nil
	}
Z
Zach 已提交
2442
	tr.CtxRecord(ctx, "search request enqueue")
2443

2444
	log.Debug(
2445
		rpcEnqueued(method),
2446
		zap.Uint64("timestamp", qt.Base.Timestamp))
D
dragondriver 已提交
2447

2448
	if err := qt.WaitToFinish(); err != nil {
2449
		log.Warn(
2450
			rpcFailedToWaitToFinish(method),
2451
			zap.Error(err))
2452

E
Enwei Jiao 已提交
2453
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2454
			metrics.FailLabel).Inc()
2455

2456 2457
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2458
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2459 2460 2461 2462 2463
				Reason:    err.Error(),
			},
		}, nil
	}

Z
Zach 已提交
2464
	span := tr.CtxRecord(ctx, "wait search result")
E
Enwei Jiao 已提交
2465
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2466
		metrics.SearchLabel).Observe(float64(span.Milliseconds()))
2467
	tr.CtxRecord(ctx, "wait search result")
2468
	log.Debug(rpcDone(method))
D
dragondriver 已提交
2469

E
Enwei Jiao 已提交
2470
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2471
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2472
	metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(qt.result.GetResults().GetNumQueries()))
C
cai.zhang 已提交
2473
	searchDur := tr.ElapseSpan().Milliseconds()
E
Enwei Jiao 已提交
2474
	metrics.ProxySQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2475
		metrics.SearchLabel).Observe(float64(searchDur))
E
Enwei Jiao 已提交
2476
	metrics.ProxyCollectionSQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2477
		metrics.SearchLabel, request.CollectionName).Observe(float64(searchDur))
2478 2479
	if qt.result != nil {
		sentSize := proto.Size(qt.result)
E
Enwei Jiao 已提交
2480
		metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
2481
		rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
2482
	}
2483 2484 2485
	return qt.result, nil
}

2486
// Flush notify data nodes to persist the data of collection.
2487 2488 2489 2490 2491 2492 2493
func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*milvuspb.FlushResponse, error) {
	resp := &milvuspb.FlushResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    "",
		},
	}
2494
	if !node.checkHealthy() {
2495 2496
		resp.Status.Reason = "proxy is not healthy"
		return resp, nil
2497
	}
D
dragondriver 已提交
2498 2499 2500 2501

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Flush")
	defer sp.Finish()

2502
	ft := &flushTask{
T
ThreadDao 已提交
2503 2504 2505
		ctx:          ctx,
		Condition:    NewTaskCondition(ctx),
		FlushRequest: request,
2506
		dataCoord:    node.dataCoord,
2507 2508
	}

D
dragondriver 已提交
2509
	method := "Flush"
2510
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2511
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2512

2513
	log := log.Ctx(ctx).With(
2514
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2515 2516
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))
D
dragondriver 已提交
2517

2518 2519
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2520 2521 2522
	if err := node.sched.ddQueue.Enqueue(ft); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2523
			zap.Error(err))
D
dragondriver 已提交
2524

E
Enwei Jiao 已提交
2525
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
2526

2527 2528
		resp.Status.Reason = err.Error()
		return resp, nil
2529 2530
	}

D
dragondriver 已提交
2531 2532 2533
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", ft.BeginTs()),
2534
		zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2535 2536 2537 2538

	if err := ft.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2539
			zap.Error(err),
D
dragondriver 已提交
2540
			zap.Uint64("BeginTs", ft.BeginTs()),
2541
			zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2542

E
Enwei Jiao 已提交
2543
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2544

D
dragondriver 已提交
2545
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
2546 2547
		resp.Status.Reason = err.Error()
		return resp, nil
2548 2549
	}

D
dragondriver 已提交
2550 2551 2552
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", ft.BeginTs()),
2553
		zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2554

E
Enwei Jiao 已提交
2555 2556
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2557
	return ft.result, nil
2558 2559
}

2560
// Query get the records by primary keys.
C
Cai Yudong 已提交
2561
func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*milvuspb.QueryResults, error) {
2562
	receiveSize := proto.Size(request)
E
Enwei Jiao 已提交
2563
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.QueryLabel).Add(float64(receiveSize))
2564 2565 2566

	rateCol.Add(internalpb.RateType_DQLQuery.String(), 1)

2567 2568 2569 2570 2571
	if !node.checkHealthy() {
		return &milvuspb.QueryResults{
			Status: unhealthyStatus(),
		}, nil
	}
2572

D
dragondriver 已提交
2573 2574
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Query")
	defer sp.Finish()
2575
	tr := timerecord.NewTimeRecorder("Query")
D
dragondriver 已提交
2576

2577
	qt := &queryTask{
2578 2579 2580
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
		RetrieveRequest: &internalpb.RetrieveRequest{
2581 2582
			Base: commonpbutil.NewMsgBase(
				commonpbutil.WithMsgType(commonpb.MsgType_Retrieve),
E
Enwei Jiao 已提交
2583
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
2584
			),
E
Enwei Jiao 已提交
2585
			ReqID: paramtable.GetNodeID(),
2586
		},
2587 2588
		request:          request,
		qc:               node.queryCoord,
2589
		queryShardPolicy: mergeRoundRobinPolicy,
2590
		shardMgr:         node.shardMgr,
2591 2592
	}

D
dragondriver 已提交
2593 2594
	method := "Query"

E
Enwei Jiao 已提交
2595
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2596 2597
		metrics.TotalLabel).Inc()

2598
	log := log.Ctx(ctx).With(
2599
		zap.String("role", typeutil.ProxyRole),
2600 2601
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
2602 2603 2604 2605
		zap.Strings("partitions", request.PartitionNames))

	log.Debug(
		rpcReceived(method),
2606 2607 2608 2609
		zap.String("expr", request.Expr),
		zap.Strings("OutputFields", request.OutputFields),
		zap.Uint64("travel_timestamp", request.TravelTimestamp),
		zap.Uint64("guarantee_timestamp", request.GuaranteeTimestamp))
G
godchen 已提交
2610

D
dragondriver 已提交
2611
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
2612
		log.Warn(
D
dragondriver 已提交
2613
			rpcFailedToEnqueue(method),
2614
			zap.Error(err))
D
dragondriver 已提交
2615

E
Enwei Jiao 已提交
2616
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2617
			metrics.AbandonLabel).Inc()
2618

2619 2620 2621 2622 2623 2624
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
2625
	}
Z
Zach 已提交
2626
	tr.CtxRecord(ctx, "query request enqueue")
2627

2628
	log.Debug(rpcEnqueued(method))
D
dragondriver 已提交
2629 2630

	if err := qt.WaitToFinish(); err != nil {
2631
		log.Warn(
D
dragondriver 已提交
2632
			rpcFailedToWaitToFinish(method),
2633
			zap.Error(err))
2634

E
Enwei Jiao 已提交
2635
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2636
			metrics.FailLabel).Inc()
2637

2638 2639 2640 2641 2642 2643 2644
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
Z
Zach 已提交
2645
	span := tr.CtxRecord(ctx, "wait query result")
E
Enwei Jiao 已提交
2646
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2647
		metrics.QueryLabel).Observe(float64(span.Milliseconds()))
2648

2649
	log.Debug(rpcDone(method))
D
dragondriver 已提交
2650

E
Enwei Jiao 已提交
2651
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2652 2653
		metrics.SuccessLabel).Inc()

E
Enwei Jiao 已提交
2654
	metrics.ProxySQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2655
		metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
E
Enwei Jiao 已提交
2656
	metrics.ProxyCollectionSQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2657
		metrics.QueryLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
2658 2659

	ret := &milvuspb.QueryResults{
2660 2661
		Status:     qt.result.Status,
		FieldsData: qt.result.FieldsData,
2662 2663
	}
	sentSize := proto.Size(qt.result)
2664
	rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
E
Enwei Jiao 已提交
2665
	metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
2666
	return ret, nil
2667
}
2668

2669
// CreateAlias create alias for collection, then you can search the collection with alias.
Y
Yusup 已提交
2670 2671 2672 2673
func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2674 2675 2676 2677

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateAlias")
	defer sp.Finish()

Y
Yusup 已提交
2678 2679 2680 2681 2682 2683 2684
	cat := &CreateAliasTask{
		ctx:                ctx,
		Condition:          NewTaskCondition(ctx),
		CreateAliasRequest: request,
		rootCoord:          node.rootCoord,
	}

D
dragondriver 已提交
2685
	method := "CreateAlias"
2686
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2687
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2688

2689
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2690 2691 2692 2693 2694
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

2695 2696
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2697 2698 2699
	if err := node.sched.ddQueue.Enqueue(cat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2700
			zap.Error(err))
D
dragondriver 已提交
2701

E
Enwei Jiao 已提交
2702
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
2703

Y
Yusup 已提交
2704 2705 2706 2707 2708 2709
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2710 2711 2712
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", cat.BeginTs()),
2713
		zap.Uint64("EndTs", cat.EndTs()))
D
dragondriver 已提交
2714 2715 2716 2717

	if err := cat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2718
			zap.Error(err),
D
dragondriver 已提交
2719
			zap.Uint64("BeginTs", cat.BeginTs()),
2720
			zap.Uint64("EndTs", cat.EndTs()))
E
Enwei Jiao 已提交
2721
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
Y
Yusup 已提交
2722 2723 2724 2725 2726 2727 2728

		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2729 2730 2731
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", cat.BeginTs()),
2732
		zap.Uint64("EndTs", cat.EndTs()))
D
dragondriver 已提交
2733

E
Enwei Jiao 已提交
2734 2735
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2736 2737 2738
	return cat.result, nil
}

2739
// DropAlias alter the alias of collection.
Y
Yusup 已提交
2740 2741 2742 2743
func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2744 2745 2746 2747

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropAlias")
	defer sp.Finish()

Y
Yusup 已提交
2748 2749 2750 2751 2752 2753 2754
	dat := &DropAliasTask{
		ctx:              ctx,
		Condition:        NewTaskCondition(ctx),
		DropAliasRequest: request,
		rootCoord:        node.rootCoord,
	}

D
dragondriver 已提交
2755
	method := "DropAlias"
2756
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2757
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2758

2759
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2760 2761 2762 2763
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias))

2764 2765
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2766 2767 2768
	if err := node.sched.ddQueue.Enqueue(dat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2769
			zap.Error(err))
E
Enwei Jiao 已提交
2770
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2771

Y
Yusup 已提交
2772 2773 2774 2775 2776 2777
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2778 2779 2780
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dat.BeginTs()),
2781
		zap.Uint64("EndTs", dat.EndTs()))
D
dragondriver 已提交
2782 2783 2784 2785

	if err := dat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2786
			zap.Error(err),
D
dragondriver 已提交
2787
			zap.Uint64("BeginTs", dat.BeginTs()),
2788
			zap.Uint64("EndTs", dat.EndTs()))
Y
Yusup 已提交
2789

E
Enwei Jiao 已提交
2790
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2791

Y
Yusup 已提交
2792 2793 2794 2795 2796 2797
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2798 2799 2800
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dat.BeginTs()),
2801
		zap.Uint64("EndTs", dat.EndTs()))
D
dragondriver 已提交
2802

E
Enwei Jiao 已提交
2803 2804
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2805 2806 2807
	return dat.result, nil
}

2808
// AlterAlias alter alias of collection.
Y
Yusup 已提交
2809 2810 2811 2812
func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2813 2814 2815 2816

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-AlterAlias")
	defer sp.Finish()

Y
Yusup 已提交
2817 2818 2819 2820 2821 2822 2823
	aat := &AlterAliasTask{
		ctx:               ctx,
		Condition:         NewTaskCondition(ctx),
		AlterAliasRequest: request,
		rootCoord:         node.rootCoord,
	}

D
dragondriver 已提交
2824
	method := "AlterAlias"
2825
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2826
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2827

2828
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2829 2830 2831 2832 2833
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

2834 2835
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2836 2837 2838
	if err := node.sched.ddQueue.Enqueue(aat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2839
			zap.Error(err))
E
Enwei Jiao 已提交
2840
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2841

Y
Yusup 已提交
2842 2843 2844 2845 2846 2847
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2848 2849 2850
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", aat.BeginTs()),
2851
		zap.Uint64("EndTs", aat.EndTs()))
D
dragondriver 已提交
2852 2853 2854 2855

	if err := aat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2856
			zap.Error(err),
D
dragondriver 已提交
2857
			zap.Uint64("BeginTs", aat.BeginTs()),
2858
			zap.Uint64("EndTs", aat.EndTs()))
Y
Yusup 已提交
2859

E
Enwei Jiao 已提交
2860
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2861

Y
Yusup 已提交
2862 2863 2864 2865 2866 2867
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2868 2869 2870
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", aat.BeginTs()),
2871
		zap.Uint64("EndTs", aat.EndTs()))
D
dragondriver 已提交
2872

E
Enwei Jiao 已提交
2873 2874
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2875 2876 2877
	return aat.result, nil
}

2878
// CalcDistance calculates the distances between vectors.
2879
func (node *Proxy) CalcDistance(ctx context.Context, request *milvuspb.CalcDistanceRequest) (*milvuspb.CalcDistanceResults, error) {
2880 2881 2882 2883 2884
	if !node.checkHealthy() {
		return &milvuspb.CalcDistanceResults{
			Status: unhealthyStatus(),
		}, nil
	}
2885

2886 2887 2888 2889
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CalcDistance")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

2890 2891
	query := func(ids *milvuspb.VectorIDs) (*milvuspb.QueryResults, error) {
		outputFields := []string{ids.FieldName}
2892

2893 2894 2895 2896 2897
		queryRequest := &milvuspb.QueryRequest{
			DbName:         "",
			CollectionName: ids.CollectionName,
			PartitionNames: ids.PartitionNames,
			OutputFields:   outputFields,
2898 2899
		}

2900
		qt := &queryTask{
2901 2902 2903
			ctx:       ctx,
			Condition: NewTaskCondition(ctx),
			RetrieveRequest: &internalpb.RetrieveRequest{
2904 2905
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Retrieve),
E
Enwei Jiao 已提交
2906
					commonpbutil.WithSourceID(paramtable.GetNodeID()),
2907
				),
E
Enwei Jiao 已提交
2908
				ReqID: paramtable.GetNodeID(),
2909
			},
2910 2911 2912 2913
			request: queryRequest,
			qc:      node.queryCoord,
			ids:     ids.IdArray,

2914
			queryShardPolicy: mergeRoundRobinPolicy,
2915
			shardMgr:         node.shardMgr,
2916 2917
		}

2918
		log := log.Ctx(ctx).With(
G
groot 已提交
2919 2920
			zap.String("collection", queryRequest.CollectionName),
			zap.Any("partitions", queryRequest.PartitionNames),
2921
			zap.Any("OutputFields", queryRequest.OutputFields))
G
groot 已提交
2922

2923
		err := node.sched.dqQueue.Enqueue(qt)
2924
		if err != nil {
2925 2926
			log.Error("CalcDistance queryTask failed to enqueue",
				zap.Error(err))
2927

2928 2929 2930 2931 2932
			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
2933
			}, err
2934
		}
2935

2936
		log.Debug("CalcDistance queryTask enqueued")
2937 2938 2939

		err = qt.WaitToFinish()
		if err != nil {
2940 2941
			log.Error("CalcDistance queryTask failed to WaitToFinish",
				zap.Error(err))
2942 2943 2944 2945 2946 2947

			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
2948
			}, err
2949
		}
2950

2951
		log.Debug("CalcDistance queryTask Done")
2952 2953

		return &milvuspb.QueryResults{
2954 2955
			Status:     qt.result.Status,
			FieldsData: qt.result.FieldsData,
2956 2957 2958
		}, nil
	}

G
groot 已提交
2959 2960 2961 2962
	// calcDistanceTask is not a standard task, no need to enqueue
	task := &calcDistanceTask{
		traceID:   traceID,
		queryFunc: query,
2963 2964
	}

G
groot 已提交
2965
	return task.Execute(ctx, request)
2966 2967
}

2968
// GetDdChannel returns the used channel for dd operations.
//
// NOTE: this method is not implemented; every call panics with
// "implement me" and never returns a response.
func (node *Proxy) GetDdChannel(ctx context.Context, request *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) {
	panic("implement me")
}
X
XuanYang-cn 已提交
2972

2973
// GetPersistentSegmentInfo get the information of sealed segment.
C
Cai Yudong 已提交
2974
func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.GetPersistentSegmentInfoRequest) (*milvuspb.GetPersistentSegmentInfoResponse, error) {
2975 2976 2977 2978 2979
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetPersistentSegmentInfo")
	defer sp.Finish()

	log := log.Ctx(ctx)

D
dragondriver 已提交
2980
	log.Debug("GetPersistentSegmentInfo",
2981
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2982 2983 2984
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
2985
	resp := &milvuspb.GetPersistentSegmentInfoResponse{
X
XuanYang-cn 已提交
2986
		Status: &commonpb.Status{
2987
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
2988 2989
		},
	}
2990 2991 2992 2993
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
2994 2995
	method := "GetPersistentSegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2996
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2997
		metrics.TotalLabel).Inc()
2998 2999 3000

	// list segments
	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
X
XuanYang-cn 已提交
3001
	if err != nil {
E
Enwei Jiao 已提交
3002
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013
		resp.Status.Reason = fmt.Errorf("getCollectionID failed, err:%w", err).Error()
		return resp, nil
	}

	getSegmentsByStatesResponse, err := node.dataCoord.GetSegmentsByStates(ctx, &datapb.GetSegmentsByStatesRequest{
		CollectionID: collectionID,
		// -1 means list all partition segemnts
		PartitionID: -1,
		States:      []commonpb.SegmentState{commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed, commonpb.SegmentState_Sealed},
	})
	if err != nil {
E
Enwei Jiao 已提交
3014
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3015
		resp.Status.Reason = fmt.Errorf("getSegmentsOfCollection, err:%w", err).Error()
X
XuanYang-cn 已提交
3016 3017
		return resp, nil
	}
3018 3019

	// get Segment info
3020
	infoResp, err := node.dataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
3021 3022 3023
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3024
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3025
		),
3026
		SegmentIDs: getSegmentsByStatesResponse.Segments,
X
XuanYang-cn 已提交
3027 3028
	})
	if err != nil {
E
Enwei Jiao 已提交
3029
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3030
			metrics.FailLabel).Inc()
3031 3032
		log.Warn("GetPersistentSegmentInfo fail",
			zap.Error(err))
3033
		resp.Status.Reason = fmt.Errorf("dataCoord:GetSegmentInfo, err:%w", err).Error()
X
XuanYang-cn 已提交
3034 3035
		return resp, nil
	}
3036 3037 3038
	log.Debug("GetPersistentSegmentInfo",
		zap.Int("len(infos)", len(infoResp.Infos)),
		zap.Any("status", infoResp.Status))
3039
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
E
Enwei Jiao 已提交
3040
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3041
			metrics.FailLabel).Inc()
X
XuanYang-cn 已提交
3042 3043 3044 3045 3046 3047
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	persistentInfos := make([]*milvuspb.PersistentSegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		persistentInfos[i] = &milvuspb.PersistentSegmentInfo{
S
sunby 已提交
3048
			SegmentID:    info.ID,
X
XuanYang-cn 已提交
3049 3050
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
S
sunby 已提交
3051
			NumRows:      info.NumOfRows,
X
XuanYang-cn 已提交
3052 3053 3054
			State:        info.State,
		}
	}
E
Enwei Jiao 已提交
3055
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3056
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
3057
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3058
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
X
XuanYang-cn 已提交
3059 3060 3061 3062
	resp.Infos = persistentInfos
	return resp, nil
}

J
jingkl 已提交
3063
// GetQuerySegmentInfo gets segment information from QueryCoord.
C
Cai Yudong 已提交
3064
func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQuerySegmentInfoRequest) (*milvuspb.GetQuerySegmentInfoResponse, error) {
3065 3066 3067 3068 3069
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetQuerySegmentInfo")
	defer sp.Finish()

	log := log.Ctx(ctx)

D
dragondriver 已提交
3070
	log.Debug("GetQuerySegmentInfo",
3071
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3072 3073 3074
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
3075
	resp := &milvuspb.GetQuerySegmentInfoResponse{
Z
zhenshan.cao 已提交
3076
		Status: &commonpb.Status{
3077
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
Z
zhenshan.cao 已提交
3078 3079
		},
	}
3080 3081 3082 3083
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3084

3085 3086
	method := "GetQuerySegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3087
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3088 3089
		metrics.TotalLabel).Inc()

3090 3091
	collID, err := globalMetaCache.GetCollectionID(ctx, req.CollectionName)
	if err != nil {
E
Enwei Jiao 已提交
3092
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3093 3094 3095
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3096
	infoResp, err := node.queryCoord.GetSegmentInfo(ctx, &querypb.GetSegmentInfoRequest{
3097 3098 3099
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3100
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3101
		),
3102
		CollectionID: collID,
Z
zhenshan.cao 已提交
3103 3104
	})
	if err != nil {
E
Enwei Jiao 已提交
3105
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3106 3107
		log.Error("Failed to get segment info from QueryCoord",
			zap.Error(err))
Z
zhenshan.cao 已提交
3108 3109 3110
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3111 3112 3113
	log.Debug("GetQuerySegmentInfo",
		zap.Any("infos", infoResp.Infos),
		zap.Any("status", infoResp.Status))
3114
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
E
Enwei Jiao 已提交
3115
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3116 3117
		log.Error("Failed to get segment info from QueryCoord",
			zap.String("errMsg", infoResp.Status.Reason))
Z
zhenshan.cao 已提交
3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	queryInfos := make([]*milvuspb.QuerySegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		queryInfos[i] = &milvuspb.QuerySegmentInfo{
			SegmentID:    info.SegmentID,
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
			NumRows:      info.NumRows,
			MemSize:      info.MemSize,
			IndexName:    info.IndexName,
			IndexID:      info.IndexID,
X
xige-16 已提交
3131
			State:        info.SegmentState,
3132
			NodeIds:      info.NodeIds,
Z
zhenshan.cao 已提交
3133 3134
		}
	}
3135

E
Enwei Jiao 已提交
3136 3137
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3138
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
Z
zhenshan.cao 已提交
3139 3140 3141 3142
	resp.Infos = queryInfos
	return resp, nil
}

J
jingkl 已提交
3143
// Dummy handles dummy request
C
Cai Yudong 已提交
3144
func (node *Proxy) Dummy(ctx context.Context, req *milvuspb.DummyRequest) (*milvuspb.DummyResponse, error) {
3145 3146 3147 3148 3149 3150
	failedResponse := &milvuspb.DummyResponse{
		Response: `{"status": "fail"}`,
	}

	// TODO(wxyu): change name RequestType to Request
	drt, err := parseDummyRequestType(req.RequestType)
3151 3152 3153 3154 3155 3156

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Dummy")
	defer sp.Finish()

	log := log.Ctx(ctx)

3157
	if err != nil {
3158 3159
		log.Warn("Failed to parse dummy request type",
			zap.Error(err))
3160 3161 3162
		return failedResponse, nil
	}

3163 3164
	if drt.RequestType == "query" {
		drr, err := parseDummyQueryRequest(req.RequestType)
3165
		if err != nil {
3166 3167
			log.Warn("Failed to parse dummy query request",
				zap.Error(err))
3168 3169 3170
			return failedResponse, nil
		}

3171
		request := &milvuspb.QueryRequest{
3172 3173 3174
			DbName:         drr.DbName,
			CollectionName: drr.CollectionName,
			PartitionNames: drr.PartitionNames,
3175
			OutputFields:   drr.OutputFields,
X
Xiangyu Wang 已提交
3176 3177
		}

3178
		_, err = node.Query(ctx, request)
3179
		if err != nil {
3180 3181
			log.Warn("Failed to execute dummy query",
				zap.Error(err))
3182 3183
			return failedResponse, err
		}
X
Xiangyu Wang 已提交
3184 3185 3186 3187 3188 3189

		return &milvuspb.DummyResponse{
			Response: `{"status": "success"}`,
		}, nil
	}

3190 3191
	log.Debug("cannot find specify dummy request type")
	return failedResponse, nil
X
Xiangyu Wang 已提交
3192 3193
}

J
jingkl 已提交
3194
// RegisterLink registers a link
C
Cai Yudong 已提交
3195
func (node *Proxy) RegisterLink(ctx context.Context, req *milvuspb.RegisterLinkRequest) (*milvuspb.RegisterLinkResponse, error) {
3196
	code := node.stateCode.Load().(commonpb.StateCode)
3197 3198 3199 3200 3201

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-RegisterLink")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
3202
		zap.String("role", typeutil.ProxyRole),
C
Cai Yudong 已提交
3203
		zap.Any("state code of proxy", code))
D
dragondriver 已提交
3204

3205 3206
	log.Debug("RegisterLink")

3207
	if code != commonpb.StateCode_Healthy {
3208 3209 3210
		return &milvuspb.RegisterLinkResponse{
			Address: nil,
			Status: &commonpb.Status{
3211
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3212
				Reason:    "proxy not healthy",
3213 3214 3215
			},
		}, nil
	}
E
Enwei Jiao 已提交
3216
	//metrics.ProxyLinkedSDKs.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Inc()
3217 3218 3219
	return &milvuspb.RegisterLinkResponse{
		Address: nil,
		Status: &commonpb.Status{
3220
			ErrorCode: commonpb.ErrorCode_Success,
3221
			Reason:    os.Getenv(metricsinfo.DeployModeEnvKey),
3222 3223 3224
		},
	}, nil
}
3225

3226
// GetMetrics gets the metrics of proxy
3227 3228
// TODO(dragondriver): cache the Metrics and set a retention to the cache
func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
3229 3230 3231 3232 3233
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetMetrics")
	defer sp.Finish()

	log := log.Ctx(ctx)

3234 3235
	log.RatedDebug(60, "Proxy.GetMetrics",
		zap.Int64("nodeID", paramtable.GetNodeID()),
3236 3237 3238 3239
		zap.String("req", req.Request))

	if !node.checkHealthy() {
		log.Warn("Proxy.GetMetrics failed",
3240
			zap.Int64("nodeID", paramtable.GetNodeID()),
3241
			zap.String("req", req.Request),
E
Enwei Jiao 已提交
3242
			zap.Error(errProxyIsUnhealthy(paramtable.GetNodeID())))
3243 3244 3245 3246

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
E
Enwei Jiao 已提交
3247
				Reason:    msgProxyIsUnhealthy(paramtable.GetNodeID()),
3248 3249 3250 3251 3252 3253 3254 3255
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetMetrics failed to parse metric type",
3256
			zap.Int64("nodeID", paramtable.GetNodeID()),
3257 3258 3259 3260 3261 3262 3263 3264 3265 3266 3267 3268
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

3269 3270 3271
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3272
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
3273
	)
3274
	if metricType == metricsinfo.SystemInfoMetrics {
3275 3276 3277
		metrics, err := node.metricsCacheManager.GetSystemInfoMetrics()
		if err != nil {
			metrics, err = getSystemInfoMetrics(ctx, req, node)
3278
		}
3279

3280 3281
		log.RatedDebug(60, "Proxy.GetMetrics",
			zap.Int64("nodeID", paramtable.GetNodeID()),
3282
			zap.String("req", req.Request),
3283
			zap.String("metricType", metricType),
3284 3285 3286
			zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
			zap.Error(err))

3287 3288
		node.metricsCacheManager.UpdateSystemInfoMetrics(metrics)

G
godchen 已提交
3289
		return metrics, nil
3290 3291
	}

3292 3293
	log.RatedWarn(60, "Proxy.GetMetrics failed, request metric type is not implemented yet",
		zap.Int64("nodeID", paramtable.GetNodeID()),
3294
		zap.String("req", req.Request),
3295
		zap.String("metricType", metricType))
3296 3297 3298 3299 3300 3301 3302 3303 3304 3305

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}

3306 3307 3308
// GetProxyMetrics gets the metrics of proxy, it's an internal interface which is different from GetMetrics interface,
// because it only obtains the metrics of Proxy, not including the topological metrics of Query cluster and Data cluster.
func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
3309 3310 3311 3312
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetProxyMetrics")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
3313
		zap.Int64("nodeID", paramtable.GetNodeID()),
3314 3315
		zap.String("req", req.Request))

3316 3317
	if !node.checkHealthy() {
		log.Warn("Proxy.GetProxyMetrics failed",
E
Enwei Jiao 已提交
3318
			zap.Error(errProxyIsUnhealthy(paramtable.GetNodeID())))
3319 3320 3321 3322

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
E
Enwei Jiao 已提交
3323
				Reason:    msgProxyIsUnhealthy(paramtable.GetNodeID()),
3324 3325 3326 3327 3328 3329 3330 3331 3332 3333 3334 3335 3336 3337 3338 3339 3340
			},
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetProxyMetrics failed to parse metric type",
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

3341 3342 3343
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3344
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
3345
	)
3346 3347 3348 3349 3350 3351 3352 3353 3354 3355 3356 3357 3358 3359 3360 3361

	if metricType == metricsinfo.SystemInfoMetrics {
		proxyMetrics, err := getProxyMetrics(ctx, req, node)
		if err != nil {
			log.Warn("Proxy.GetProxyMetrics failed to getProxyMetrics",
				zap.Error(err))

			return &milvuspb.GetMetricsResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

		log.Debug("Proxy.GetProxyMetrics",
3362
			zap.String("metricType", metricType))
3363 3364 3365 3366

		return proxyMetrics, nil
	}

J
Jiquan Long 已提交
3367
	log.Warn("Proxy.GetProxyMetrics failed, request metric type is not implemented yet",
3368
		zap.String("metricType", metricType))
3369 3370 3371 3372 3373 3374 3375 3376 3377

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
	}, nil
}

B
bigsheeper 已提交
3378 3379
// LoadBalance would do a load balancing operation between query nodes
func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceRequest) (*commonpb.Status, error) {
3380 3381 3382 3383 3384
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadBalance")
	defer sp.Finish()

	log := log.Ctx(ctx)

B
bigsheeper 已提交
3385
	log.Debug("Proxy.LoadBalance",
E
Enwei Jiao 已提交
3386
		zap.Int64("proxy_id", paramtable.GetNodeID()),
B
bigsheeper 已提交
3387 3388 3389 3390 3391 3392 3393 3394 3395
		zap.Any("req", req))

	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	}
3396 3397 3398

	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
	if err != nil {
J
Jiquan Long 已提交
3399
		log.Warn("failed to get collection id",
3400 3401
			zap.String("collection name", req.GetCollectionName()),
			zap.Error(err))
3402 3403 3404
		status.Reason = err.Error()
		return status, nil
	}
B
bigsheeper 已提交
3405
	infoResp, err := node.queryCoord.LoadBalance(ctx, &querypb.LoadBalanceRequest{
3406 3407 3408
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_LoadBalanceSegments),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3409
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3410
		),
B
bigsheeper 已提交
3411 3412
		SourceNodeIDs:    []int64{req.SrcNodeID},
		DstNodeIDs:       req.DstNodeIDs,
X
xige-16 已提交
3413
		BalanceReason:    querypb.TriggerCondition_GrpcRequest,
B
bigsheeper 已提交
3414
		SealedSegmentIDs: req.SealedSegmentIDs,
3415
		CollectionID:     collectionID,
B
bigsheeper 已提交
3416 3417
	})
	if err != nil {
J
Jiquan Long 已提交
3418
		log.Warn("Failed to LoadBalance from Query Coordinator",
3419 3420
			zap.Any("req", req),
			zap.Error(err))
B
bigsheeper 已提交
3421 3422 3423 3424
		status.Reason = err.Error()
		return status, nil
	}
	if infoResp.ErrorCode != commonpb.ErrorCode_Success {
J
Jiquan Long 已提交
3425
		log.Warn("Failed to LoadBalance from Query Coordinator",
3426
			zap.String("errMsg", infoResp.Reason))
B
bigsheeper 已提交
3427 3428 3429
		status.Reason = infoResp.Reason
		return status, nil
	}
3430 3431 3432
	log.Debug("LoadBalance Done",
		zap.Any("req", req),
		zap.Any("status", infoResp))
B
bigsheeper 已提交
3433 3434 3435 3436
	status.ErrorCode = commonpb.ErrorCode_Success
	return status, nil
}

3437 3438
// GetReplicas gets replica info
func (node *Proxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
3439 3440 3441 3442 3443 3444 3445 3446
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetReplicas")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("received get replicas request",
		zap.Int64("collection", req.GetCollectionID()),
		zap.Bool("with shard nodes", req.GetWithShardNodes()))
3447 3448 3449 3450 3451 3452
	resp := &milvuspb.GetReplicasResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

S
smellthemoon 已提交
3453 3454
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_GetReplicas),
E
Enwei Jiao 已提交
3455
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
S
smellthemoon 已提交
3456
	)
3457 3458 3459

	resp, err := node.queryCoord.GetReplicas(ctx, req)
	if err != nil {
3460 3461
		log.Error("Failed to get replicas from Query Coordinator",
			zap.Error(err))
3462 3463 3464 3465
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3466 3467 3468
	log.Debug("received get replicas response",
		zap.Any("resp", resp),
		zap.Error(err))
3469 3470 3471
	return resp, nil
}

3472
// GetCompactionState gets the compaction state of multiple segments
3473
func (node *Proxy) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
3474 3475 3476 3477 3478 3479 3480
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetCompactionState")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.Int64("compactionID", req.GetCompactionID()))

	log.Debug("received GetCompactionState request")
3481 3482 3483 3484 3485 3486 3487
	resp := &milvuspb.GetCompactionStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionState(ctx, req)
3488 3489 3490
	log.Debug("received GetCompactionState response",
		zap.Any("resp", resp),
		zap.Error(err))
3491 3492 3493
	return resp, err
}

3494
// ManualCompaction invokes compaction on specified collection
3495
func (node *Proxy) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
3496 3497 3498 3499 3500 3501 3502
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ManualCompaction")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", req.GetCollectionID()))

	log.Info("received ManualCompaction request")
3503 3504 3505 3506 3507 3508 3509
	resp := &milvuspb.ManualCompactionResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.ManualCompaction(ctx, req)
3510 3511 3512
	log.Info("received ManualCompaction response",
		zap.Any("resp", resp),
		zap.Error(err))
3513 3514 3515
	return resp, err
}

3516
// GetCompactionStateWithPlans returns the compactions states with the given plan ID
3517
func (node *Proxy) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
3518 3519 3520 3521 3522 3523 3524
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetCompactionStateWithPlans")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.Int64("compactionID", req.GetCompactionID()))

	log.Debug("received GetCompactionStateWithPlans request")
3525 3526 3527 3528 3529 3530 3531
	resp := &milvuspb.GetCompactionPlansResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionStateWithPlans(ctx, req)
3532 3533 3534
	log.Debug("received GetCompactionStateWithPlans response",
		zap.Any("resp", resp),
		zap.Error(err))
3535 3536 3537
	return resp, err
}

B
Bingyi Sun 已提交
3538 3539
// GetFlushState gets the flush state of multiple segments
func (node *Proxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
3540 3541 3542 3543 3544 3545 3546
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetFlushState")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("received get flush state request",
		zap.Any("request", req))
3547
	var err error
B
Bingyi Sun 已提交
3548 3549 3550
	resp := &milvuspb.GetFlushStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
J
Jiquan Long 已提交
3551
		log.Warn("unable to get flush state because of closed server")
B
Bingyi Sun 已提交
3552 3553 3554
		return resp, nil
	}

3555
	resp, err = node.dataCoord.GetFlushState(ctx, req)
X
Xiaofan 已提交
3556
	if err != nil {
3557 3558
		log.Warn("failed to get flush state response",
			zap.Error(err))
X
Xiaofan 已提交
3559 3560
		return nil, err
	}
3561 3562
	log.Debug("received get flush state response",
		zap.Any("response", resp))
B
Bingyi Sun 已提交
3563 3564 3565
	return resp, err
}

C
Cai Yudong 已提交
3566 3567
// checkHealthy checks proxy state is Healthy
func (node *Proxy) checkHealthy() bool {
3568 3569
	code := node.stateCode.Load().(commonpb.StateCode)
	return code == commonpb.StateCode_Healthy
3570 3571
}

3572 3573 3574
func (node *Proxy) checkHealthyAndReturnCode() (commonpb.StateCode, bool) {
	code := node.stateCode.Load().(commonpb.StateCode)
	return code, code == commonpb.StateCode_Healthy
3575 3576
}

3577
// unhealthyStatus returns the proxy not healthy status
3578 3579 3580
func unhealthyStatus() *commonpb.Status {
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3581
		Reason:    "proxy not healthy",
3582 3583
	}
}
G
groot 已提交
3584 3585 3586

// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
3587 3588 3589 3590 3591
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Import")
	defer sp.Finish()

	log := log.Ctx(ctx)

3592 3593
	log.Info("received import request",
		zap.String("collection name", req.GetCollectionName()),
G
groot 已提交
3594 3595
		zap.String("partition name", req.GetPartitionName()),
		zap.Strings("files", req.GetFiles()))
3596 3597 3598 3599 3600 3601
	resp := &milvuspb.ImportResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
	}
G
groot 已提交
3602 3603 3604 3605
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3606

3607 3608
	err := importutil.ValidateOptions(req.GetOptions())
	if err != nil {
3609 3610
		log.Error("failed to execute import request",
			zap.Error(err))
3611 3612 3613 3614 3615
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = "request options is not illegal    \n" + err.Error() + "    \nIllegal option format    \n" + importutil.OptionFormat
		return resp, nil
	}

3616 3617
	method := "Import"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3618
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3619 3620
		metrics.TotalLabel).Inc()

3621
	// Call rootCoord to finish import.
3622 3623
	respFromRC, err := node.rootCoord.Import(ctx, req)
	if err != nil {
E
Enwei Jiao 已提交
3624
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3625 3626
		log.Error("failed to execute bulk insert request",
			zap.Error(err))
3627 3628 3629 3630
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3631

E
Enwei Jiao 已提交
3632 3633
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3634
	return respFromRC, nil
G
groot 已提交
3635 3636
}

3637
// GetImportState checks import task state from RootCoord.
G
groot 已提交
3638
func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
3639 3640 3641 3642 3643 3644 3645
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetImportState")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("received get import state request",
		zap.Int64("taskID", req.GetTask()))
G
groot 已提交
3646 3647 3648 3649 3650
	resp := &milvuspb.GetImportStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3651 3652
	method := "GetImportState"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3653
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3654
		metrics.TotalLabel).Inc()
G
groot 已提交
3655 3656

	resp, err := node.rootCoord.GetImportState(ctx, req)
3657
	if err != nil {
E
Enwei Jiao 已提交
3658
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3659 3660
		log.Error("failed to execute get import state",
			zap.Error(err))
3661 3662 3663 3664 3665
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}

3666 3667 3668
	log.Debug("successfully received get import state response",
		zap.Int64("taskID", req.GetTask()),
		zap.Any("resp", resp), zap.Error(err))
E
Enwei Jiao 已提交
3669 3670
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3671
	return resp, nil
G
groot 已提交
3672 3673 3674 3675
}

// ListImportTasks get id array of all import tasks from rootcoord
func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
3676 3677 3678 3679 3680
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ListImportTasks")
	defer sp.Finish()

	log := log.Ctx(ctx)

J
Jiquan Long 已提交
3681
	log.Debug("received list import tasks request")
G
groot 已提交
3682 3683 3684 3685 3686
	resp := &milvuspb.ListImportTasksResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3687 3688
	method := "ListImportTasks"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3689
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3690
		metrics.TotalLabel).Inc()
G
groot 已提交
3691
	resp, err := node.rootCoord.ListImportTasks(ctx, req)
3692
	if err != nil {
E
Enwei Jiao 已提交
3693
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3694 3695
		log.Error("failed to execute list import tasks",
			zap.Error(err))
3696 3697
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
X
XuanYang-cn 已提交
3698 3699 3700
		return resp, nil
	}

3701 3702 3703
	log.Debug("successfully received list import tasks response",
		zap.String("collection", req.CollectionName),
		zap.Any("tasks", resp.Tasks))
E
Enwei Jiao 已提交
3704 3705
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
X
XuanYang-cn 已提交
3706 3707 3708
	return resp, err
}

3709 3710 3711
// InvalidateCredentialCache invalidate the credential cache of specified username.
func (node *Proxy) InvalidateCredentialCache(ctx context.Context, request *proxypb.InvalidateCredCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
3712 3713 3714 3715 3716

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-InvalidateCredentialCache")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
3717 3718
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))
3719 3720

	log.Debug("received request to invalidate credential cache")
3721
	if !node.checkHealthy() {
3722
		return unhealthyStatus(), nil
3723
	}
3724 3725 3726 3727 3728

	username := request.Username
	if globalMetaCache != nil {
		globalMetaCache.RemoveCredential(username) // no need to return error, though credential may be not cached
	}
3729
	log.Debug("complete to invalidate credential cache")
3730 3731 3732 3733 3734 3735 3736 3737 3738 3739

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

// UpdateCredentialCache update the credential cache of specified username.
func (node *Proxy) UpdateCredentialCache(ctx context.Context, request *proxypb.UpdateCredCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
3740 3741 3742 3743 3744

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-UpdateCredentialCache")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
3745 3746
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))
3747 3748

	log.Debug("received request to update credential cache")
3749
	if !node.checkHealthy() {
3750
		return unhealthyStatus(), nil
3751
	}
3752 3753

	credInfo := &internalpb.CredentialInfo{
3754 3755
		Username:       request.Username,
		Sha256Password: request.Password,
3756 3757 3758 3759
	}
	if globalMetaCache != nil {
		globalMetaCache.UpdateCredential(credInfo) // no need to return error, though credential may be not cached
	}
3760
	log.Debug("complete to update credential cache")
3761 3762 3763 3764 3765 3766 3767 3768

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

func (node *Proxy) CreateCredential(ctx context.Context, req *milvuspb.CreateCredentialRequest) (*commonpb.Status, error) {
3769 3770 3771 3772 3773 3774 3775 3776
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateCredential")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("CreateCredential",
		zap.String("role", typeutil.ProxyRole))
3777
	if !node.checkHealthy() {
3778
		return unhealthyStatus(), nil
3779
	}
3780 3781 3782 3783 3784 3785 3786 3787 3788 3789
	// validate params
	username := req.Username
	if err := ValidateUsername(username); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	rawPassword, err := crypto.Base64Decode(req.Password)
	if err != nil {
3790 3791
		log.Error("decode password fail",
			zap.Error(err))
3792 3793 3794 3795 3796 3797
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "decode password fail key:" + req.Username,
		}, nil
	}
	if err = ValidatePassword(rawPassword); err != nil {
3798 3799
		log.Error("illegal password",
			zap.Error(err))
3800 3801 3802 3803 3804 3805 3806
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	encryptedPassword, err := crypto.PasswordEncrypt(rawPassword)
	if err != nil {
3807 3808
		log.Error("encrypt password fail",
			zap.Error(err))
3809 3810 3811 3812 3813
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "encrypt password fail key:" + req.Username,
		}, nil
	}
3814

3815 3816 3817
	credInfo := &internalpb.CredentialInfo{
		Username:          req.Username,
		EncryptedPassword: encryptedPassword,
3818
		Sha256Password:    crypto.SHA256(rawPassword, req.Username),
3819 3820 3821
	}
	result, err := node.rootCoord.CreateCredential(ctx, credInfo)
	if err != nil { // for error like conntext timeout etc.
3822 3823
		log.Error("create credential fail",
			zap.Error(err))
3824 3825 3826 3827 3828 3829 3830 3831
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

C
codeman 已提交
3832
func (node *Proxy) UpdateCredential(ctx context.Context, req *milvuspb.UpdateCredentialRequest) (*commonpb.Status, error) {
3833 3834 3835 3836 3837 3838 3839 3840
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-UpdateCredential")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("UpdateCredential",
		zap.String("role", typeutil.ProxyRole))
3841
	if !node.checkHealthy() {
3842
		return unhealthyStatus(), nil
3843
	}
C
codeman 已提交
3844 3845
	rawOldPassword, err := crypto.Base64Decode(req.OldPassword)
	if err != nil {
3846 3847
		log.Error("decode old password fail",
			zap.Error(err))
C
codeman 已提交
3848 3849 3850 3851 3852 3853
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode old password fail when updating:" + req.Username,
		}, nil
	}
	rawNewPassword, err := crypto.Base64Decode(req.NewPassword)
3854
	if err != nil {
3855 3856
		log.Error("decode password fail",
			zap.Error(err))
3857 3858 3859 3860 3861
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
3862 3863
	// valid new password
	if err = ValidatePassword(rawNewPassword); err != nil {
3864 3865
		log.Error("illegal password",
			zap.Error(err))
3866 3867 3868 3869 3870
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
3871 3872

	if !passwordVerify(ctx, req.Username, rawOldPassword, globalMetaCache) {
C
codeman 已提交
3873 3874 3875 3876 3877 3878 3879
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "old password is not correct:" + req.Username,
		}, nil
	}
	// update meta data
	encryptedPassword, err := crypto.PasswordEncrypt(rawNewPassword)
3880
	if err != nil {
3881 3882
		log.Error("encrypt password fail",
			zap.Error(err))
3883 3884 3885 3886 3887
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "encrypt password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
3888
	updateCredReq := &internalpb.CredentialInfo{
3889
		Username:          req.Username,
3890
		Sha256Password:    crypto.SHA256(rawNewPassword, req.Username),
3891 3892
		EncryptedPassword: encryptedPassword,
	}
C
codeman 已提交
3893
	result, err := node.rootCoord.UpdateCredential(ctx, updateCredReq)
3894
	if err != nil { // for error like conntext timeout etc.
3895 3896
		log.Error("update credential fail",
			zap.Error(err))
3897 3898 3899 3900 3901 3902 3903 3904 3905
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
3906 3907 3908 3909 3910 3911 3912 3913
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DeleteCredential")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("DeleteCredential",
		zap.String("role", typeutil.ProxyRole))
3914
	if !node.checkHealthy() {
3915
		return unhealthyStatus(), nil
3916 3917
	}

3918 3919 3920 3921 3922 3923
	if req.Username == util.UserRoot {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_DeleteCredentialFailure,
			Reason:    "user root cannot be deleted",
		}, nil
	}
3924 3925
	result, err := node.rootCoord.DeleteCredential(ctx, req)
	if err != nil { // for error like conntext timeout etc.
3926 3927
		log.Error("delete credential fail",
			zap.Error(err))
3928 3929 3930 3931 3932 3933 3934 3935 3936
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
3937 3938 3939 3940 3941 3942 3943
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ListCredUsers")
	defer sp.Finish()

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole))

	log.Debug("ListCredUsers")
3944
	if !node.checkHealthy() {
3945
		return &milvuspb.ListCredUsersResponse{Status: unhealthyStatus()}, nil
3946
	}
3947
	rootCoordReq := &milvuspb.ListCredUsersRequest{
3948 3949 3950
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_ListCredUsernames),
		),
3951 3952
	}
	resp, err := node.rootCoord.ListCredUsers(ctx, rootCoordReq)
3953 3954 3955 3956 3957 3958 3959 3960 3961 3962 3963 3964
	if err != nil {
		return &milvuspb.ListCredUsersResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
	return &milvuspb.ListCredUsersResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
3965
		Usernames: resp.Usernames,
3966 3967
	}, nil
}
3968

3969
func (node *Proxy) CreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest) (*commonpb.Status, error) {
3970 3971 3972 3973 3974 3975 3976
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateRole")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("CreateRole",
		zap.Any("req", req))
3977
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
3978
		return errorutil.UnhealthyStatus(code), nil
3979 3980 3981 3982 3983 3984 3985 3986 3987 3988
	}

	var roleName string
	if req.Entity != nil {
		roleName = req.Entity.Name
	}
	if err := ValidateRoleName(roleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
3989
		}, nil
3990 3991 3992 3993
	}

	result, err := node.rootCoord.CreateRole(ctx, req)
	if err != nil {
3994 3995
		log.Error("fail to create role",
			zap.Error(err))
3996 3997 3998
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
3999
		}, nil
4000 4001
	}
	return result, nil
4002 4003
}

4004
func (node *Proxy) DropRole(ctx context.Context, req *milvuspb.DropRoleRequest) (*commonpb.Status, error) {
4005 4006 4007 4008 4009 4010 4011
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropRole")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("DropRole",
		zap.Any("req", req))
4012
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4013
		return errorutil.UnhealthyStatus(code), nil
4014 4015 4016 4017 4018
	}
	if err := ValidateRoleName(req.RoleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4019
		}, nil
4020
	}
4021 4022 4023 4024 4025
	if IsDefaultRole(req.RoleName) {
		errMsg := fmt.Sprintf("the role[%s] is a default role, which can't be droped", req.RoleName)
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    errMsg,
4026
		}, nil
4027
	}
4028 4029
	result, err := node.rootCoord.DropRole(ctx, req)
	if err != nil {
4030 4031 4032
		log.Error("fail to drop role",
			zap.String("role_name", req.RoleName),
			zap.Error(err))
4033 4034 4035
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4036
		}, nil
4037 4038
	}
	return result, nil
4039 4040
}

4041
func (node *Proxy) OperateUserRole(ctx context.Context, req *milvuspb.OperateUserRoleRequest) (*commonpb.Status, error) {
4042 4043 4044 4045 4046 4047 4048
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-OperateUserRole")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("OperateUserRole",
		zap.Any("req", req))
4049
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4050
		return errorutil.UnhealthyStatus(code), nil
4051 4052 4053 4054 4055
	}
	if err := ValidateUsername(req.Username); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4056
		}, nil
4057 4058 4059 4060 4061
	}
	if err := ValidateRoleName(req.RoleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4062
		}, nil
4063 4064 4065 4066
	}

	result, err := node.rootCoord.OperateUserRole(ctx, req)
	if err != nil {
4067 4068
		logger.Error("fail to operate user role",
			zap.Error(err))
4069 4070 4071
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4072
		}, nil
4073 4074
	}
	return result, nil
4075 4076
}

4077
func (node *Proxy) SelectRole(ctx context.Context, req *milvuspb.SelectRoleRequest) (*milvuspb.SelectRoleResponse, error) {
4078 4079 4080 4081 4082 4083
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-SelectRole")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("SelectRole", zap.Any("req", req))
4084
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4085
		return &milvuspb.SelectRoleResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4086 4087 4088 4089 4090 4091 4092 4093 4094
	}

	if req.Role != nil {
		if err := ValidateRoleName(req.Role.Name); err != nil {
			return &milvuspb.SelectRoleResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_IllegalArgument,
					Reason:    err.Error(),
				},
4095
			}, nil
4096 4097 4098 4099 4100
		}
	}

	result, err := node.rootCoord.SelectRole(ctx, req)
	if err != nil {
4101 4102
		log.Error("fail to select role",
			zap.Error(err))
4103 4104 4105 4106 4107
		return &milvuspb.SelectRoleResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4108
		}, nil
4109 4110
	}
	return result, nil
4111 4112
}

4113
func (node *Proxy) SelectUser(ctx context.Context, req *milvuspb.SelectUserRequest) (*milvuspb.SelectUserResponse, error) {
4114 4115 4116 4117 4118 4119 4120
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-SelectUser")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("SelectUser",
		zap.Any("req", req))
4121
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4122
		return &milvuspb.SelectUserResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4123 4124 4125 4126 4127 4128 4129 4130 4131
	}

	if req.User != nil {
		if err := ValidateUsername(req.User.Name); err != nil {
			return &milvuspb.SelectUserResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_IllegalArgument,
					Reason:    err.Error(),
				},
4132
			}, nil
4133 4134 4135 4136 4137
		}
	}

	result, err := node.rootCoord.SelectUser(ctx, req)
	if err != nil {
4138 4139
		log.Error("fail to select user",
			zap.Error(err))
4140 4141 4142 4143 4144
		return &milvuspb.SelectUserResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4145
		}, nil
4146 4147
	}
	return result, nil
4148 4149
}

4150 4151 4152 4153 4154 4155 4156 4157 4158 4159 4160 4161 4162 4163 4164 4165 4166 4167 4168 4169 4170 4171 4172 4173 4174 4175 4176 4177 4178 4179
func (node *Proxy) validPrivilegeParams(req *milvuspb.OperatePrivilegeRequest) error {
	if req.Entity == nil {
		return fmt.Errorf("the entity in the request is nil")
	}
	if req.Entity.Grantor == nil {
		return fmt.Errorf("the grantor entity in the grant entity is nil")
	}
	if req.Entity.Grantor.Privilege == nil {
		return fmt.Errorf("the privilege entity in the grantor entity is nil")
	}
	if err := ValidatePrivilege(req.Entity.Grantor.Privilege.Name); err != nil {
		return err
	}
	if req.Entity.Object == nil {
		return fmt.Errorf("the resource entity in the grant entity is nil")
	}
	if err := ValidateObjectType(req.Entity.Object.Name); err != nil {
		return err
	}
	if err := ValidateObjectName(req.Entity.ObjectName); err != nil {
		return err
	}
	if req.Entity.Role == nil {
		return fmt.Errorf("the object entity in the grant entity is nil")
	}
	if err := ValidateRoleName(req.Entity.Role.Name); err != nil {
		return err
	}

	return nil
4180 4181
}

4182
func (node *Proxy) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) {
4183 4184 4185 4186 4187 4188 4189
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-OperatePrivilege")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("OperatePrivilege",
		zap.Any("req", req))
4190
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4191
		return errorutil.UnhealthyStatus(code), nil
4192 4193 4194 4195 4196
	}
	if err := node.validPrivilegeParams(req); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4197
		}, nil
4198 4199 4200 4201 4202 4203
	}
	curUser, err := GetCurUserFromContext(ctx)
	if err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4204
		}, nil
4205 4206 4207 4208
	}
	req.Entity.Grantor.User = &milvuspb.UserEntity{Name: curUser}
	result, err := node.rootCoord.OperatePrivilege(ctx, req)
	if err != nil {
4209 4210
		log.Error("fail to operate privilege",
			zap.Error(err))
4211 4212 4213
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4214
		}, nil
4215 4216
	}
	return result, nil
4217 4218
}

4219 4220 4221 4222 4223 4224 4225 4226 4227 4228 4229 4230 4231 4232 4233 4234 4235 4236 4237 4238 4239 4240 4241 4242 4243 4244 4245
// validGrantParams checks a SelectGrantRequest: the grant entity and its role
// are mandatory, while the object is optional and only validated (type and
// name) when present.
//
// It returns the first problem found, or nil when the request is well formed.
func (node *Proxy) validGrantParams(req *milvuspb.SelectGrantRequest) error {
	entity := req.Entity
	if entity == nil {
		return fmt.Errorf("the grant entity in the request is nil")
	}

	if object := entity.Object; object != nil {
		if err := ValidateObjectType(object.Name); err != nil {
			return err
		}
		if err := ValidateObjectName(entity.ObjectName); err != nil {
			return err
		}
	}

	role := entity.Role
	if role == nil {
		return fmt.Errorf("the role entity in the grant entity is nil")
	}
	if err := ValidateRoleName(role.Name); err != nil {
		return err
	}

	return nil
}

func (node *Proxy) SelectGrant(ctx context.Context, req *milvuspb.SelectGrantRequest) (*milvuspb.SelectGrantResponse, error) {
4246 4247 4248 4249 4250 4251 4252
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-SelectGrant")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("SelectGrant",
		zap.Any("req", req))
4253
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4254
		return &milvuspb.SelectGrantResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4255 4256 4257 4258 4259 4260 4261 4262
	}

	if err := node.validGrantParams(req); err != nil {
		return &milvuspb.SelectGrantResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_IllegalArgument,
				Reason:    err.Error(),
			},
4263
		}, nil
4264 4265 4266 4267
	}

	result, err := node.rootCoord.SelectGrant(ctx, req)
	if err != nil {
4268 4269
		log.Error("fail to select grant",
			zap.Error(err))
4270 4271 4272 4273 4274
		return &milvuspb.SelectGrantResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4275
		}, nil
4276 4277 4278 4279 4280
	}
	return result, nil
}

func (node *Proxy) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.RefreshPolicyInfoCacheRequest) (*commonpb.Status, error) {
4281 4282 4283 4284 4285 4286 4287
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-RefreshPolicyInfoCache")
	defer sp.Finish()

	log := log.Ctx(ctx)

	log.Debug("RefreshPrivilegeInfoCache",
		zap.Any("req", req))
4288 4289 4290 4291 4292 4293 4294 4295 4296 4297
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
		return errorutil.UnhealthyStatus(code), errorutil.UnhealthyError()
	}

	if globalMetaCache != nil {
		err := globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{
			OpType: typeutil.CacheOpType(req.OpType),
			OpKey:  req.OpKey,
		})
		if err != nil {
4298 4299
			log.Error("fail to refresh policy info",
				zap.Error(err))
4300 4301 4302 4303 4304 4305
			return &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_RefreshPolicyInfoCacheFailure,
				Reason:    err.Error(),
			}, err
		}
	}
4306
	log.Debug("RefreshPrivilegeInfoCache success")
4307 4308 4309 4310

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
4311
}
4312 4313 4314 4315 4316 4317 4318 4319 4320 4321 4322 4323 4324 4325 4326 4327 4328

// SetRates limits the rates of requests.
func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesRequest) (*commonpb.Status, error) {
	resp := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	}
	if !node.checkHealthy() {
		resp = unhealthyStatus()
		return resp, nil
	}

	err := node.multiRateLimiter.globalRateLimiter.setRates(request.GetRates())
	// TODO: set multiple rate limiter rates
	if err != nil {
		resp.Reason = err.Error()
		return resp, nil
	}
4329
	node.multiRateLimiter.SetQuotaStates(request.GetStates(), request.GetCodes())
4330 4331 4332
	log.Info("current rates in proxy", zap.Int64("proxyNodeID", paramtable.GetNodeID()), zap.Any("rates", request.GetRates()))
	if len(request.GetStates()) != 0 {
		for i := range request.GetStates() {
4333
			log.Warn("Proxy set quota states", zap.String("state", request.GetStates()[i].String()), zap.String("reason", request.GetCodes()[i].String()))
4334 4335
		}
	}
4336 4337 4338
	resp.ErrorCode = commonpb.ErrorCode_Success
	return resp, nil
}
4339 4340 4341 4342

func (node *Proxy) CheckHealth(ctx context.Context, request *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
	if !node.checkHealthy() {
		reason := errorutil.UnHealthReason("proxy", node.session.ServerID, "proxy is unhealthy")
4343 4344 4345 4346
		return &milvuspb.CheckHealthResponse{
			Status:    unhealthyStatus(),
			IsHealthy: false,
			Reasons:   []string{reason}}, nil
4347 4348 4349 4350 4351 4352 4353 4354 4355 4356
	}

	group, ctx := errgroup.WithContext(ctx)
	errReasons := make([]string, 0)

	mu := &sync.Mutex{}
	fn := func(role string, resp *milvuspb.CheckHealthResponse, err error) error {
		mu.Lock()
		defer mu.Unlock()

4357 4358 4359 4360 4361
		sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-RefreshPolicyInfoCache")
		defer sp.Finish()

		log := log.Ctx(ctx).With(zap.String("role", role))

4362
		if err != nil {
4363 4364
			log.Warn("check health fail",
				zap.Error(err))
4365 4366 4367 4368 4369
			errReasons = append(errReasons, fmt.Sprintf("check health fail for %s", role))
			return err
		}

		if !resp.IsHealthy {
4370
			log.Warn("check health fail")
4371 4372 4373 4374 4375 4376 4377 4378 4379 4380 4381 4382 4383 4384 4385 4386 4387 4388 4389 4390 4391 4392 4393
			errReasons = append(errReasons, resp.Reasons...)
		}
		return nil
	}

	group.Go(func() error {
		resp, err := node.rootCoord.CheckHealth(ctx, request)
		return fn("rootcoord", resp, err)
	})

	group.Go(func() error {
		resp, err := node.queryCoord.CheckHealth(ctx, request)
		return fn("querycoord", resp, err)
	})

	group.Go(func() error {
		resp, err := node.dataCoord.CheckHealth(ctx, request)
		return fn("datacoord", resp, err)
	})

	err := group.Wait()
	if err != nil || len(errReasons) != 0 {
		return &milvuspb.CheckHealthResponse{
4394 4395 4396
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_Success,
			},
4397 4398 4399 4400 4401
			IsHealthy: false,
			Reasons:   errReasons,
		}, nil
	}

4402
	states, reasons := node.multiRateLimiter.GetQuotaStates()
4403 4404 4405 4406 4407
	return &milvuspb.CheckHealthResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
4408 4409 4410
		QuotaStates: states,
		Reasons:     reasons,
		IsHealthy:   true,
4411
	}, nil
4412
}