impl.go 155.2 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16

C
Cai Yudong 已提交
17
package proxy
18 19 20

import (
	"context"
21
	"fmt"
C
cai.zhang 已提交
22
	"os"
23
	"strconv"
24 25
	"sync"

26
	"github.com/cockroachdb/errors"
J
jaime 已提交
27
	"github.com/golang/protobuf/proto"
28
	"github.com/samber/lo"
E
Enwei Jiao 已提交
29
	"go.opentelemetry.io/otel"
J
jaime 已提交
30
	"go.uber.org/zap"
31
	"golang.org/x/sync/errgroup"
32

S
SimFG 已提交
33 34
	"github.com/milvus-io/milvus-proto/go-api/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/milvuspb"
35
	"github.com/milvus-io/milvus-proto/go-api/msgpb"
S
smellthemoon 已提交
36
	"github.com/milvus-io/milvus-proto/go-api/schemapb"
37
	"github.com/milvus-io/milvus/internal/common"
X
Xiangyu Wang 已提交
38
	"github.com/milvus-io/milvus/internal/log"
39
	"github.com/milvus-io/milvus/internal/metrics"
J
jaime 已提交
40
	"github.com/milvus-io/milvus/internal/mq/msgstream"
X
Xiangyu Wang 已提交
41 42 43 44
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
45
	"github.com/milvus-io/milvus/internal/util"
46
	"github.com/milvus-io/milvus/internal/util/commonpbutil"
47
	"github.com/milvus-io/milvus/internal/util/crypto"
48
	"github.com/milvus-io/milvus/internal/util/errorutil"
49
	"github.com/milvus-io/milvus/internal/util/importutil"
50 51
	"github.com/milvus-io/milvus/internal/util/logutil"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
E
Enwei Jiao 已提交
52
	"github.com/milvus-io/milvus/internal/util/paramtable"
53
	"github.com/milvus-io/milvus/internal/util/ratelimitutil"
54
	"github.com/milvus-io/milvus/internal/util/timerecord"
X
Xiangyu Wang 已提交
55
	"github.com/milvus-io/milvus/internal/util/typeutil"
56 57
)

58 59
// moduleName is the module label attached to a request context via
// logutil.WithModule by handlers in this file.
const moduleName = "Proxy"

60
// UpdateStateCode updates the state code of Proxy.
61
func (node *Proxy) UpdateStateCode(code commonpb.StateCode) {
62
	node.stateCode.Store(code)
Z
zhenshan.cao 已提交
63 64
}

65
// GetComponentStates get state of Proxy.
66 67
func (node *Proxy) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
	stats := &milvuspb.ComponentStates{
G
godchen 已提交
68 69 70 71
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
72
	code, ok := node.stateCode.Load().(commonpb.StateCode)
G
godchen 已提交
73 74 75 76 77 78
	if !ok {
		errMsg := "unexpected error in type assertion"
		stats.Status = &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    errMsg,
		}
G
godchen 已提交
79
		return stats, nil
G
godchen 已提交
80
	}
81 82 83 84
	nodeID := common.NotRegisteredID
	if node.session != nil && node.session.Registered() {
		nodeID = node.session.ServerID
	}
85
	info := &milvuspb.ComponentInfo{
86 87
		// NodeID:    Params.ProxyID, // will race with Proxy.Register()
		NodeID:    nodeID,
C
Cai Yudong 已提交
88
		Role:      typeutil.ProxyRole,
G
godchen 已提交
89 90 91 92 93 94
		StateCode: code,
	}
	stats.State = info
	return stats, nil
}

C
cxytz01 已提交
95
// GetStatisticsChannel gets statistics channel of Proxy.
C
Cai Yudong 已提交
96
func (node *Proxy) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
97 98 99 100 101 102 103 104 105
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: "",
	}, nil
}

106
// InvalidateCollectionMetaCache invalidate the meta cache of specific collection.
C
Cai Yudong 已提交
107
func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
108 109 110
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
111
	ctx = logutil.WithModule(ctx, moduleName)
E
Enwei Jiao 已提交
112 113 114

	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-InvalidateCollectionMetaCache")
	defer sp.End()
115
	log := log.Ctx(ctx).With(
116
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
117
		zap.String("db", request.DbName),
118 119
		zap.String("collectionName", request.CollectionName),
		zap.Int64("collectionID", request.CollectionID))
D
dragondriver 已提交
120

121 122
	log.Info("received request to invalidate collection meta cache")

123
	collectionName := request.CollectionName
124
	collectionID := request.CollectionID
X
Xiaofan 已提交
125 126

	var aliasName []string
N
neza2017 已提交
127
	if globalMetaCache != nil {
128 129 130 131
		if collectionName != "" {
			globalMetaCache.RemoveCollection(ctx, collectionName) // no need to return error, though collection may be not cached
		}
		if request.CollectionID != UniqueID(0) {
X
Xiaofan 已提交
132
			aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
133
		}
N
neza2017 已提交
134
	}
135 136
	if request.GetBase().GetMsgType() == commonpb.MsgType_DropCollection {
		// no need to handle error, since this Proxy may not create dml stream for the collection.
137 138
		node.chMgr.removeDMLStream(request.GetCollectionID())
		// clean up collection level metrics
E
Enwei Jiao 已提交
139
		metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), collectionName)
X
Xiaofan 已提交
140
		for _, alias := range aliasName {
E
Enwei Jiao 已提交
141
			metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), alias)
X
Xiaofan 已提交
142
		}
143
	}
144
	log.Info("complete to invalidate collection meta cache")
D
dragondriver 已提交
145

146
	return &commonpb.Status{
147
		ErrorCode: commonpb.ErrorCode_Success,
148 149
		Reason:    "",
	}, nil
150 151
}

152
// CreateCollection create a collection by the schema.
153
// TODO(dragondriver): add more detailed ut for ConsistencyLevel, should we support multiple consistency level in Proxy?
C
Cai Yudong 已提交
154
func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
155 156 157
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
158

E
Enwei Jiao 已提交
159 160
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateCollection")
	defer sp.End()
161 162 163
	method := "CreateCollection"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
164
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
165

166
	cct := &createCollectionTask{
S
sunby 已提交
167
		ctx:                     ctx,
168 169
		Condition:               NewTaskCondition(ctx),
		CreateCollectionRequest: request,
170
		rootCoord:               node.rootCoord,
171 172
	}

173 174 175
	// avoid data race
	lenOfSchema := len(request.Schema)

176
	log := log.Ctx(ctx).With(
177
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
178 179
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
180
		zap.Int("len(schema)", lenOfSchema),
181 182
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
183

184 185
	log.Debug(rpcReceived(method))

186 187 188
	if err := node.sched.ddQueue.Enqueue(cct); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
189
			zap.Error(err))
190

E
Enwei Jiao 已提交
191
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
192
		return &commonpb.Status{
193
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
194 195 196 197
			Reason:    err.Error(),
		}, nil
	}

198 199
	log.Debug(
		rpcEnqueued(method),
200 201
		zap.Uint64("BeginTs", cct.BeginTs()),
		zap.Uint64("EndTs", cct.EndTs()),
202
		zap.Uint64("timestamp", request.Base.Timestamp))
203

204 205 206
	if err := cct.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
207
			zap.Error(err),
208
			zap.Uint64("BeginTs", cct.BeginTs()),
209
			zap.Uint64("EndTs", cct.EndTs()))
D
dragondriver 已提交
210

E
Enwei Jiao 已提交
211
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
212
		return &commonpb.Status{
213
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
214 215 216 217
			Reason:    err.Error(),
		}, nil
	}

218 219
	log.Debug(
		rpcDone(method),
220
		zap.Uint64("BeginTs", cct.BeginTs()),
221
		zap.Uint64("EndTs", cct.EndTs()))
222

E
Enwei Jiao 已提交
223 224
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
225 226 227
	return cct.result, nil
}

228
// DropCollection drop a collection.
C
Cai Yudong 已提交
229
func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
230 231 232
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
233

E
Enwei Jiao 已提交
234 235
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropCollection")
	defer sp.End()
236 237
	method := "DropCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
238
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
239

240
	dct := &dropCollectionTask{
S
sunby 已提交
241
		ctx:                   ctx,
242 243
		Condition:             NewTaskCondition(ctx),
		DropCollectionRequest: request,
244
		rootCoord:             node.rootCoord,
245
		chMgr:                 node.chMgr,
S
sunby 已提交
246
		chTicker:              node.chTicker,
247 248
	}

249
	log := log.Ctx(ctx).With(
250
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
251 252
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
253

254 255
	log.Debug("DropCollection received")

256 257
	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DropCollection failed to enqueue",
258
			zap.Error(err))
259

E
Enwei Jiao 已提交
260
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
261
		return &commonpb.Status{
262
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
263 264 265 266
			Reason:    err.Error(),
		}, nil
	}

267 268
	log.Debug("DropCollection enqueued",
		zap.Uint64("BeginTs", dct.BeginTs()),
269
		zap.Uint64("EndTs", dct.EndTs()))
270 271 272

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DropCollection failed to WaitToFinish",
D
dragondriver 已提交
273
			zap.Error(err),
274
			zap.Uint64("BeginTs", dct.BeginTs()),
275
			zap.Uint64("EndTs", dct.EndTs()))
D
dragondriver 已提交
276

E
Enwei Jiao 已提交
277
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
278
		return &commonpb.Status{
279
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
280 281 282 283
			Reason:    err.Error(),
		}, nil
	}

284 285
	log.Debug("DropCollection done",
		zap.Uint64("BeginTs", dct.BeginTs()),
286
		zap.Uint64("EndTs", dct.EndTs()))
287

E
Enwei Jiao 已提交
288 289
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
290 291 292
	return dct.result, nil
}

293
// HasCollection check if the specific collection exists in Milvus.
C
Cai Yudong 已提交
294
func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
295 296 297 298 299
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
300

E
Enwei Jiao 已提交
301 302
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-HasCollection")
	defer sp.End()
303 304
	method := "HasCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
305
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
306
		metrics.TotalLabel).Inc()
307

308
	log := log.Ctx(ctx).With(
309
		zap.String("role", typeutil.ProxyRole),
310 311 312
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

313 314
	log.Debug("HasCollection received")

315
	hct := &hasCollectionTask{
S
sunby 已提交
316
		ctx:                  ctx,
317 318
		Condition:            NewTaskCondition(ctx),
		HasCollectionRequest: request,
319
		rootCoord:            node.rootCoord,
320 321
	}

322 323
	if err := node.sched.ddQueue.Enqueue(hct); err != nil {
		log.Warn("HasCollection failed to enqueue",
324
			zap.Error(err))
325

E
Enwei Jiao 已提交
326
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
327
			metrics.AbandonLabel).Inc()
328 329
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
330
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
331 332 333 334 335
				Reason:    err.Error(),
			},
		}, nil
	}

336 337
	log.Debug("HasCollection enqueued",
		zap.Uint64("BeginTS", hct.BeginTs()),
338
		zap.Uint64("EndTS", hct.EndTs()))
339 340 341

	if err := hct.WaitToFinish(); err != nil {
		log.Warn("HasCollection failed to WaitToFinish",
D
dragondriver 已提交
342
			zap.Error(err),
343
			zap.Uint64("BeginTS", hct.BeginTs()),
344
			zap.Uint64("EndTS", hct.EndTs()))
D
dragondriver 已提交
345

E
Enwei Jiao 已提交
346
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
347
			metrics.FailLabel).Inc()
348 349
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
350
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
351 352 353 354 355
				Reason:    err.Error(),
			},
		}, nil
	}

356 357
	log.Debug("HasCollection done",
		zap.Uint64("BeginTS", hct.BeginTs()),
358
		zap.Uint64("EndTS", hct.EndTs()))
359

E
Enwei Jiao 已提交
360
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
361
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
362
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
363 364 365
	return hct.result, nil
}

366
// LoadCollection load a collection into query nodes.
C
Cai Yudong 已提交
367
func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
368 369 370
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
371

E
Enwei Jiao 已提交
372 373
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-LoadCollection")
	defer sp.End()
374 375
	method := "LoadCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
376
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
377
		metrics.TotalLabel).Inc()
378
	lct := &loadCollectionTask{
S
sunby 已提交
379
		ctx:                   ctx,
380 381
		Condition:             NewTaskCondition(ctx),
		LoadCollectionRequest: request,
382
		queryCoord:            node.queryCoord,
383
		datacoord:             node.dataCoord,
384 385
	}

386
	log := log.Ctx(ctx).With(
387
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
388
		zap.String("db", request.DbName),
389 390
		zap.String("collection", request.CollectionName),
		zap.Bool("refreshMode", request.Refresh))
391

392 393
	log.Debug("LoadCollection received")

394 395
	if err := node.sched.ddQueue.Enqueue(lct); err != nil {
		log.Warn("LoadCollection failed to enqueue",
396
			zap.Error(err))
397

E
Enwei Jiao 已提交
398
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
399
			metrics.AbandonLabel).Inc()
400
		return &commonpb.Status{
401
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
402 403 404
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
405

406 407
	log.Debug("LoadCollection enqueued",
		zap.Uint64("BeginTS", lct.BeginTs()),
408
		zap.Uint64("EndTS", lct.EndTs()))
409 410 411

	if err := lct.WaitToFinish(); err != nil {
		log.Warn("LoadCollection failed to WaitToFinish",
D
dragondriver 已提交
412
			zap.Error(err),
413
			zap.Uint64("BeginTS", lct.BeginTs()),
414
			zap.Uint64("EndTS", lct.EndTs()))
E
Enwei Jiao 已提交
415
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
416
			metrics.FailLabel).Inc()
417
		return &commonpb.Status{
418
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
419 420 421 422
			Reason:    err.Error(),
		}, nil
	}

423 424
	log.Debug("LoadCollection done",
		zap.Uint64("BeginTS", lct.BeginTs()),
425
		zap.Uint64("EndTS", lct.EndTs()))
426

E
Enwei Jiao 已提交
427
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
428
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
429
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
430
	return lct.result, nil
431 432
}

433
// ReleaseCollection remove the loaded collection from query nodes.
C
Cai Yudong 已提交
434
func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
435 436 437
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
438

E
Enwei Jiao 已提交
439 440
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ReleaseCollection")
	defer sp.End()
441 442
	method := "ReleaseCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
443
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
444
		metrics.TotalLabel).Inc()
445
	rct := &releaseCollectionTask{
S
sunby 已提交
446
		ctx:                      ctx,
447 448
		Condition:                NewTaskCondition(ctx),
		ReleaseCollectionRequest: request,
449
		queryCoord:               node.queryCoord,
450
		chMgr:                    node.chMgr,
451 452
	}

453
	log := log.Ctx(ctx).With(
454
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
455 456
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
457

458 459
	log.Debug(rpcReceived(method))

460
	if err := node.sched.ddQueue.Enqueue(rct); err != nil {
461 462
		log.Warn(
			rpcFailedToEnqueue(method),
463
			zap.Error(err))
464

E
Enwei Jiao 已提交
465
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
466
			metrics.AbandonLabel).Inc()
467
		return &commonpb.Status{
468
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
469 470 471 472
			Reason:    err.Error(),
		}, nil
	}

473 474
	log.Debug(
		rpcEnqueued(method),
475
		zap.Uint64("BeginTS", rct.BeginTs()),
476
		zap.Uint64("EndTS", rct.EndTs()))
477 478

	if err := rct.WaitToFinish(); err != nil {
479 480
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
481
			zap.Error(err),
482
			zap.Uint64("BeginTS", rct.BeginTs()),
483
			zap.Uint64("EndTS", rct.EndTs()))
D
dragondriver 已提交
484

E
Enwei Jiao 已提交
485
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
486
			metrics.FailLabel).Inc()
487
		return &commonpb.Status{
488
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
489 490 491 492
			Reason:    err.Error(),
		}, nil
	}

493 494
	log.Debug(
		rpcDone(method),
495
		zap.Uint64("BeginTS", rct.BeginTs()),
496
		zap.Uint64("EndTS", rct.EndTs()))
497

E
Enwei Jiao 已提交
498
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
499
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
500
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
501
	return rct.result, nil
502 503
}

504
// DescribeCollection get the meta information of specific collection, such as schema, created timestamp and etc.
C
Cai Yudong 已提交
505
func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
506 507 508 509 510
	if !node.checkHealthy() {
		return &milvuspb.DescribeCollectionResponse{
			Status: unhealthyStatus(),
		}, nil
	}
511

E
Enwei Jiao 已提交
512 513
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DescribeCollection")
	defer sp.End()
514 515
	method := "DescribeCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
516
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
517
		metrics.TotalLabel).Inc()
518

519
	dct := &describeCollectionTask{
S
sunby 已提交
520
		ctx:                       ctx,
521 522
		Condition:                 NewTaskCondition(ctx),
		DescribeCollectionRequest: request,
523
		rootCoord:                 node.rootCoord,
524 525
	}

526
	log := log.Ctx(ctx).With(
527
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
528 529
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
530

531 532
	log.Debug("DescribeCollection received")

533 534
	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DescribeCollection failed to enqueue",
535
			zap.Error(err))
536

E
Enwei Jiao 已提交
537
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
538
			metrics.AbandonLabel).Inc()
539 540
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
541
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
542 543 544 545 546
				Reason:    err.Error(),
			},
		}, nil
	}

547 548
	log.Debug("DescribeCollection enqueued",
		zap.Uint64("BeginTS", dct.BeginTs()),
549
		zap.Uint64("EndTS", dct.EndTs()))
550 551 552

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DescribeCollection failed to WaitToFinish",
D
dragondriver 已提交
553
			zap.Error(err),
554
			zap.Uint64("BeginTS", dct.BeginTs()),
555
			zap.Uint64("EndTS", dct.EndTs()))
D
dragondriver 已提交
556

E
Enwei Jiao 已提交
557
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
558
			metrics.FailLabel).Inc()
559

560 561
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
562
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
563 564 565 566 567
				Reason:    err.Error(),
			},
		}, nil
	}

568 569
	log.Debug("DescribeCollection done",
		zap.Uint64("BeginTS", dct.BeginTs()),
570
		zap.Uint64("EndTS", dct.EndTs()))
571

E
Enwei Jiao 已提交
572
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
573
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
574
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
575 576 577
	return dct.result, nil
}

578 579 580 581 582 583 584 585 586
// GetStatistics get the statistics, such as `num_rows`.
// WARNING: It is an experimental API
func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStatisticsRequest) (*milvuspb.GetStatisticsResponse, error) {
	if !node.checkHealthy() {
		return &milvuspb.GetStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}

E
Enwei Jiao 已提交
587 588
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetStatistics")
	defer sp.End()
589 590
	method := "GetStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
591
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
592
		metrics.TotalLabel).Inc()
593 594 595 596 597 598 599 600 601 602
	g := &getStatisticsTask{
		request:   request,
		Condition: NewTaskCondition(ctx),
		ctx:       ctx,
		tr:        tr,
		dc:        node.dataCoord,
		qc:        node.queryCoord,
		shardMgr:  node.shardMgr,
	}

603
	log := log.Ctx(ctx).With(
604 605
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
606 607 608 609
		zap.String("collection", request.CollectionName))

	log.Debug(
		rpcReceived(method),
610 611 612 613 614 615 616 617
		zap.Strings("partitions", request.PartitionNames))

	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.Strings("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
618
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642
			metrics.AbandonLabel).Inc()

		return &milvuspb.GetStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.Strings("partitions", request.PartitionNames))

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.Uint64("BeginTS", g.BeginTs()),
			zap.Uint64("EndTS", g.EndTs()),
			zap.Strings("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
643
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
644 645 646 647 648 649 650 651 652 653 654 655 656
			metrics.FailLabel).Inc()

		return &milvuspb.GetStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", g.BeginTs()),
657
		zap.Uint64("EndTS", g.EndTs()))
658

E
Enwei Jiao 已提交
659
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
660
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
661
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
662 663 664
	return g.result, nil
}

665
// GetCollectionStatistics get the collection statistics, such as `num_rows`.
C
Cai Yudong 已提交
666
func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvuspb.GetCollectionStatisticsRequest) (*milvuspb.GetCollectionStatisticsResponse, error) {
667 668 669 670 671
	if !node.checkHealthy() {
		return &milvuspb.GetCollectionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
672

E
Enwei Jiao 已提交
673 674
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetCollectionStatistics")
	defer sp.End()
675 676
	method := "GetCollectionStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
677
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
678
		metrics.TotalLabel).Inc()
679
	g := &getCollectionStatisticsTask{
G
godchen 已提交
680 681 682
		ctx:                            ctx,
		Condition:                      NewTaskCondition(ctx),
		GetCollectionStatisticsRequest: request,
683
		dataCoord:                      node.dataCoord,
684 685
	}

686
	log := log.Ctx(ctx).With(
687
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
688 689
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
690

691 692
	log.Debug(rpcReceived(method))

693
	if err := node.sched.ddQueue.Enqueue(g); err != nil {
694 695
		log.Warn(
			rpcFailedToEnqueue(method),
696
			zap.Error(err))
697

E
Enwei Jiao 已提交
698
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
699
			metrics.AbandonLabel).Inc()
700

G
godchen 已提交
701
		return &milvuspb.GetCollectionStatisticsResponse{
702
			Status: &commonpb.Status{
703
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
704 705 706 707 708
				Reason:    err.Error(),
			},
		}, nil
	}

709 710
	log.Debug(
		rpcEnqueued(method),
711
		zap.Uint64("BeginTS", g.BeginTs()),
712
		zap.Uint64("EndTS", g.EndTs()))
713 714

	if err := g.WaitToFinish(); err != nil {
715 716
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
717
			zap.Error(err),
718
			zap.Uint64("BeginTS", g.BeginTs()),
719
			zap.Uint64("EndTS", g.EndTs()))
D
dragondriver 已提交
720

E
Enwei Jiao 已提交
721
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
722
			metrics.FailLabel).Inc()
723

G
godchen 已提交
724
		return &milvuspb.GetCollectionStatisticsResponse{
725
			Status: &commonpb.Status{
726
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
727 728 729 730 731
				Reason:    err.Error(),
			},
		}, nil
	}

732 733
	log.Debug(
		rpcDone(method),
734
		zap.Uint64("BeginTS", g.BeginTs()),
735
		zap.Uint64("EndTS", g.EndTs()))
736

E
Enwei Jiao 已提交
737
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
738
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
739
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
740
	return g.result, nil
741 742
}

743
// ShowCollections list all collections in Milvus.
C
Cai Yudong 已提交
744
func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
745 746 747 748 749
	if !node.checkHealthy() {
		return &milvuspb.ShowCollectionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
E
Enwei Jiao 已提交
750 751
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ShowCollections")
	defer sp.End()
752 753
	method := "ShowCollections"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
754
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
755

756
	sct := &showCollectionsTask{
G
godchen 已提交
757 758 759
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		ShowCollectionsRequest: request,
760
		queryCoord:             node.queryCoord,
761
		rootCoord:              node.rootCoord,
762 763
	}

764
	log := log.Ctx(ctx).With(
765
		zap.String("role", typeutil.ProxyRole),
766 767
		zap.String("DbName", request.DbName),
		zap.Uint64("TimeStamp", request.TimeStamp),
768 769 770 771
		zap.String("ShowType", request.Type.String()))

	log.Debug("ShowCollections received",
		zap.Any("CollectionNames", request.CollectionNames))
772

773
	err := node.sched.ddQueue.Enqueue(sct)
774
	if err != nil {
775 776
		log.Warn("ShowCollections failed to enqueue",
			zap.Error(err),
777
			zap.Any("CollectionNames", request.CollectionNames))
778

E
Enwei Jiao 已提交
779
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
G
godchen 已提交
780
		return &milvuspb.ShowCollectionsResponse{
781
			Status: &commonpb.Status{
782
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
783 784 785 786 787
				Reason:    err.Error(),
			},
		}, nil
	}

788
	log.Debug("ShowCollections enqueued",
789
		zap.Any("CollectionNames", request.CollectionNames))
D
dragondriver 已提交
790

791 792
	err = sct.WaitToFinish()
	if err != nil {
793 794
		log.Warn("ShowCollections failed to WaitToFinish",
			zap.Error(err),
795
			zap.Any("CollectionNames", request.CollectionNames))
796

E
Enwei Jiao 已提交
797
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
798

G
godchen 已提交
799
		return &milvuspb.ShowCollectionsResponse{
800
			Status: &commonpb.Status{
801
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
802 803 804 805 806
				Reason:    err.Error(),
			},
		}, nil
	}

807
	log.Debug("ShowCollections Done",
808 809
		zap.Int("len(CollectionNames)", len(request.CollectionNames)),
		zap.Int("num_collections", len(sct.result.CollectionNames)))
810

E
Enwei Jiao 已提交
811 812
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
813 814 815
	return sct.result, nil
}

J
jaime 已提交
816 817 818 819 820
func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

E
Enwei Jiao 已提交
821 822
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-AlterCollection")
	defer sp.End()
J
jaime 已提交
823 824 825
	method := "AlterCollection"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
826
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
J
jaime 已提交
827 828 829 830 831 832 833 834

	act := &alterCollectionTask{
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		AlterCollectionRequest: request,
		rootCoord:              node.rootCoord,
	}

835
	log := log.Ctx(ctx).With(
J
jaime 已提交
836 837 838 839
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

840 841 842
	log.Debug(
		rpcReceived(method))

J
jaime 已提交
843 844 845
	if err := node.sched.ddQueue.Enqueue(act); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
846
			zap.Error(err))
J
jaime 已提交
847

E
Enwei Jiao 已提交
848
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
J
jaime 已提交
849 850 851 852 853 854 855 856 857 858
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", act.BeginTs()),
		zap.Uint64("EndTs", act.EndTs()),
859
		zap.Uint64("timestamp", request.Base.Timestamp))
J
jaime 已提交
860 861 862 863 864 865

	if err := act.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.Uint64("BeginTs", act.BeginTs()),
866
			zap.Uint64("EndTs", act.EndTs()))
J
jaime 已提交
867

E
Enwei Jiao 已提交
868
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
J
jaime 已提交
869 870 871 872 873 874 875 876 877
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", act.BeginTs()),
878
		zap.Uint64("EndTs", act.EndTs()))
J
jaime 已提交
879

E
Enwei Jiao 已提交
880 881
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
J
jaime 已提交
882 883 884
	return act.result, nil
}

885
// CreatePartition create a partition in specific collection.
C
Cai Yudong 已提交
886
func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
887 888 889
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
890

E
Enwei Jiao 已提交
891 892
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreatePartition")
	defer sp.End()
893 894
	method := "CreatePartition"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
895
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
896

897
	cpt := &createPartitionTask{
S
sunby 已提交
898
		ctx:                    ctx,
899 900
		Condition:              NewTaskCondition(ctx),
		CreatePartitionRequest: request,
901
		rootCoord:              node.rootCoord,
902 903 904
		result:                 nil,
	}

905
	log := log.Ctx(ctx).With(
906
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
907 908 909
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
910

911 912
	log.Debug(rpcReceived("CreatePartition"))

913 914 915
	if err := node.sched.ddQueue.Enqueue(cpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue("CreatePartition"),
916
			zap.Error(err))
917

E
Enwei Jiao 已提交
918
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
919

920
		return &commonpb.Status{
921
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
922 923 924
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
925

926 927 928
	log.Debug(
		rpcEnqueued("CreatePartition"),
		zap.Uint64("BeginTS", cpt.BeginTs()),
929
		zap.Uint64("EndTS", cpt.EndTs()))
930 931 932 933

	if err := cpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish("CreatePartition"),
D
dragondriver 已提交
934
			zap.Error(err),
935
			zap.Uint64("BeginTS", cpt.BeginTs()),
936
			zap.Uint64("EndTS", cpt.EndTs()))
D
dragondriver 已提交
937

E
Enwei Jiao 已提交
938
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
939

940
		return &commonpb.Status{
941
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
942 943 944
			Reason:    err.Error(),
		}, nil
	}
945 946 947 948

	log.Debug(
		rpcDone("CreatePartition"),
		zap.Uint64("BeginTS", cpt.BeginTs()),
949
		zap.Uint64("EndTS", cpt.EndTs()))
950

E
Enwei Jiao 已提交
951 952
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
953 954 955
	return cpt.result, nil
}

956
// DropPartition drop a partition in specific collection.
C
Cai Yudong 已提交
957
func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
958 959 960
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
961

E
Enwei Jiao 已提交
962 963
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropPartition")
	defer sp.End()
964 965
	method := "DropPartition"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
966
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
967

968
	dpt := &dropPartitionTask{
S
sunby 已提交
969
		ctx:                  ctx,
970 971
		Condition:            NewTaskCondition(ctx),
		DropPartitionRequest: request,
972
		rootCoord:            node.rootCoord,
C
cai.zhang 已提交
973
		queryCoord:           node.queryCoord,
974 975 976
		result:               nil,
	}

977
	log := log.Ctx(ctx).With(
978
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
979 980 981
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
982

983 984
	log.Debug(rpcReceived(method))

985 986 987
	if err := node.sched.ddQueue.Enqueue(dpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
988
			zap.Error(err))
989

E
Enwei Jiao 已提交
990
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
991

992
		return &commonpb.Status{
993
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
994 995 996
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
997

998 999 1000
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", dpt.BeginTs()),
1001
		zap.Uint64("EndTS", dpt.EndTs()))
1002 1003 1004 1005

	if err := dpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1006
			zap.Error(err),
1007
			zap.Uint64("BeginTS", dpt.BeginTs()),
1008
			zap.Uint64("EndTS", dpt.EndTs()))
D
dragondriver 已提交
1009

E
Enwei Jiao 已提交
1010
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
1011

1012
		return &commonpb.Status{
1013
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1014 1015 1016
			Reason:    err.Error(),
		}, nil
	}
1017 1018 1019 1020

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", dpt.BeginTs()),
1021
		zap.Uint64("EndTS", dpt.EndTs()))
1022

E
Enwei Jiao 已提交
1023 1024
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1025 1026 1027
	return dpt.result, nil
}

1028
// HasPartition check if partition exist.
C
Cai Yudong 已提交
1029
func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
1030 1031 1032 1033 1034
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
D
dragondriver 已提交
1035

E
Enwei Jiao 已提交
1036 1037
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-HasPartition")
	defer sp.End()
1038 1039 1040
	method := "HasPartition"
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
E
Enwei Jiao 已提交
1041
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1042
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1043

1044
	hpt := &hasPartitionTask{
S
sunby 已提交
1045
		ctx:                 ctx,
1046 1047
		Condition:           NewTaskCondition(ctx),
		HasPartitionRequest: request,
1048
		rootCoord:           node.rootCoord,
1049 1050 1051
		result:              nil,
	}

1052
	log := log.Ctx(ctx).With(
1053
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1054 1055 1056
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
D
dragondriver 已提交
1057

1058 1059
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1060 1061 1062
	if err := node.sched.ddQueue.Enqueue(hpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1063
			zap.Error(err))
D
dragondriver 已提交
1064

E
Enwei Jiao 已提交
1065
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1066
			metrics.AbandonLabel).Inc()
1067

1068 1069
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1070
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1071 1072 1073 1074 1075
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1076

D
dragondriver 已提交
1077 1078 1079
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", hpt.BeginTs()),
1080
		zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1081 1082 1083 1084

	if err := hpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1085
			zap.Error(err),
D
dragondriver 已提交
1086
			zap.Uint64("BeginTS", hpt.BeginTs()),
1087
			zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1088

E
Enwei Jiao 已提交
1089
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1090
			metrics.FailLabel).Inc()
1091

1092 1093
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1094
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1095 1096 1097 1098 1099
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1100 1101 1102 1103

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", hpt.BeginTs()),
1104
		zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1105

E
Enwei Jiao 已提交
1106
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1107
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1108
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1109 1110 1111
	return hpt.result, nil
}

1112
// LoadPartitions load specific partitions into query nodes.
C
Cai Yudong 已提交
1113
func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitionsRequest) (*commonpb.Status, error) {
1114 1115 1116
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1117

E
Enwei Jiao 已提交
1118 1119
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-LoadPartitions")
	defer sp.End()
1120 1121
	method := "LoadPartitions"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1122
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1123
		metrics.TotalLabel).Inc()
1124
	lpt := &loadPartitionsTask{
G
godchen 已提交
1125 1126 1127
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		LoadPartitionsRequest: request,
1128
		queryCoord:            node.queryCoord,
1129
		datacoord:             node.dataCoord,
1130 1131
	}

1132
	log := log.Ctx(ctx).With(
1133
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1134 1135
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
1136 1137
		zap.Strings("partitions", request.PartitionNames),
		zap.Bool("refreshMode", request.Refresh))
1138

1139 1140
	log.Debug(rpcReceived(method))

1141 1142 1143
	if err := node.sched.ddQueue.Enqueue(lpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1144
			zap.Error(err))
1145

E
Enwei Jiao 已提交
1146
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1147
			metrics.AbandonLabel).Inc()
1148

1149
		return &commonpb.Status{
1150
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1151 1152 1153 1154
			Reason:    err.Error(),
		}, nil
	}

1155 1156 1157
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", lpt.BeginTs()),
1158
		zap.Uint64("EndTS", lpt.EndTs()))
1159 1160 1161 1162

	if err := lpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1163
			zap.Error(err),
1164
			zap.Uint64("BeginTS", lpt.BeginTs()),
1165
			zap.Uint64("EndTS", lpt.EndTs()))
D
dragondriver 已提交
1166

E
Enwei Jiao 已提交
1167
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1168
			metrics.FailLabel).Inc()
1169

1170
		return &commonpb.Status{
1171
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1172 1173 1174 1175
			Reason:    err.Error(),
		}, nil
	}

1176 1177 1178
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", lpt.BeginTs()),
1179
		zap.Uint64("EndTS", lpt.EndTs()))
1180

E
Enwei Jiao 已提交
1181
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1182
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1183
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1184
	return lpt.result, nil
1185 1186
}

1187
// ReleasePartitions release specific partitions from query nodes.
C
Cai Yudong 已提交
1188
func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionsRequest) (*commonpb.Status, error) {
1189 1190 1191
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1192

E
Enwei Jiao 已提交
1193 1194
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ReleasePartitions")
	defer sp.End()
1195

1196
	rpt := &releasePartitionsTask{
G
godchen 已提交
1197 1198 1199
		ctx:                      ctx,
		Condition:                NewTaskCondition(ctx),
		ReleasePartitionsRequest: request,
1200
		queryCoord:               node.queryCoord,
1201 1202
	}

1203
	method := "ReleasePartitions"
1204
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1205
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1206
		metrics.TotalLabel).Inc()
1207 1208

	log := log.Ctx(ctx).With(
1209
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1210 1211 1212
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1213

1214 1215
	log.Debug(rpcReceived(method))

1216 1217 1218
	if err := node.sched.ddQueue.Enqueue(rpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1219
			zap.Error(err))
1220

E
Enwei Jiao 已提交
1221
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1222
			metrics.AbandonLabel).Inc()
1223

1224
		return &commonpb.Status{
1225
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1226 1227 1228 1229
			Reason:    err.Error(),
		}, nil
	}

1230 1231 1232
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", rpt.BeginTs()),
1233
		zap.Uint64("EndTS", rpt.EndTs()))
1234 1235 1236 1237

	if err := rpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1238
			zap.Error(err),
1239
			zap.Uint64("BeginTS", rpt.BeginTs()),
1240
			zap.Uint64("EndTS", rpt.EndTs()))
D
dragondriver 已提交
1241

E
Enwei Jiao 已提交
1242
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1243
			metrics.FailLabel).Inc()
1244

1245
		return &commonpb.Status{
1246
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1247 1248 1249 1250
			Reason:    err.Error(),
		}, nil
	}

1251 1252 1253
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", rpt.BeginTs()),
1254
		zap.Uint64("EndTS", rpt.EndTs()))
1255

E
Enwei Jiao 已提交
1256
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1257
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1258
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1259
	return rpt.result, nil
1260 1261
}

1262
// GetPartitionStatistics get the statistics of partition, such as num_rows.
C
Cai Yudong 已提交
1263
func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb.GetPartitionStatisticsRequest) (*milvuspb.GetPartitionStatisticsResponse, error) {
1264 1265 1266 1267 1268
	if !node.checkHealthy() {
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1269

E
Enwei Jiao 已提交
1270 1271
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetPartitionStatistics")
	defer sp.End()
1272 1273
	method := "GetPartitionStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1274
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1275
		metrics.TotalLabel).Inc()
1276

1277
	g := &getPartitionStatisticsTask{
1278 1279 1280
		ctx:                           ctx,
		Condition:                     NewTaskCondition(ctx),
		GetPartitionStatisticsRequest: request,
1281
		dataCoord:                     node.dataCoord,
1282 1283
	}

1284
	log := log.Ctx(ctx).With(
1285
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1286 1287 1288
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1289

1290 1291
	log.Debug(rpcReceived(method))

1292 1293 1294
	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1295
			zap.Error(err))
1296

E
Enwei Jiao 已提交
1297
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1298
			metrics.AbandonLabel).Inc()
1299

1300 1301 1302 1303 1304 1305 1306 1307
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1308 1309 1310
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", g.BeginTs()),
1311
		zap.Uint64("EndTS", g.EndTs()))
1312 1313 1314 1315

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
1316
			zap.Error(err),
1317
			zap.Uint64("BeginTS", g.BeginTs()),
1318
			zap.Uint64("EndTS", g.EndTs()))
1319

E
Enwei Jiao 已提交
1320
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1321
			metrics.FailLabel).Inc()
1322

1323 1324 1325 1326 1327 1328 1329 1330
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1331 1332 1333
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", g.BeginTs()),
1334
		zap.Uint64("EndTS", g.EndTs()))
1335

E
Enwei Jiao 已提交
1336
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1337
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1338
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1339
	return g.result, nil
1340 1341
}

1342
// ShowPartitions list all partitions in the specific collection.
C
Cai Yudong 已提交
1343
func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
1344 1345 1346 1347 1348
	if !node.checkHealthy() {
		return &milvuspb.ShowPartitionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1349

E
Enwei Jiao 已提交
1350 1351
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ShowPartitions")
	defer sp.End()
1352

1353
	spt := &showPartitionsTask{
G
godchen 已提交
1354 1355 1356
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		ShowPartitionsRequest: request,
1357
		rootCoord:             node.rootCoord,
1358
		queryCoord:            node.queryCoord,
G
godchen 已提交
1359
		result:                nil,
1360 1361
	}

1362
	method := "ShowPartitions"
1363 1364
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
E
Enwei Jiao 已提交
1365
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1366
		metrics.TotalLabel).Inc()
1367

1368 1369
	log := log.Ctx(ctx).With(zap.String("role", typeutil.ProxyRole))

1370 1371
	log.Debug(
		rpcReceived(method),
G
godchen 已提交
1372
		zap.Any("request", request))
1373 1374 1375 1376 1377 1378 1379

	if err := node.sched.ddQueue.Enqueue(spt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.Any("request", request))

E
Enwei Jiao 已提交
1380
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1381
			metrics.AbandonLabel).Inc()
1382

G
godchen 已提交
1383
		return &milvuspb.ShowPartitionsResponse{
1384
			Status: &commonpb.Status{
1385
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1386 1387 1388 1389 1390
				Reason:    err.Error(),
			},
		}, nil
	}

1391 1392 1393 1394
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
1395 1396
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
1397 1398 1399 1400 1401
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

	if err := spt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1402
			zap.Error(err),
1403 1404 1405 1406 1407
			zap.Uint64("BeginTS", spt.BeginTs()),
			zap.Uint64("EndTS", spt.EndTs()),
			zap.String("db", spt.ShowPartitionsRequest.DbName),
			zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
			zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))
D
dragondriver 已提交
1408

E
Enwei Jiao 已提交
1409
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1410
			metrics.FailLabel).Inc()
1411

G
godchen 已提交
1412
		return &milvuspb.ShowPartitionsResponse{
1413
			Status: &commonpb.Status{
1414
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1415 1416 1417 1418
				Reason:    err.Error(),
			},
		}, nil
	}
1419 1420 1421 1422 1423 1424 1425 1426 1427

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

E
Enwei Jiao 已提交
1428
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1429
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1430
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1431 1432 1433
	return spt.result, nil
}

S
SimFG 已提交
1434 1435 1436 1437 1438 1439
func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.GetLoadingProgressRequest) (*milvuspb.GetLoadingProgressResponse, error) {
	if !node.checkHealthy() {
		return &milvuspb.GetLoadingProgressResponse{Status: unhealthyStatus()}, nil
	}
	method := "GetLoadingProgress"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1440 1441
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetLoadingProgress")
	defer sp.End()
E
Enwei Jiao 已提交
1442
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
1443 1444 1445
	log := log.Ctx(ctx)

	log.Debug(
S
SimFG 已提交
1446 1447 1448 1449
		rpcReceived(method),
		zap.Any("request", request))

	getErrResponse := func(err error) *milvuspb.GetLoadingProgressResponse {
J
Jiquan Long 已提交
1450
		log.Warn("fail to get loading progress",
1451
			zap.String("collection_name", request.CollectionName),
S
SimFG 已提交
1452 1453
			zap.Strings("partition_name", request.PartitionNames),
			zap.Error(err))
E
Enwei Jiao 已提交
1454
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
1455 1456 1457 1458 1459
		if errors.Is(err, ErrInsufficientMemory) {
			return &milvuspb.GetLoadingProgressResponse{
				Status: InSufficientMemoryStatus(request.GetCollectionName()),
			}
		}
S
SimFG 已提交
1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473
		return &milvuspb.GetLoadingProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
	}
	if err := validateCollectionName(request.CollectionName); err != nil {
		return getErrResponse(err), nil
	}
	collectionID, err := globalMetaCache.GetCollectionID(ctx, request.CollectionName)
	if err != nil {
		return getErrResponse(err), nil
	}
S
SimFG 已提交
1474

1475 1476 1477
	msgBase := commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
1478
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
1479
	)
S
SimFG 已提交
1480 1481 1482 1483 1484 1485 1486 1487 1488 1489
	if request.Base == nil {
		request.Base = msgBase
	} else {
		request.Base.MsgID = msgBase.MsgID
		request.Base.Timestamp = msgBase.Timestamp
		request.Base.SourceID = msgBase.SourceID
	}

	var progress int64
	if len(request.GetPartitionNames()) == 0 {
S
SimFG 已提交
1490
		if progress, err = getCollectionProgress(ctx, node.queryCoord, request.GetBase(), collectionID); err != nil {
S
SimFG 已提交
1491 1492 1493
			return getErrResponse(err), nil
		}
	} else {
S
SimFG 已提交
1494 1495
		if progress, err = getPartitionProgress(ctx, node.queryCoord, request.GetBase(),
			request.GetPartitionNames(), request.GetCollectionName(), collectionID); err != nil {
S
SimFG 已提交
1496 1497 1498 1499
			return getErrResponse(err), nil
		}
	}

1500
	log.Debug(
S
SimFG 已提交
1501 1502
		rpcDone(method),
		zap.Any("request", request))
E
Enwei Jiao 已提交
1503 1504
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
S
SimFG 已提交
1505 1506 1507 1508 1509 1510 1511 1512
	return &milvuspb.GetLoadingProgressResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		Progress: progress,
	}, nil
}

1513
func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadStateRequest) (*milvuspb.GetLoadStateResponse, error) {
S
SimFG 已提交
1514 1515 1516 1517 1518
	if !node.checkHealthy() {
		return &milvuspb.GetLoadStateResponse{Status: unhealthyStatus()}, nil
	}
	method := "GetLoadState"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1519 1520
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetLoadState")
	defer sp.End()
S
SimFG 已提交
1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
	log := log.Ctx(ctx)

	log.Debug(
		rpcReceived(method),
		zap.Any("request", request))

	getErrResponse := func(err error) *milvuspb.GetLoadStateResponse {
		log.Warn("fail to get load state",
			zap.String("collection_name", request.CollectionName),
			zap.Strings("partition_name", request.PartitionNames),
			zap.Error(err))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
		return &milvuspb.GetLoadStateResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
	}

	if err := validateCollectionName(request.CollectionName); err != nil {
		return getErrResponse(err), nil
	}

1546 1547
	// TODO(longjiquan): https://github.com/milvus-io/milvus/issues/21485, Remove `GetComponentStates` after error code
	// 	is ready to distinguish case whether the querycoord is not healthy or the collection is not even loaded.
S
SimFG 已提交
1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588
	if statesResp, err := node.queryCoord.GetComponentStates(ctx); err != nil {
		return getErrResponse(err), nil
	} else if statesResp.State == nil || statesResp.State.StateCode != commonpb.StateCode_Healthy {
		return getErrResponse(fmt.Errorf("the querycoord server isn't healthy, state: %v", statesResp.State)), nil
	}

	successResponse := &milvuspb.GetLoadStateResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
	defer func() {
		log.Debug(
			rpcDone(method),
			zap.Any("request", request))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
		metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	}()

	collectionID, err := globalMetaCache.GetCollectionID(ctx, request.CollectionName)
	if err != nil {
		successResponse.State = commonpb.LoadState_LoadStateNotExist
		return successResponse, nil
	}

	msgBase := commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
	)
	if request.Base == nil {
		request.Base = msgBase
	} else {
		request.Base.MsgID = msgBase.MsgID
		request.Base.Timestamp = msgBase.Timestamp
		request.Base.SourceID = msgBase.SourceID
	}

	var progress int64
	if len(request.GetPartitionNames()) == 0 {
		if progress, err = getCollectionProgress(ctx, node.queryCoord, request.GetBase(), collectionID); err != nil {
1589 1590 1591 1592 1593
			if errors.Is(err, ErrInsufficientMemory) {
				return &milvuspb.GetLoadStateResponse{
					Status: InSufficientMemoryStatus(request.GetCollectionName()),
				}, nil
			}
S
SimFG 已提交
1594 1595 1596 1597 1598 1599
			successResponse.State = commonpb.LoadState_LoadStateNotLoad
			return successResponse, nil
		}
	} else {
		if progress, err = getPartitionProgress(ctx, node.queryCoord, request.GetBase(),
			request.GetPartitionNames(), request.GetCollectionName(), collectionID); err != nil {
1600 1601 1602 1603 1604
			if errors.Is(err, ErrInsufficientMemory) {
				return &milvuspb.GetLoadStateResponse{
					Status: InSufficientMemoryStatus(request.GetCollectionName()),
				}, nil
			}
S
SimFG 已提交
1605 1606 1607 1608 1609 1610 1611 1612 1613 1614
			successResponse.State = commonpb.LoadState_LoadStateNotLoad
			return successResponse, nil
		}
	}
	if progress >= 100 {
		successResponse.State = commonpb.LoadState_LoadStateLoaded
	} else {
		successResponse.State = commonpb.LoadState_LoadStateLoading
	}
	return successResponse, nil
1615 1616
}

1617
// CreateIndex create index for collection.
C
Cai Yudong 已提交
1618
func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
1619 1620 1621
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1622

E
Enwei Jiao 已提交
1623 1624
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateIndex")
	defer sp.End()
D
dragondriver 已提交
1625

1626
	cit := &createIndexTask{
Z
zhenshan.cao 已提交
1627 1628 1629 1630
		ctx:        ctx,
		Condition:  NewTaskCondition(ctx),
		req:        request,
		rootCoord:  node.rootCoord,
1631
		datacoord:  node.dataCoord,
1632
		queryCoord: node.queryCoord,
1633 1634
	}

D
dragondriver 已提交
1635
	method := "CreateIndex"
1636
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1637
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1638
		metrics.TotalLabel).Inc()
1639 1640

	log := log.Ctx(ctx).With(
1641
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1642 1643 1644 1645
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))
D
dragondriver 已提交
1646

1647 1648
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1649 1650 1651
	if err := node.sched.ddQueue.Enqueue(cit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1652
			zap.Error(err))
D
dragondriver 已提交
1653

E
Enwei Jiao 已提交
1654
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1655
			metrics.AbandonLabel).Inc()
1656

1657
		return &commonpb.Status{
1658
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1659 1660 1661 1662
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1663 1664 1665
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", cit.BeginTs()),
1666
		zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1667 1668 1669 1670

	if err := cit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1671
			zap.Error(err),
D
dragondriver 已提交
1672
			zap.Uint64("BeginTs", cit.BeginTs()),
1673
			zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1674

E
Enwei Jiao 已提交
1675
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1676
			metrics.FailLabel).Inc()
1677

1678
		return &commonpb.Status{
1679
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1680 1681 1682 1683
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1684 1685 1686
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", cit.BeginTs()),
1687
		zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1688

E
Enwei Jiao 已提交
1689
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1690
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1691
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1692 1693 1694
	return cit.result, nil
}

1695
// DescribeIndex get the meta information of index, such as index state, index id and etc.
C
Cai Yudong 已提交
1696
func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
1697 1698 1699 1700 1701
	if !node.checkHealthy() {
		return &milvuspb.DescribeIndexResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1702

E
Enwei Jiao 已提交
1703 1704
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DescribeIndex")
	defer sp.End()
1705

1706
	dit := &describeIndexTask{
S
sunby 已提交
1707
		ctx:                  ctx,
1708 1709
		Condition:            NewTaskCondition(ctx),
		DescribeIndexRequest: request,
1710
		datacoord:            node.dataCoord,
1711 1712
	}

1713 1714
	method := "DescribeIndex"
	// avoid data race
1715
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1716
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1717
		metrics.TotalLabel).Inc()
1718 1719

	log := log.Ctx(ctx).With(
1720
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1721 1722 1723
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
1724 1725 1726
		zap.String("index name", request.IndexName))

	log.Debug(rpcReceived(method))
1727 1728 1729 1730

	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1731
			zap.Error(err))
1732

E
Enwei Jiao 已提交
1733
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1734
			metrics.AbandonLabel).Inc()
1735

1736 1737
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
1738
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1739 1740 1741 1742 1743
				Reason:    err.Error(),
			},
		}, nil
	}

1744 1745 1746
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1747
		zap.Uint64("EndTs", dit.EndTs()))
1748 1749 1750 1751

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1752
			zap.Error(err),
1753
			zap.Uint64("BeginTs", dit.BeginTs()),
1754
			zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1755

Z
zhenshan.cao 已提交
1756 1757 1758 1759
		errCode := commonpb.ErrorCode_UnexpectedError
		if dit.result != nil {
			errCode = dit.result.Status.GetErrorCode()
		}
E
Enwei Jiao 已提交
1760
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1761
			metrics.FailLabel).Inc()
1762

1763 1764
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
Z
zhenshan.cao 已提交
1765
				ErrorCode: errCode,
1766 1767 1768 1769 1770
				Reason:    err.Error(),
			},
		}, nil
	}

1771 1772 1773
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1774
		zap.Uint64("EndTs", dit.EndTs()))
1775

E
Enwei Jiao 已提交
1776
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1777
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1778
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1779 1780 1781
	return dit.result, nil
}

1782
// DropIndex drop the index of collection.
C
Cai Yudong 已提交
1783
func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
1784 1785 1786
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1787

E
Enwei Jiao 已提交
1788 1789
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropIndex")
	defer sp.End()
D
dragondriver 已提交
1790

1791
	dit := &dropIndexTask{
S
sunby 已提交
1792
		ctx:              ctx,
B
BossZou 已提交
1793 1794
		Condition:        NewTaskCondition(ctx),
		DropIndexRequest: request,
1795
		dataCoord:        node.dataCoord,
1796
		queryCoord:       node.queryCoord,
B
BossZou 已提交
1797
	}
G
godchen 已提交
1798

D
dragondriver 已提交
1799
	method := "DropIndex"
1800
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1801
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1802
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1803

1804
	log := log.Ctx(ctx).With(
1805
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1806 1807 1808 1809 1810
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

1811 1812
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1813 1814 1815
	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1816
			zap.Error(err))
E
Enwei Jiao 已提交
1817
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1818
			metrics.AbandonLabel).Inc()
D
dragondriver 已提交
1819

B
BossZou 已提交
1820
		return &commonpb.Status{
1821
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1822 1823 1824
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1825

D
dragondriver 已提交
1826 1827 1828
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1829
		zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1830 1831 1832 1833

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1834
			zap.Error(err),
D
dragondriver 已提交
1835
			zap.Uint64("BeginTs", dit.BeginTs()),
1836
			zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1837

E
Enwei Jiao 已提交
1838
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1839
			metrics.FailLabel).Inc()
1840

B
BossZou 已提交
1841
		return &commonpb.Status{
1842
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1843 1844 1845
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1846 1847 1848 1849

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1850
		zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1851

E
Enwei Jiao 已提交
1852
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1853
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1854
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
B
BossZou 已提交
1855 1856 1857
	return dit.result, nil
}

1858 1859
// GetIndexBuildProgress gets index build progress with filed_name and index_name.
// IndexRows is the num of indexed rows. And TotalRows is the total number of segment rows.
1860
// Deprecated: use DescribeIndex instead
C
Cai Yudong 已提交
1861
func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb.GetIndexBuildProgressRequest) (*milvuspb.GetIndexBuildProgressResponse, error) {
1862 1863 1864 1865 1866
	if !node.checkHealthy() {
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1867

E
Enwei Jiao 已提交
1868 1869
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetIndexBuildProgress")
	defer sp.End()
1870

1871
	gibpt := &getIndexBuildProgressTask{
1872 1873 1874
		ctx:                          ctx,
		Condition:                    NewTaskCondition(ctx),
		GetIndexBuildProgressRequest: request,
1875
		rootCoord:                    node.rootCoord,
1876
		dataCoord:                    node.dataCoord,
1877 1878
	}

1879
	method := "GetIndexBuildProgress"
1880
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1881
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1882
		metrics.TotalLabel).Inc()
1883 1884

	log := log.Ctx(ctx).With(
1885
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1886 1887 1888 1889
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1890

1891 1892
	log.Debug(rpcReceived(method))

1893 1894 1895
	if err := node.sched.ddQueue.Enqueue(gibpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1896
			zap.Error(err))
E
Enwei Jiao 已提交
1897
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1898
			metrics.AbandonLabel).Inc()
1899

1900 1901 1902 1903 1904 1905 1906 1907
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1908 1909 1910
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
1911
		zap.Uint64("EndTs", gibpt.EndTs()))
1912 1913 1914 1915

	if err := gibpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
1916
			zap.Error(err),
1917
			zap.Uint64("BeginTs", gibpt.BeginTs()),
1918
			zap.Uint64("EndTs", gibpt.EndTs()))
E
Enwei Jiao 已提交
1919
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1920
			metrics.FailLabel).Inc()
1921 1922 1923 1924 1925 1926 1927 1928

		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
1929 1930 1931 1932

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
1933
		zap.Uint64("EndTs", gibpt.EndTs()))
1934

E
Enwei Jiao 已提交
1935
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1936
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1937
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1938
	return gibpt.result, nil
1939 1940
}

1941
// GetIndexState get the build-state of index.
1942
// Deprecated: use DescribeIndex instead
C
Cai Yudong 已提交
1943
func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndexStateRequest) (*milvuspb.GetIndexStateResponse, error) {
1944 1945 1946 1947 1948
	if !node.checkHealthy() {
		return &milvuspb.GetIndexStateResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1949

E
Enwei Jiao 已提交
1950 1951
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Insert")
	defer sp.End()
1952

1953
	dipt := &getIndexStateTask{
G
godchen 已提交
1954 1955 1956
		ctx:                  ctx,
		Condition:            NewTaskCondition(ctx),
		GetIndexStateRequest: request,
1957
		dataCoord:            node.dataCoord,
1958
		rootCoord:            node.rootCoord,
1959 1960
	}

1961
	method := "GetIndexState"
1962
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1963
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1964
		metrics.TotalLabel).Inc()
1965 1966

	log := log.Ctx(ctx).With(
1967
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1968 1969 1970 1971
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1972

1973 1974
	log.Debug(rpcReceived(method))

1975 1976 1977
	if err := node.sched.ddQueue.Enqueue(dipt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1978
			zap.Error(err))
1979

E
Enwei Jiao 已提交
1980
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1981
			metrics.AbandonLabel).Inc()
1982

G
godchen 已提交
1983
		return &milvuspb.GetIndexStateResponse{
1984
			Status: &commonpb.Status{
1985
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1986 1987 1988 1989 1990
				Reason:    err.Error(),
			},
		}, nil
	}

1991 1992 1993
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dipt.BeginTs()),
1994
		zap.Uint64("EndTs", dipt.EndTs()))
1995 1996 1997 1998

	if err := dipt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1999
			zap.Error(err),
2000
			zap.Uint64("BeginTs", dipt.BeginTs()),
2001
			zap.Uint64("EndTs", dipt.EndTs()))
E
Enwei Jiao 已提交
2002
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2003
			metrics.FailLabel).Inc()
2004

G
godchen 已提交
2005
		return &milvuspb.GetIndexStateResponse{
2006
			Status: &commonpb.Status{
2007
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2008 2009 2010 2011 2012
				Reason:    err.Error(),
			},
		}, nil
	}

2013 2014 2015
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dipt.BeginTs()),
2016
		zap.Uint64("EndTs", dipt.EndTs()))
2017

E
Enwei Jiao 已提交
2018
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2019
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2020
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2021 2022 2023
	return dipt.result, nil
}

2024
// Insert insert records into collection.
C
Cai Yudong 已提交
2025
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
E
Enwei Jiao 已提交
2026 2027
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Insert")
	defer sp.End()
X
Xiangyu Wang 已提交
2028

2029 2030 2031 2032 2033
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}
2034 2035
	method := "Insert"
	tr := timerecord.NewTimeRecorder(method)
S
smellthemoon 已提交
2036
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Add(float64(proto.Size(request)))
E
Enwei Jiao 已提交
2037
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
S
smellthemoon 已提交
2038

2039
	it := &insertTask{
2040 2041
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
X
xige-16 已提交
2042
		// req:       request,
2043
		insertMsg: &msgstream.InsertMsg{
2044 2045 2046
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
2047
			InsertRequest: msgpb.InsertRequest{
2048 2049 2050
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Insert),
					commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
2051
					commonpbutil.WithSourceID(paramtable.GetNodeID()),
2052
				),
2053 2054
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
X
xige-16 已提交
2055 2056
				FieldsData:     request.FieldsData,
				NumRows:        uint64(request.NumRows),
2057
				Version:        msgpb.InsertDataVersion_ColumnBased,
2058
				// RowData: transfer column based request to this
2059 2060
			},
		},
2061
		idAllocator:   node.rowIDAllocator,
2062 2063 2064
		segIDAssigner: node.segAssigner,
		chMgr:         node.chMgr,
		chTicker:      node.chTicker,
2065
	}
2066

2067 2068
	if len(it.insertMsg.PartitionName) <= 0 {
		it.insertMsg.PartitionName = Params.CommonCfg.DefaultPartitionName.GetValue()
2069 2070
	}

X
Xiangyu Wang 已提交
2071
	constructFailedResponse := func(err error) *milvuspb.MutationResult {
X
xige-16 已提交
2072
		numRows := request.NumRows
2073 2074 2075 2076
		errIndex := make([]uint32, numRows)
		for i := uint32(0); i < numRows; i++ {
			errIndex[i] = i
		}
2077

X
Xiangyu Wang 已提交
2078 2079 2080 2081 2082 2083 2084
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			ErrIndex: errIndex,
		}
2085 2086
	}

X
Xiangyu Wang 已提交
2087
	log.Debug("Enqueue insert request in Proxy",
2088
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2089 2090 2091 2092 2093
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.Int("len(FieldsData)", len(request.FieldsData)),
		zap.Int("len(HashKeys)", len(request.HashKeys)),
2094
		zap.Uint32("NumRows", request.NumRows))
D
dragondriver 已提交
2095

X
Xiangyu Wang 已提交
2096
	if err := node.sched.dmQueue.Enqueue(it); err != nil {
J
Jiquan Long 已提交
2097
		log.Warn("Failed to enqueue insert task: " + err.Error())
E
Enwei Jiao 已提交
2098
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2099
			metrics.AbandonLabel).Inc()
X
Xiangyu Wang 已提交
2100
		return constructFailedResponse(err), nil
2101
	}
D
dragondriver 已提交
2102

X
Xiangyu Wang 已提交
2103
	log.Debug("Detail of insert request in Proxy",
2104
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2105 2106 2107 2108 2109
		zap.Uint64("BeginTS", it.BeginTs()),
		zap.Uint64("EndTS", it.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
2110
		zap.Uint32("NumRows", request.NumRows))
X
Xiangyu Wang 已提交
2111 2112

	if err := it.WaitToFinish(); err != nil {
2113
		log.Warn("Failed to execute insert task in task scheduler: " + err.Error())
E
Enwei Jiao 已提交
2114
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2115
			metrics.FailLabel).Inc()
X
Xiangyu Wang 已提交
2116 2117 2118 2119 2120
		return constructFailedResponse(err), nil
	}

	if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
		setErrorIndex := func() {
X
xige-16 已提交
2121
			numRows := request.NumRows
X
Xiangyu Wang 已提交
2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132
			errIndex := make([]uint32, numRows)
			for i := uint32(0); i < numRows; i++ {
				errIndex[i] = i
			}
			it.result.ErrIndex = errIndex
		}

		setErrorIndex()
	}

	// InsertCnt always equals to the number of entities in the request
X
xige-16 已提交
2133
	it.result.InsertCnt = int64(request.NumRows)
D
dragondriver 已提交
2134

S
smellthemoon 已提交
2135 2136 2137
	receiveSize := proto.Size(it.insertMsg)
	rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(receiveSize))

E
Enwei Jiao 已提交
2138
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2139
		metrics.SuccessLabel).Inc()
2140
	successCnt := it.result.InsertCnt - int64(len(it.result.ErrIndex))
E
Enwei Jiao 已提交
2141 2142 2143
	metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(successCnt))
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
2144 2145 2146
	return it.result, nil
}

2147
// Delete delete records from collection, then these records cannot be searched.
G
groot 已提交
2148
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {
E
Enwei Jiao 已提交
2149 2150
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Delete")
	defer sp.End()
2151 2152 2153
	log := log.Ctx(ctx)
	log.Debug("Start processing delete request in Proxy")
	defer log.Debug("Finish processing delete request in Proxy")
2154

S
smellthemoon 已提交
2155
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Add(float64(proto.Size(request)))
2156

G
groot 已提交
2157 2158 2159 2160 2161 2162
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}

2163 2164 2165
	method := "Delete"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
2166
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2167
		metrics.TotalLabel).Inc()
2168
	dt := &deleteTask{
X
xige-16 已提交
2169 2170 2171
		ctx:        ctx,
		Condition:  NewTaskCondition(ctx),
		deleteExpr: request.Expr,
2172
		deleteMsg: &BaseDeleteTask{
G
godchen 已提交
2173 2174 2175
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
2176
			DeleteRequest: msgpb.DeleteRequest{
2177 2178 2179 2180
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Delete),
					commonpbutil.WithMsgID(0),
				),
X
xige-16 已提交
2181
				DbName:         request.DbName,
G
godchen 已提交
2182 2183 2184
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
				// RowData: transfer column based request to this
C
Cai Yudong 已提交
2185 2186 2187 2188
			},
		},
		chMgr:    node.chMgr,
		chTicker: node.chTicker,
G
groot 已提交
2189 2190
	}

2191
	log.Debug("Enqueue delete request in Proxy",
2192
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2193 2194 2195 2196
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.String("expr", request.Expr))
2197 2198 2199

	// MsgID will be set by Enqueue()
	if err := node.sched.dmQueue.Enqueue(dt); err != nil {
2200
		log.Error("Failed to enqueue delete task: " + err.Error())
E
Enwei Jiao 已提交
2201
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2202
			metrics.AbandonLabel).Inc()
2203

G
groot 已提交
2204 2205 2206 2207 2208 2209 2210 2211
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

2212
	log.Debug("Detail of delete request in Proxy",
2213
		zap.String("role", typeutil.ProxyRole),
2214
		zap.Uint64("timestamp", dt.deleteMsg.Base.Timestamp),
G
groot 已提交
2215 2216 2217
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
2218
		zap.String("expr", request.Expr))
G
groot 已提交
2219

2220
	if err := dt.WaitToFinish(); err != nil {
2221
		log.Error("Failed to execute delete task in task scheduler: " + err.Error())
E
Enwei Jiao 已提交
2222
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2223
			metrics.FailLabel).Inc()
G
groot 已提交
2224 2225 2226 2227 2228 2229 2230 2231
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

S
smellthemoon 已提交
2232 2233 2234
	receiveSize := proto.Size(dt.deleteMsg)
	rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(receiveSize))

E
Enwei Jiao 已提交
2235
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2236
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2237 2238
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
G
groot 已提交
2239 2240 2241
	return dt.result, nil
}

S
smellthemoon 已提交
2242 2243
// Upsert upsert records into collection.
func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {
E
Enwei Jiao 已提交
2244 2245
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert")
	defer sp.End()
S
smellthemoon 已提交
2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.Uint32("NumRows", request.NumRows),
	)
	log.Debug("Start processing upsert request in Proxy")

	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}
	method := "Upsert"
	tr := timerecord.NewTimeRecorder(method)

	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Add(float64(proto.Size(request)))
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()

	it := &upsertTask{
		baseMsg: msgstream.BaseMsg{
			HashValues: request.HashKeys,
		},
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),

		req: &milvuspb.UpsertRequest{
			Base: commonpbutil.NewMsgBase(
2276
				commonpbutil.WithMsgType(commonpb.MsgType_Upsert),
S
smellthemoon 已提交
2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
			),
			CollectionName: request.CollectionName,
			PartitionName:  request.PartitionName,
			FieldsData:     request.FieldsData,
			NumRows:        request.NumRows,
		},

		result: &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_Success,
			},
			IDs: &schemapb.IDs{
				IdField: nil,
			},
		},

		idAllocator:   node.rowIDAllocator,
		segIDAssigner: node.segAssigner,
		chMgr:         node.chMgr,
		chTicker:      node.chTicker,
	}

	if len(it.req.PartitionName) <= 0 {
		it.req.PartitionName = Params.CommonCfg.DefaultPartitionName.GetValue()
	}

	constructFailedResponse := func(err error, errCode commonpb.ErrorCode) *milvuspb.MutationResult {
		numRows := request.NumRows
		errIndex := make([]uint32, numRows)
		for i := uint32(0); i < numRows; i++ {
			errIndex[i] = i
		}

		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: errCode,
				Reason:    err.Error(),
			},
			ErrIndex: errIndex,
		}
	}

	log.Debug("Enqueue upsert request in Proxy",
		zap.Int("len(FieldsData)", len(request.FieldsData)),
		zap.Int("len(HashKeys)", len(request.HashKeys)))

	if err := node.sched.dmQueue.Enqueue(it); err != nil {
		log.Info("Failed to enqueue upsert task",
			zap.Error(err))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
			metrics.AbandonLabel).Inc()
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug("Detail of upsert request in Proxy",
		zap.Uint64("BeginTS", it.BeginTs()),
		zap.Uint64("EndTS", it.EndTs()))

	if err := it.WaitToFinish(); err != nil {
		log.Info("Failed to execute insert task in task scheduler",
			zap.Error(err))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
			metrics.FailLabel).Inc()
S
smellthemoon 已提交
2346 2347 2348 2349 2350
		// Not every error case changes the status internally
		// change status there to handle it
		if it.result.Status.ErrorCode == commonpb.ErrorCode_Success {
			it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		}
S
smellthemoon 已提交
2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380
		return constructFailedResponse(err, it.result.Status.ErrorCode), nil
	}

	if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
		setErrorIndex := func() {
			numRows := request.NumRows
			errIndex := make([]uint32, numRows)
			for i := uint32(0); i < numRows; i++ {
				errIndex[i] = i
			}
			it.result.ErrIndex = errIndex
		}
		setErrorIndex()
	}

	insertReceiveSize := proto.Size(it.upsertMsg.InsertMsg)
	deleteReceiveSize := proto.Size(it.upsertMsg.DeleteMsg)

	rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(deleteReceiveSize))
	rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(insertReceiveSize))

	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))

	log.Debug("Finish processing upsert request in Proxy")
	return it.result, nil
}

2381
// Search search the most similar records of requests.
C
Cai Yudong 已提交
2382
func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
2383
	receiveSize := proto.Size(request)
E
Enwei Jiao 已提交
2384
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel).Add(float64(receiveSize))
2385 2386 2387

	rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(request.GetNq()))

2388 2389 2390 2391 2392
	if !node.checkHealthy() {
		return &milvuspb.SearchResults{
			Status: unhealthyStatus(),
		}, nil
	}
2393 2394
	method := "Search"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2395
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2396
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
2397

E
Enwei Jiao 已提交
2398 2399
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Search")
	defer sp.End()
D
dragondriver 已提交
2400

2401
	qt := &searchTask{
S
sunby 已提交
2402
		ctx:       ctx,
2403
		Condition: NewTaskCondition(ctx),
G
godchen 已提交
2404
		SearchRequest: &internalpb.SearchRequest{
2405 2406
			Base: commonpbutil.NewMsgBase(
				commonpbutil.WithMsgType(commonpb.MsgType_Search),
E
Enwei Jiao 已提交
2407
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
2408
			),
E
Enwei Jiao 已提交
2409
			ReqID: paramtable.GetNodeID(),
2410
		},
2411 2412 2413 2414
		request:  request,
		qc:       node.queryCoord,
		tr:       timerecord.NewTimeRecorder("search"),
		shardMgr: node.shardMgr,
2415 2416
	}

2417 2418 2419
	travelTs := request.TravelTimestamp
	guaranteeTs := request.GuaranteeTimestamp

2420
	log := log.Ctx(ctx).With(
2421
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2422 2423 2424 2425 2426
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2427 2428 2429 2430
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2431

2432 2433 2434
	log.Debug(
		rpcReceived(method))

2435
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
2436
		log.Warn(
2437
			rpcFailedToEnqueue(method),
2438
			zap.Error(err))
D
dragondriver 已提交
2439

E
Enwei Jiao 已提交
2440
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2441
			metrics.AbandonLabel).Inc()
2442

2443 2444
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2445
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2446 2447 2448 2449
				Reason:    err.Error(),
			},
		}, nil
	}
Z
Zach 已提交
2450
	tr.CtxRecord(ctx, "search request enqueue")
2451

2452
	log.Debug(
2453
		rpcEnqueued(method),
2454
		zap.Uint64("timestamp", qt.Base.Timestamp))
D
dragondriver 已提交
2455

2456
	if err := qt.WaitToFinish(); err != nil {
2457
		log.Warn(
2458
			rpcFailedToWaitToFinish(method),
2459
			zap.Error(err))
2460

E
Enwei Jiao 已提交
2461
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2462
			metrics.FailLabel).Inc()
2463

2464 2465
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2466
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2467 2468 2469 2470 2471
				Reason:    err.Error(),
			},
		}, nil
	}

Z
Zach 已提交
2472
	span := tr.CtxRecord(ctx, "wait search result")
E
Enwei Jiao 已提交
2473
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2474
		metrics.SearchLabel).Observe(float64(span.Milliseconds()))
2475
	tr.CtxRecord(ctx, "wait search result")
2476
	log.Debug(rpcDone(method))
D
dragondriver 已提交
2477

E
Enwei Jiao 已提交
2478
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2479
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2480
	metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(qt.result.GetResults().GetNumQueries()))
C
cai.zhang 已提交
2481
	searchDur := tr.ElapseSpan().Milliseconds()
E
Enwei Jiao 已提交
2482
	metrics.ProxySQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2483
		metrics.SearchLabel).Observe(float64(searchDur))
E
Enwei Jiao 已提交
2484
	metrics.ProxyCollectionSQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2485
		metrics.SearchLabel, request.CollectionName).Observe(float64(searchDur))
2486 2487
	if qt.result != nil {
		sentSize := proto.Size(qt.result)
E
Enwei Jiao 已提交
2488
		metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
2489
		rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
2490
	}
2491 2492 2493
	return qt.result, nil
}

2494
// Flush notify data nodes to persist the data of collection.
2495 2496 2497 2498 2499 2500 2501
func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*milvuspb.FlushResponse, error) {
	resp := &milvuspb.FlushResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    "",
		},
	}
2502
	if !node.checkHealthy() {
2503 2504
		resp.Status.Reason = "proxy is not healthy"
		return resp, nil
2505
	}
D
dragondriver 已提交
2506

E
Enwei Jiao 已提交
2507 2508
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Flush")
	defer sp.End()
D
dragondriver 已提交
2509

2510
	ft := &flushTask{
T
ThreadDao 已提交
2511 2512 2513
		ctx:          ctx,
		Condition:    NewTaskCondition(ctx),
		FlushRequest: request,
2514
		dataCoord:    node.dataCoord,
2515 2516
	}

D
dragondriver 已提交
2517
	method := "Flush"
2518
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2519
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2520

2521
	log := log.Ctx(ctx).With(
2522
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2523 2524
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))
D
dragondriver 已提交
2525

2526 2527
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2528 2529 2530
	if err := node.sched.ddQueue.Enqueue(ft); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2531
			zap.Error(err))
D
dragondriver 已提交
2532

E
Enwei Jiao 已提交
2533
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
2534

2535 2536
		resp.Status.Reason = err.Error()
		return resp, nil
2537 2538
	}

D
dragondriver 已提交
2539 2540 2541
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", ft.BeginTs()),
2542
		zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2543 2544 2545 2546

	if err := ft.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2547
			zap.Error(err),
D
dragondriver 已提交
2548
			zap.Uint64("BeginTs", ft.BeginTs()),
2549
			zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2550

E
Enwei Jiao 已提交
2551
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2552

D
dragondriver 已提交
2553
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
2554 2555
		resp.Status.Reason = err.Error()
		return resp, nil
2556 2557
	}

D
dragondriver 已提交
2558 2559 2560
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", ft.BeginTs()),
2561
		zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2562

E
Enwei Jiao 已提交
2563 2564
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2565
	return ft.result, nil
2566 2567
}

2568
// Query get the records by primary keys.
C
Cai Yudong 已提交
2569
func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*milvuspb.QueryResults, error) {
2570
	receiveSize := proto.Size(request)
E
Enwei Jiao 已提交
2571
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.QueryLabel).Add(float64(receiveSize))
2572 2573 2574

	rateCol.Add(internalpb.RateType_DQLQuery.String(), 1)

2575 2576 2577 2578 2579
	if !node.checkHealthy() {
		return &milvuspb.QueryResults{
			Status: unhealthyStatus(),
		}, nil
	}
2580

E
Enwei Jiao 已提交
2581 2582
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Query")
	defer sp.End()
2583
	tr := timerecord.NewTimeRecorder("Query")
D
dragondriver 已提交
2584

2585
	qt := &queryTask{
2586 2587 2588
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
		RetrieveRequest: &internalpb.RetrieveRequest{
2589 2590
			Base: commonpbutil.NewMsgBase(
				commonpbutil.WithMsgType(commonpb.MsgType_Retrieve),
E
Enwei Jiao 已提交
2591
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
2592
			),
E
Enwei Jiao 已提交
2593
			ReqID: paramtable.GetNodeID(),
2594
		},
2595 2596
		request:          request,
		qc:               node.queryCoord,
2597
		queryShardPolicy: mergeRoundRobinPolicy,
2598
		shardMgr:         node.shardMgr,
2599 2600
	}

D
dragondriver 已提交
2601 2602
	method := "Query"

E
Enwei Jiao 已提交
2603
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2604 2605
		metrics.TotalLabel).Inc()

2606
	log := log.Ctx(ctx).With(
2607
		zap.String("role", typeutil.ProxyRole),
2608 2609
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
2610 2611 2612 2613
		zap.Strings("partitions", request.PartitionNames))

	log.Debug(
		rpcReceived(method),
2614 2615 2616 2617
		zap.String("expr", request.Expr),
		zap.Strings("OutputFields", request.OutputFields),
		zap.Uint64("travel_timestamp", request.TravelTimestamp),
		zap.Uint64("guarantee_timestamp", request.GuaranteeTimestamp))
G
godchen 已提交
2618

D
dragondriver 已提交
2619
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
2620
		log.Warn(
D
dragondriver 已提交
2621
			rpcFailedToEnqueue(method),
2622
			zap.Error(err))
D
dragondriver 已提交
2623

E
Enwei Jiao 已提交
2624
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2625
			metrics.AbandonLabel).Inc()
2626

2627 2628 2629 2630 2631 2632
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
2633
	}
Z
Zach 已提交
2634
	tr.CtxRecord(ctx, "query request enqueue")
2635

2636
	log.Debug(rpcEnqueued(method))
D
dragondriver 已提交
2637 2638

	if err := qt.WaitToFinish(); err != nil {
2639
		log.Warn(
D
dragondriver 已提交
2640
			rpcFailedToWaitToFinish(method),
2641
			zap.Error(err))
2642

E
Enwei Jiao 已提交
2643
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2644
			metrics.FailLabel).Inc()
2645

2646 2647 2648 2649 2650 2651 2652
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
Z
Zach 已提交
2653
	span := tr.CtxRecord(ctx, "wait query result")
E
Enwei Jiao 已提交
2654
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2655
		metrics.QueryLabel).Observe(float64(span.Milliseconds()))
2656

2657
	log.Debug(rpcDone(method))
D
dragondriver 已提交
2658

E
Enwei Jiao 已提交
2659
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2660 2661
		metrics.SuccessLabel).Inc()

E
Enwei Jiao 已提交
2662
	metrics.ProxySQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2663
		metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
E
Enwei Jiao 已提交
2664
	metrics.ProxyCollectionSQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2665
		metrics.QueryLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
2666 2667

	ret := &milvuspb.QueryResults{
2668 2669
		Status:     qt.result.Status,
		FieldsData: qt.result.FieldsData,
2670 2671
	}
	sentSize := proto.Size(qt.result)
2672
	rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
E
Enwei Jiao 已提交
2673
	metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
2674
	return ret, nil
2675
}
2676

2677
// CreateAlias create alias for collection, then you can search the collection with alias.
Y
Yusup 已提交
2678 2679 2680 2681
func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2682

E
Enwei Jiao 已提交
2683 2684
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateAlias")
	defer sp.End()
D
dragondriver 已提交
2685

Y
Yusup 已提交
2686 2687 2688 2689 2690 2691 2692
	cat := &CreateAliasTask{
		ctx:                ctx,
		Condition:          NewTaskCondition(ctx),
		CreateAliasRequest: request,
		rootCoord:          node.rootCoord,
	}

D
dragondriver 已提交
2693
	method := "CreateAlias"
2694
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2695
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2696

2697
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2698 2699 2700 2701 2702
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

2703 2704
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2705 2706 2707
	if err := node.sched.ddQueue.Enqueue(cat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2708
			zap.Error(err))
D
dragondriver 已提交
2709

E
Enwei Jiao 已提交
2710
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
2711

Y
Yusup 已提交
2712 2713 2714 2715 2716 2717
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2718 2719 2720
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", cat.BeginTs()),
2721
		zap.Uint64("EndTs", cat.EndTs()))
D
dragondriver 已提交
2722 2723 2724 2725

	if err := cat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2726
			zap.Error(err),
D
dragondriver 已提交
2727
			zap.Uint64("BeginTs", cat.BeginTs()),
2728
			zap.Uint64("EndTs", cat.EndTs()))
E
Enwei Jiao 已提交
2729
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
Y
Yusup 已提交
2730 2731 2732 2733 2734 2735 2736

		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2737 2738 2739
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", cat.BeginTs()),
2740
		zap.Uint64("EndTs", cat.EndTs()))
D
dragondriver 已提交
2741

E
Enwei Jiao 已提交
2742 2743
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2744 2745 2746
	return cat.result, nil
}

2747
// DropAlias alter the alias of collection.
Y
Yusup 已提交
2748 2749 2750 2751
func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2752

E
Enwei Jiao 已提交
2753 2754
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropAlias")
	defer sp.End()
D
dragondriver 已提交
2755

Y
Yusup 已提交
2756 2757 2758 2759 2760 2761 2762
	dat := &DropAliasTask{
		ctx:              ctx,
		Condition:        NewTaskCondition(ctx),
		DropAliasRequest: request,
		rootCoord:        node.rootCoord,
	}

D
dragondriver 已提交
2763
	method := "DropAlias"
2764
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2765
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2766

2767
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2768 2769 2770 2771
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias))

2772 2773
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2774 2775 2776
	if err := node.sched.ddQueue.Enqueue(dat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2777
			zap.Error(err))
E
Enwei Jiao 已提交
2778
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2779

Y
Yusup 已提交
2780 2781 2782 2783 2784 2785
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2786 2787 2788
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dat.BeginTs()),
2789
		zap.Uint64("EndTs", dat.EndTs()))
D
dragondriver 已提交
2790 2791 2792 2793

	if err := dat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2794
			zap.Error(err),
D
dragondriver 已提交
2795
			zap.Uint64("BeginTs", dat.BeginTs()),
2796
			zap.Uint64("EndTs", dat.EndTs()))
Y
Yusup 已提交
2797

E
Enwei Jiao 已提交
2798
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2799

Y
Yusup 已提交
2800 2801 2802 2803 2804 2805
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2806 2807 2808
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dat.BeginTs()),
2809
		zap.Uint64("EndTs", dat.EndTs()))
D
dragondriver 已提交
2810

E
Enwei Jiao 已提交
2811 2812
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2813 2814 2815
	return dat.result, nil
}

2816
// AlterAlias alter alias of collection.
Y
Yusup 已提交
2817 2818 2819 2820
func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2821

E
Enwei Jiao 已提交
2822 2823
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-AlterAlias")
	defer sp.End()
D
dragondriver 已提交
2824

Y
Yusup 已提交
2825 2826 2827 2828 2829 2830 2831
	aat := &AlterAliasTask{
		ctx:               ctx,
		Condition:         NewTaskCondition(ctx),
		AlterAliasRequest: request,
		rootCoord:         node.rootCoord,
	}

D
dragondriver 已提交
2832
	method := "AlterAlias"
2833
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2834
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2835

2836
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2837 2838 2839 2840 2841
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

2842 2843
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2844 2845 2846
	if err := node.sched.ddQueue.Enqueue(aat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2847
			zap.Error(err))
E
Enwei Jiao 已提交
2848
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2849

Y
Yusup 已提交
2850 2851 2852 2853 2854 2855
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2856 2857 2858
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", aat.BeginTs()),
2859
		zap.Uint64("EndTs", aat.EndTs()))
D
dragondriver 已提交
2860 2861 2862 2863

	if err := aat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2864
			zap.Error(err),
D
dragondriver 已提交
2865
			zap.Uint64("BeginTs", aat.BeginTs()),
2866
			zap.Uint64("EndTs", aat.EndTs()))
Y
Yusup 已提交
2867

E
Enwei Jiao 已提交
2868
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2869

Y
Yusup 已提交
2870 2871 2872 2873 2874 2875
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2876 2877 2878
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", aat.BeginTs()),
2879
		zap.Uint64("EndTs", aat.EndTs()))
D
dragondriver 已提交
2880

E
Enwei Jiao 已提交
2881 2882
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2883 2884 2885
	return aat.result, nil
}

2886
// CalcDistance calculates the distances between vectors.
2887
func (node *Proxy) CalcDistance(ctx context.Context, request *milvuspb.CalcDistanceRequest) (*milvuspb.CalcDistanceResults, error) {
2888 2889 2890 2891 2892
	if !node.checkHealthy() {
		return &milvuspb.CalcDistanceResults{
			Status: unhealthyStatus(),
		}, nil
	}
2893

E
Enwei Jiao 已提交
2894 2895
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CalcDistance")
	defer sp.End()
2896

2897 2898
	query := func(ids *milvuspb.VectorIDs) (*milvuspb.QueryResults, error) {
		outputFields := []string{ids.FieldName}
2899

2900 2901 2902 2903 2904
		queryRequest := &milvuspb.QueryRequest{
			DbName:         "",
			CollectionName: ids.CollectionName,
			PartitionNames: ids.PartitionNames,
			OutputFields:   outputFields,
2905 2906
		}

2907
		qt := &queryTask{
2908 2909 2910
			ctx:       ctx,
			Condition: NewTaskCondition(ctx),
			RetrieveRequest: &internalpb.RetrieveRequest{
2911 2912
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Retrieve),
E
Enwei Jiao 已提交
2913
					commonpbutil.WithSourceID(paramtable.GetNodeID()),
2914
				),
E
Enwei Jiao 已提交
2915
				ReqID: paramtable.GetNodeID(),
2916
			},
2917 2918 2919 2920
			request: queryRequest,
			qc:      node.queryCoord,
			ids:     ids.IdArray,

2921
			queryShardPolicy: mergeRoundRobinPolicy,
2922
			shardMgr:         node.shardMgr,
2923 2924
		}

2925
		log := log.Ctx(ctx).With(
G
groot 已提交
2926 2927
			zap.String("collection", queryRequest.CollectionName),
			zap.Any("partitions", queryRequest.PartitionNames),
2928
			zap.Any("OutputFields", queryRequest.OutputFields))
G
groot 已提交
2929

2930
		err := node.sched.dqQueue.Enqueue(qt)
2931
		if err != nil {
2932 2933
			log.Error("CalcDistance queryTask failed to enqueue",
				zap.Error(err))
2934

2935 2936 2937 2938 2939
			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
2940
			}, err
2941
		}
2942

2943
		log.Debug("CalcDistance queryTask enqueued")
2944 2945 2946

		err = qt.WaitToFinish()
		if err != nil {
2947 2948
			log.Error("CalcDistance queryTask failed to WaitToFinish",
				zap.Error(err))
2949 2950 2951 2952 2953 2954

			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
2955
			}, err
2956
		}
2957

2958
		log.Debug("CalcDistance queryTask Done")
2959 2960

		return &milvuspb.QueryResults{
2961 2962
			Status:     qt.result.Status,
			FieldsData: qt.result.FieldsData,
2963 2964 2965
		}, nil
	}

G
groot 已提交
2966 2967
	// calcDistanceTask is not a standard task, no need to enqueue
	task := &calcDistanceTask{
E
Enwei Jiao 已提交
2968
		traceID:   sp.SpanContext().TraceID().String(),
G
groot 已提交
2969
		queryFunc: query,
2970 2971
	}

G
groot 已提交
2972
	return task.Execute(ctx, request)
2973 2974
}

2975
// GetDdChannel returns the used channel for dd operations.
C
Cai Yudong 已提交
2976
func (node *Proxy) GetDdChannel(ctx context.Context, request *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) {
2977 2978
	panic("implement me")
}
X
XuanYang-cn 已提交
2979

2980
// GetPersistentSegmentInfo get the information of sealed segment.
C
Cai Yudong 已提交
2981
func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.GetPersistentSegmentInfoRequest) (*milvuspb.GetPersistentSegmentInfoResponse, error) {
E
Enwei Jiao 已提交
2982 2983
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetPersistentSegmentInfo")
	defer sp.End()
2984 2985 2986

	log := log.Ctx(ctx)

D
dragondriver 已提交
2987
	log.Debug("GetPersistentSegmentInfo",
2988
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2989 2990 2991
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
2992
	resp := &milvuspb.GetPersistentSegmentInfoResponse{
X
XuanYang-cn 已提交
2993
		Status: &commonpb.Status{
2994
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
2995 2996
		},
	}
2997 2998 2999 3000
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3001 3002
	method := "GetPersistentSegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3003
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3004
		metrics.TotalLabel).Inc()
3005 3006 3007

	// list segments
	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
X
XuanYang-cn 已提交
3008
	if err != nil {
E
Enwei Jiao 已提交
3009
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020
		resp.Status.Reason = fmt.Errorf("getCollectionID failed, err:%w", err).Error()
		return resp, nil
	}

	getSegmentsByStatesResponse, err := node.dataCoord.GetSegmentsByStates(ctx, &datapb.GetSegmentsByStatesRequest{
		CollectionID: collectionID,
		// -1 means list all partition segemnts
		PartitionID: -1,
		States:      []commonpb.SegmentState{commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed, commonpb.SegmentState_Sealed},
	})
	if err != nil {
E
Enwei Jiao 已提交
3021
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3022
		resp.Status.Reason = fmt.Errorf("getSegmentsOfCollection, err:%w", err).Error()
X
XuanYang-cn 已提交
3023 3024
		return resp, nil
	}
3025 3026

	// get Segment info
3027
	infoResp, err := node.dataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
3028 3029 3030
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3031
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3032
		),
3033
		SegmentIDs: getSegmentsByStatesResponse.Segments,
X
XuanYang-cn 已提交
3034 3035
	})
	if err != nil {
E
Enwei Jiao 已提交
3036
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3037
			metrics.FailLabel).Inc()
3038 3039
		log.Warn("GetPersistentSegmentInfo fail",
			zap.Error(err))
3040
		resp.Status.Reason = fmt.Errorf("dataCoord:GetSegmentInfo, err:%w", err).Error()
X
XuanYang-cn 已提交
3041 3042
		return resp, nil
	}
3043 3044 3045
	log.Debug("GetPersistentSegmentInfo",
		zap.Int("len(infos)", len(infoResp.Infos)),
		zap.Any("status", infoResp.Status))
3046
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
E
Enwei Jiao 已提交
3047
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3048
			metrics.FailLabel).Inc()
X
XuanYang-cn 已提交
3049 3050 3051 3052 3053 3054
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	persistentInfos := make([]*milvuspb.PersistentSegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		persistentInfos[i] = &milvuspb.PersistentSegmentInfo{
S
sunby 已提交
3055
			SegmentID:    info.ID,
X
XuanYang-cn 已提交
3056 3057
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
S
sunby 已提交
3058
			NumRows:      info.NumOfRows,
X
XuanYang-cn 已提交
3059 3060 3061
			State:        info.State,
		}
	}
E
Enwei Jiao 已提交
3062
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3063
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
3064
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3065
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
X
XuanYang-cn 已提交
3066 3067 3068 3069
	resp.Infos = persistentInfos
	return resp, nil
}

J
jingkl 已提交
3070
// GetQuerySegmentInfo gets segment information from QueryCoord.
C
Cai Yudong 已提交
3071
func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQuerySegmentInfoRequest) (*milvuspb.GetQuerySegmentInfoResponse, error) {
E
Enwei Jiao 已提交
3072 3073
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetQuerySegmentInfo")
	defer sp.End()
3074 3075 3076

	log := log.Ctx(ctx)

D
dragondriver 已提交
3077
	log.Debug("GetQuerySegmentInfo",
3078
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3079 3080 3081
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
3082
	resp := &milvuspb.GetQuerySegmentInfoResponse{
Z
zhenshan.cao 已提交
3083
		Status: &commonpb.Status{
3084
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
Z
zhenshan.cao 已提交
3085 3086
		},
	}
3087 3088 3089 3090
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3091

3092 3093
	method := "GetQuerySegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3094
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3095 3096
		metrics.TotalLabel).Inc()

3097 3098
	collID, err := globalMetaCache.GetCollectionID(ctx, req.CollectionName)
	if err != nil {
E
Enwei Jiao 已提交
3099
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3100 3101 3102
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3103
	infoResp, err := node.queryCoord.GetSegmentInfo(ctx, &querypb.GetSegmentInfoRequest{
3104 3105 3106
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3107
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3108
		),
3109
		CollectionID: collID,
Z
zhenshan.cao 已提交
3110 3111
	})
	if err != nil {
E
Enwei Jiao 已提交
3112
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3113 3114
		log.Error("Failed to get segment info from QueryCoord",
			zap.Error(err))
Z
zhenshan.cao 已提交
3115 3116 3117
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3118 3119 3120
	log.Debug("GetQuerySegmentInfo",
		zap.Any("infos", infoResp.Infos),
		zap.Any("status", infoResp.Status))
3121
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
E
Enwei Jiao 已提交
3122
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3123 3124
		log.Error("Failed to get segment info from QueryCoord",
			zap.String("errMsg", infoResp.Status.Reason))
Z
zhenshan.cao 已提交
3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	queryInfos := make([]*milvuspb.QuerySegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		queryInfos[i] = &milvuspb.QuerySegmentInfo{
			SegmentID:    info.SegmentID,
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
			NumRows:      info.NumRows,
			MemSize:      info.MemSize,
			IndexName:    info.IndexName,
			IndexID:      info.IndexID,
X
xige-16 已提交
3138
			State:        info.SegmentState,
3139
			NodeIds:      info.NodeIds,
Z
zhenshan.cao 已提交
3140 3141
		}
	}
3142

E
Enwei Jiao 已提交
3143 3144
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3145
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
Z
zhenshan.cao 已提交
3146 3147 3148 3149
	resp.Infos = queryInfos
	return resp, nil
}

J
jingkl 已提交
3150
// Dummy handles dummy request
C
Cai Yudong 已提交
3151
func (node *Proxy) Dummy(ctx context.Context, req *milvuspb.DummyRequest) (*milvuspb.DummyResponse, error) {
3152 3153 3154 3155 3156 3157
	failedResponse := &milvuspb.DummyResponse{
		Response: `{"status": "fail"}`,
	}

	// TODO(wxyu): change name RequestType to Request
	drt, err := parseDummyRequestType(req.RequestType)
3158

E
Enwei Jiao 已提交
3159 3160
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Dummy")
	defer sp.End()
3161 3162 3163

	log := log.Ctx(ctx)

3164
	if err != nil {
3165 3166
		log.Warn("Failed to parse dummy request type",
			zap.Error(err))
3167 3168 3169
		return failedResponse, nil
	}

3170 3171
	if drt.RequestType == "query" {
		drr, err := parseDummyQueryRequest(req.RequestType)
3172
		if err != nil {
3173 3174
			log.Warn("Failed to parse dummy query request",
				zap.Error(err))
3175 3176 3177
			return failedResponse, nil
		}

3178
		request := &milvuspb.QueryRequest{
3179 3180 3181
			DbName:         drr.DbName,
			CollectionName: drr.CollectionName,
			PartitionNames: drr.PartitionNames,
3182
			OutputFields:   drr.OutputFields,
X
Xiangyu Wang 已提交
3183 3184
		}

3185
		_, err = node.Query(ctx, request)
3186
		if err != nil {
3187 3188
			log.Warn("Failed to execute dummy query",
				zap.Error(err))
3189 3190
			return failedResponse, err
		}
X
Xiangyu Wang 已提交
3191 3192 3193 3194 3195 3196

		return &milvuspb.DummyResponse{
			Response: `{"status": "success"}`,
		}, nil
	}

3197 3198
	log.Debug("cannot find specify dummy request type")
	return failedResponse, nil
X
Xiangyu Wang 已提交
3199 3200
}

J
jingkl 已提交
3201
// RegisterLink registers a link
C
Cai Yudong 已提交
3202
func (node *Proxy) RegisterLink(ctx context.Context, req *milvuspb.RegisterLinkRequest) (*milvuspb.RegisterLinkResponse, error) {
3203
	code := node.stateCode.Load().(commonpb.StateCode)
3204

E
Enwei Jiao 已提交
3205 3206
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-RegisterLink")
	defer sp.End()
3207 3208

	log := log.Ctx(ctx).With(
3209
		zap.String("role", typeutil.ProxyRole),
C
Cai Yudong 已提交
3210
		zap.Any("state code of proxy", code))
D
dragondriver 已提交
3211

3212 3213
	log.Debug("RegisterLink")

3214
	if code != commonpb.StateCode_Healthy {
3215 3216 3217
		return &milvuspb.RegisterLinkResponse{
			Address: nil,
			Status: &commonpb.Status{
3218
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3219
				Reason:    "proxy not healthy",
3220 3221 3222
			},
		}, nil
	}
E
Enwei Jiao 已提交
3223
	//metrics.ProxyLinkedSDKs.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Inc()
3224 3225 3226
	return &milvuspb.RegisterLinkResponse{
		Address: nil,
		Status: &commonpb.Status{
3227
			ErrorCode: commonpb.ErrorCode_Success,
3228
			Reason:    os.Getenv(metricsinfo.DeployModeEnvKey),
3229 3230 3231
		},
	}, nil
}
3232

3233
// GetMetrics gets the metrics of proxy
3234 3235
// TODO(dragondriver): cache the Metrics and set a retention to the cache
func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
E
Enwei Jiao 已提交
3236 3237
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetMetrics")
	defer sp.End()
3238 3239 3240

	log := log.Ctx(ctx)

3241 3242
	log.RatedDebug(60, "Proxy.GetMetrics",
		zap.Int64("nodeID", paramtable.GetNodeID()),
3243 3244 3245 3246
		zap.String("req", req.Request))

	if !node.checkHealthy() {
		log.Warn("Proxy.GetMetrics failed",
3247
			zap.Int64("nodeID", paramtable.GetNodeID()),
3248
			zap.String("req", req.Request),
E
Enwei Jiao 已提交
3249
			zap.Error(errProxyIsUnhealthy(paramtable.GetNodeID())))
3250 3251 3252 3253

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
E
Enwei Jiao 已提交
3254
				Reason:    msgProxyIsUnhealthy(paramtable.GetNodeID()),
3255 3256 3257 3258 3259 3260 3261 3262
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetMetrics failed to parse metric type",
3263
			zap.Int64("nodeID", paramtable.GetNodeID()),
3264 3265 3266 3267 3268 3269 3270 3271 3272 3273 3274 3275
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

3276 3277 3278
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3279
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
3280
	)
3281
	if metricType == metricsinfo.SystemInfoMetrics {
3282 3283 3284
		metrics, err := node.metricsCacheManager.GetSystemInfoMetrics()
		if err != nil {
			metrics, err = getSystemInfoMetrics(ctx, req, node)
3285
		}
3286

3287 3288
		log.RatedDebug(60, "Proxy.GetMetrics",
			zap.Int64("nodeID", paramtable.GetNodeID()),
3289
			zap.String("req", req.Request),
3290
			zap.String("metricType", metricType),
3291 3292 3293
			zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
			zap.Error(err))

3294 3295
		node.metricsCacheManager.UpdateSystemInfoMetrics(metrics)

G
godchen 已提交
3296
		return metrics, nil
3297 3298
	}

3299 3300
	log.RatedWarn(60, "Proxy.GetMetrics failed, request metric type is not implemented yet",
		zap.Int64("nodeID", paramtable.GetNodeID()),
3301
		zap.String("req", req.Request),
3302
		zap.String("metricType", metricType))
3303 3304 3305 3306 3307 3308 3309 3310 3311 3312

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}

3313 3314 3315
// GetProxyMetrics gets the metrics of proxy, it's an internal interface which is different from GetMetrics interface,
// because it only obtains the metrics of Proxy, not including the topological metrics of Query cluster and Data cluster.
func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
E
Enwei Jiao 已提交
3316 3317
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetProxyMetrics")
	defer sp.End()
3318 3319

	log := log.Ctx(ctx).With(
3320
		zap.Int64("nodeID", paramtable.GetNodeID()),
3321 3322
		zap.String("req", req.Request))

3323 3324
	if !node.checkHealthy() {
		log.Warn("Proxy.GetProxyMetrics failed",
E
Enwei Jiao 已提交
3325
			zap.Error(errProxyIsUnhealthy(paramtable.GetNodeID())))
3326 3327 3328 3329

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
E
Enwei Jiao 已提交
3330
				Reason:    msgProxyIsUnhealthy(paramtable.GetNodeID()),
3331 3332 3333 3334 3335 3336 3337 3338 3339 3340 3341 3342 3343 3344 3345 3346 3347
			},
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetProxyMetrics failed to parse metric type",
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

3348 3349 3350
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3351
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
3352
	)
3353 3354 3355 3356 3357 3358 3359 3360 3361 3362 3363 3364 3365 3366 3367

	if metricType == metricsinfo.SystemInfoMetrics {
		proxyMetrics, err := getProxyMetrics(ctx, req, node)
		if err != nil {
			log.Warn("Proxy.GetProxyMetrics failed to getProxyMetrics",
				zap.Error(err))

			return &milvuspb.GetMetricsResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

3368 3369
		//log.Debug("Proxy.GetProxyMetrics",
		//	zap.String("metricType", metricType))
3370 3371 3372 3373

		return proxyMetrics, nil
	}

J
Jiquan Long 已提交
3374
	log.Warn("Proxy.GetProxyMetrics failed, request metric type is not implemented yet",
3375
		zap.String("metricType", metricType))
3376 3377 3378 3379 3380 3381 3382 3383 3384

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
	}, nil
}

B
bigsheeper 已提交
3385 3386
// LoadBalance would do a load balancing operation between query nodes
func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
3387 3388
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-LoadBalance")
	defer sp.End()
3389 3390 3391

	log := log.Ctx(ctx)

B
bigsheeper 已提交
3392
	log.Debug("Proxy.LoadBalance",
E
Enwei Jiao 已提交
3393
		zap.Int64("proxy_id", paramtable.GetNodeID()),
B
bigsheeper 已提交
3394 3395 3396 3397 3398 3399 3400 3401 3402
		zap.Any("req", req))

	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	}
3403 3404 3405

	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
	if err != nil {
J
Jiquan Long 已提交
3406
		log.Warn("failed to get collection id",
3407 3408
			zap.String("collection name", req.GetCollectionName()),
			zap.Error(err))
3409 3410 3411
		status.Reason = err.Error()
		return status, nil
	}
B
bigsheeper 已提交
3412
	infoResp, err := node.queryCoord.LoadBalance(ctx, &querypb.LoadBalanceRequest{
3413 3414 3415
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_LoadBalanceSegments),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3416
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3417
		),
B
bigsheeper 已提交
3418 3419
		SourceNodeIDs:    []int64{req.SrcNodeID},
		DstNodeIDs:       req.DstNodeIDs,
X
xige-16 已提交
3420
		BalanceReason:    querypb.TriggerCondition_GrpcRequest,
B
bigsheeper 已提交
3421
		SealedSegmentIDs: req.SealedSegmentIDs,
3422
		CollectionID:     collectionID,
B
bigsheeper 已提交
3423 3424
	})
	if err != nil {
J
Jiquan Long 已提交
3425
		log.Warn("Failed to LoadBalance from Query Coordinator",
3426 3427
			zap.Any("req", req),
			zap.Error(err))
B
bigsheeper 已提交
3428 3429 3430 3431
		status.Reason = err.Error()
		return status, nil
	}
	if infoResp.ErrorCode != commonpb.ErrorCode_Success {
J
Jiquan Long 已提交
3432
		log.Warn("Failed to LoadBalance from Query Coordinator",
3433
			zap.String("errMsg", infoResp.Reason))
B
bigsheeper 已提交
3434 3435 3436
		status.Reason = infoResp.Reason
		return status, nil
	}
3437 3438 3439
	log.Debug("LoadBalance Done",
		zap.Any("req", req),
		zap.Any("status", infoResp))
B
bigsheeper 已提交
3440 3441 3442 3443
	status.ErrorCode = commonpb.ErrorCode_Success
	return status, nil
}

3444 3445
// GetReplicas gets replica info
func (node *Proxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
E
Enwei Jiao 已提交
3446 3447
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetReplicas")
	defer sp.End()
3448 3449 3450 3451 3452 3453

	log := log.Ctx(ctx)

	log.Debug("received get replicas request",
		zap.Int64("collection", req.GetCollectionID()),
		zap.Bool("with shard nodes", req.GetWithShardNodes()))
3454 3455 3456 3457 3458 3459
	resp := &milvuspb.GetReplicasResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

S
smellthemoon 已提交
3460 3461
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_GetReplicas),
E
Enwei Jiao 已提交
3462
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
S
smellthemoon 已提交
3463
	)
3464

W
wei liu 已提交
3465 3466 3467 3468
	if req.GetCollectionName() != "" {
		req.CollectionID, _ = globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
	}

3469 3470
	resp, err := node.queryCoord.GetReplicas(ctx, req)
	if err != nil {
3471 3472
		log.Error("Failed to get replicas from Query Coordinator",
			zap.Error(err))
3473 3474 3475 3476
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3477 3478 3479
	log.Debug("received get replicas response",
		zap.Any("resp", resp),
		zap.Error(err))
3480 3481 3482
	return resp, nil
}

3483
// GetCompactionState gets the compaction state of multiple segments
3484
func (node *Proxy) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
E
Enwei Jiao 已提交
3485 3486
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetCompactionState")
	defer sp.End()
3487 3488 3489 3490 3491

	log := log.Ctx(ctx).With(
		zap.Int64("compactionID", req.GetCompactionID()))

	log.Debug("received GetCompactionState request")
3492 3493 3494 3495 3496 3497 3498
	resp := &milvuspb.GetCompactionStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionState(ctx, req)
3499 3500 3501
	log.Debug("received GetCompactionState response",
		zap.Any("resp", resp),
		zap.Error(err))
3502 3503 3504
	return resp, err
}

3505
// ManualCompaction invokes compaction on specified collection
3506
func (node *Proxy) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
E
Enwei Jiao 已提交
3507 3508
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ManualCompaction")
	defer sp.End()
3509 3510 3511 3512 3513

	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", req.GetCollectionID()))

	log.Info("received ManualCompaction request")
3514 3515 3516 3517 3518 3519 3520
	resp := &milvuspb.ManualCompactionResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.ManualCompaction(ctx, req)
3521 3522 3523
	log.Info("received ManualCompaction response",
		zap.Any("resp", resp),
		zap.Error(err))
3524 3525 3526
	return resp, err
}

3527
// GetCompactionStateWithPlans returns the compactions states with the given plan ID
3528
func (node *Proxy) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
E
Enwei Jiao 已提交
3529 3530
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetCompactionStateWithPlans")
	defer sp.End()
3531 3532 3533 3534 3535

	log := log.Ctx(ctx).With(
		zap.Int64("compactionID", req.GetCompactionID()))

	log.Debug("received GetCompactionStateWithPlans request")
3536 3537 3538 3539 3540 3541 3542
	resp := &milvuspb.GetCompactionPlansResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionStateWithPlans(ctx, req)
3543 3544 3545
	log.Debug("received GetCompactionStateWithPlans response",
		zap.Any("resp", resp),
		zap.Error(err))
3546 3547 3548
	return resp, err
}

B
Bingyi Sun 已提交
3549 3550
// GetFlushState gets the flush state of multiple segments
func (node *Proxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
E
Enwei Jiao 已提交
3551 3552
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetFlushState")
	defer sp.End()
3553 3554 3555 3556 3557

	log := log.Ctx(ctx)

	log.Debug("received get flush state request",
		zap.Any("request", req))
3558
	var err error
B
Bingyi Sun 已提交
3559 3560 3561
	resp := &milvuspb.GetFlushStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
J
Jiquan Long 已提交
3562
		log.Warn("unable to get flush state because of closed server")
B
Bingyi Sun 已提交
3563 3564 3565
		return resp, nil
	}

3566
	resp, err = node.dataCoord.GetFlushState(ctx, req)
X
Xiaofan 已提交
3567
	if err != nil {
3568 3569
		log.Warn("failed to get flush state response",
			zap.Error(err))
X
Xiaofan 已提交
3570 3571
		return nil, err
	}
3572 3573
	log.Debug("received get flush state response",
		zap.Any("response", resp))
B
Bingyi Sun 已提交
3574 3575 3576
	return resp, err
}

C
Cai Yudong 已提交
3577 3578
// checkHealthy checks proxy state is Healthy
func (node *Proxy) checkHealthy() bool {
3579 3580
	code := node.stateCode.Load().(commonpb.StateCode)
	return code == commonpb.StateCode_Healthy
3581 3582
}

3583 3584 3585
func (node *Proxy) checkHealthyAndReturnCode() (commonpb.StateCode, bool) {
	code := node.stateCode.Load().(commonpb.StateCode)
	return code, code == commonpb.StateCode_Healthy
3586 3587
}

3588
// unhealthyStatus returns the proxy not healthy status
3589 3590 3591
func unhealthyStatus() *commonpb.Status {
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3592
		Reason:    "proxy not healthy",
3593 3594
	}
}
G
groot 已提交
3595 3596 3597

// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
E
Enwei Jiao 已提交
3598 3599
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Import")
	defer sp.End()
3600 3601 3602

	log := log.Ctx(ctx)

3603 3604
	log.Info("received import request",
		zap.String("collection name", req.GetCollectionName()),
G
groot 已提交
3605 3606
		zap.String("partition name", req.GetPartitionName()),
		zap.Strings("files", req.GetFiles()))
3607 3608 3609 3610 3611 3612
	resp := &milvuspb.ImportResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
	}
G
groot 已提交
3613 3614 3615 3616
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3617

3618 3619
	err := importutil.ValidateOptions(req.GetOptions())
	if err != nil {
3620 3621
		log.Error("failed to execute import request",
			zap.Error(err))
3622 3623 3624 3625 3626
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = "request options is not illegal    \n" + err.Error() + "    \nIllegal option format    \n" + importutil.OptionFormat
		return resp, nil
	}

3627 3628
	method := "Import"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3629
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3630 3631
		metrics.TotalLabel).Inc()

3632
	// Call rootCoord to finish import.
3633 3634
	respFromRC, err := node.rootCoord.Import(ctx, req)
	if err != nil {
E
Enwei Jiao 已提交
3635
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3636 3637
		log.Error("failed to execute bulk insert request",
			zap.Error(err))
3638 3639 3640 3641
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3642

E
Enwei Jiao 已提交
3643 3644
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3645
	return respFromRC, nil
G
groot 已提交
3646 3647
}

3648
// GetImportState checks import task state from RootCoord.
G
groot 已提交
3649
func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
E
Enwei Jiao 已提交
3650 3651
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetImportState")
	defer sp.End()
3652 3653 3654 3655 3656

	log := log.Ctx(ctx)

	log.Debug("received get import state request",
		zap.Int64("taskID", req.GetTask()))
G
groot 已提交
3657 3658 3659 3660 3661
	resp := &milvuspb.GetImportStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3662 3663
	method := "GetImportState"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3664
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3665
		metrics.TotalLabel).Inc()
G
groot 已提交
3666 3667

	resp, err := node.rootCoord.GetImportState(ctx, req)
3668
	if err != nil {
E
Enwei Jiao 已提交
3669
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3670 3671
		log.Error("failed to execute get import state",
			zap.Error(err))
3672 3673 3674 3675 3676
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}

3677 3678 3679
	log.Debug("successfully received get import state response",
		zap.Int64("taskID", req.GetTask()),
		zap.Any("resp", resp), zap.Error(err))
E
Enwei Jiao 已提交
3680 3681
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3682
	return resp, nil
G
groot 已提交
3683 3684 3685 3686
}

// ListImportTasks get id array of all import tasks from rootcoord
func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
E
Enwei Jiao 已提交
3687 3688
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ListImportTasks")
	defer sp.End()
3689 3690 3691

	log := log.Ctx(ctx)

J
Jiquan Long 已提交
3692
	log.Debug("received list import tasks request")
G
groot 已提交
3693 3694 3695 3696 3697
	resp := &milvuspb.ListImportTasksResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3698 3699
	method := "ListImportTasks"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3700
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3701
		metrics.TotalLabel).Inc()
G
groot 已提交
3702
	resp, err := node.rootCoord.ListImportTasks(ctx, req)
3703
	if err != nil {
E
Enwei Jiao 已提交
3704
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3705 3706
		log.Error("failed to execute list import tasks",
			zap.Error(err))
3707 3708
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
X
XuanYang-cn 已提交
3709 3710 3711
		return resp, nil
	}

3712 3713 3714
	log.Debug("successfully received list import tasks response",
		zap.String("collection", req.CollectionName),
		zap.Any("tasks", resp.Tasks))
E
Enwei Jiao 已提交
3715 3716
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
X
XuanYang-cn 已提交
3717 3718 3719
	return resp, err
}

3720 3721
// InvalidateCredentialCache invalidate the credential cache of specified username.
func (node *Proxy) InvalidateCredentialCache(ctx context.Context, request *proxypb.InvalidateCredCacheRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
3722 3723
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-InvalidateCredentialCache")
	defer sp.End()
3724 3725

	log := log.Ctx(ctx).With(
3726 3727
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))
3728 3729

	log.Debug("received request to invalidate credential cache")
3730
	if !node.checkHealthy() {
3731
		return unhealthyStatus(), nil
3732
	}
3733 3734 3735 3736 3737

	username := request.Username
	if globalMetaCache != nil {
		globalMetaCache.RemoveCredential(username) // no need to return error, though credential may be not cached
	}
3738
	log.Debug("complete to invalidate credential cache")
3739 3740 3741 3742 3743 3744 3745 3746 3747

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

// UpdateCredentialCache update the credential cache of specified username.
func (node *Proxy) UpdateCredentialCache(ctx context.Context, request *proxypb.UpdateCredCacheRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
3748 3749
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-UpdateCredentialCache")
	defer sp.End()
3750 3751

	log := log.Ctx(ctx).With(
3752 3753
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))
3754 3755

	log.Debug("received request to update credential cache")
3756
	if !node.checkHealthy() {
3757
		return unhealthyStatus(), nil
3758
	}
3759 3760

	credInfo := &internalpb.CredentialInfo{
3761 3762
		Username:       request.Username,
		Sha256Password: request.Password,
3763 3764 3765 3766
	}
	if globalMetaCache != nil {
		globalMetaCache.UpdateCredential(credInfo) // no need to return error, though credential may be not cached
	}
3767
	log.Debug("complete to update credential cache")
3768 3769 3770 3771 3772 3773 3774 3775

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

func (node *Proxy) CreateCredential(ctx context.Context, req *milvuspb.CreateCredentialRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
3776 3777
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateCredential")
	defer sp.End()
3778 3779 3780 3781 3782 3783

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("CreateCredential",
		zap.String("role", typeutil.ProxyRole))
3784
	if !node.checkHealthy() {
3785
		return unhealthyStatus(), nil
3786
	}
3787 3788 3789 3790 3791 3792 3793 3794 3795 3796
	// validate params
	username := req.Username
	if err := ValidateUsername(username); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	rawPassword, err := crypto.Base64Decode(req.Password)
	if err != nil {
3797 3798
		log.Error("decode password fail",
			zap.Error(err))
3799 3800 3801 3802 3803 3804
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "decode password fail key:" + req.Username,
		}, nil
	}
	if err = ValidatePassword(rawPassword); err != nil {
3805 3806
		log.Error("illegal password",
			zap.Error(err))
3807 3808 3809 3810 3811 3812 3813
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	encryptedPassword, err := crypto.PasswordEncrypt(rawPassword)
	if err != nil {
3814 3815
		log.Error("encrypt password fail",
			zap.Error(err))
3816 3817 3818 3819 3820
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "encrypt password fail key:" + req.Username,
		}, nil
	}
3821

3822 3823 3824
	credInfo := &internalpb.CredentialInfo{
		Username:          req.Username,
		EncryptedPassword: encryptedPassword,
3825
		Sha256Password:    crypto.SHA256(rawPassword, req.Username),
3826 3827 3828
	}
	result, err := node.rootCoord.CreateCredential(ctx, credInfo)
	if err != nil { // for error like conntext timeout etc.
3829 3830
		log.Error("create credential fail",
			zap.Error(err))
3831 3832 3833 3834 3835 3836 3837 3838
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

C
codeman 已提交
3839
func (node *Proxy) UpdateCredential(ctx context.Context, req *milvuspb.UpdateCredentialRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
3840 3841
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-UpdateCredential")
	defer sp.End()
3842 3843 3844 3845 3846 3847

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("UpdateCredential",
		zap.String("role", typeutil.ProxyRole))
3848
	if !node.checkHealthy() {
3849
		return unhealthyStatus(), nil
3850
	}
C
codeman 已提交
3851 3852
	rawOldPassword, err := crypto.Base64Decode(req.OldPassword)
	if err != nil {
3853 3854
		log.Error("decode old password fail",
			zap.Error(err))
C
codeman 已提交
3855 3856 3857 3858 3859 3860
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode old password fail when updating:" + req.Username,
		}, nil
	}
	rawNewPassword, err := crypto.Base64Decode(req.NewPassword)
3861
	if err != nil {
3862 3863
		log.Error("decode password fail",
			zap.Error(err))
3864 3865 3866 3867 3868
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
3869 3870
	// valid new password
	if err = ValidatePassword(rawNewPassword); err != nil {
3871 3872
		log.Error("illegal password",
			zap.Error(err))
3873 3874 3875 3876 3877
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
3878 3879

	if !passwordVerify(ctx, req.Username, rawOldPassword, globalMetaCache) {
C
codeman 已提交
3880 3881 3882 3883 3884 3885 3886
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "old password is not correct:" + req.Username,
		}, nil
	}
	// update meta data
	encryptedPassword, err := crypto.PasswordEncrypt(rawNewPassword)
3887
	if err != nil {
3888 3889
		log.Error("encrypt password fail",
			zap.Error(err))
3890 3891 3892 3893 3894
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "encrypt password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
3895
	updateCredReq := &internalpb.CredentialInfo{
3896
		Username:          req.Username,
3897
		Sha256Password:    crypto.SHA256(rawNewPassword, req.Username),
3898 3899
		EncryptedPassword: encryptedPassword,
	}
C
codeman 已提交
3900
	result, err := node.rootCoord.UpdateCredential(ctx, updateCredReq)
3901
	if err != nil { // for error like conntext timeout etc.
3902 3903
		log.Error("update credential fail",
			zap.Error(err))
3904 3905 3906 3907 3908 3909 3910 3911 3912
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
3913 3914
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DeleteCredential")
	defer sp.End()
3915 3916 3917 3918 3919 3920

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("DeleteCredential",
		zap.String("role", typeutil.ProxyRole))
3921
	if !node.checkHealthy() {
3922
		return unhealthyStatus(), nil
3923 3924
	}

3925 3926 3927 3928 3929 3930
	if req.Username == util.UserRoot {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_DeleteCredentialFailure,
			Reason:    "user root cannot be deleted",
		}, nil
	}
3931 3932
	result, err := node.rootCoord.DeleteCredential(ctx, req)
	if err != nil { // for error like conntext timeout etc.
3933 3934
		log.Error("delete credential fail",
			zap.Error(err))
3935 3936 3937 3938 3939 3940 3941 3942 3943
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
E
Enwei Jiao 已提交
3944 3945
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ListCredUsers")
	defer sp.End()
3946 3947 3948 3949 3950

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole))

	log.Debug("ListCredUsers")
3951
	if !node.checkHealthy() {
3952
		return &milvuspb.ListCredUsersResponse{Status: unhealthyStatus()}, nil
3953
	}
3954
	rootCoordReq := &milvuspb.ListCredUsersRequest{
3955 3956 3957
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_ListCredUsernames),
		),
3958 3959
	}
	resp, err := node.rootCoord.ListCredUsers(ctx, rootCoordReq)
3960 3961 3962 3963 3964 3965 3966 3967 3968 3969 3970 3971
	if err != nil {
		return &milvuspb.ListCredUsersResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
	return &milvuspb.ListCredUsersResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
3972
		Usernames: resp.Usernames,
3973 3974
	}, nil
}
3975

3976
func (node *Proxy) CreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
3977 3978
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateRole")
	defer sp.End()
3979 3980 3981 3982 3983

	log := log.Ctx(ctx)

	log.Debug("CreateRole",
		zap.Any("req", req))
3984
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
3985
		return errorutil.UnhealthyStatus(code), nil
3986 3987 3988 3989 3990 3991 3992 3993 3994 3995
	}

	var roleName string
	if req.Entity != nil {
		roleName = req.Entity.Name
	}
	if err := ValidateRoleName(roleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
3996
		}, nil
3997 3998 3999 4000
	}

	result, err := node.rootCoord.CreateRole(ctx, req)
	if err != nil {
4001 4002
		log.Error("fail to create role",
			zap.Error(err))
4003 4004 4005
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4006
		}, nil
4007 4008
	}
	return result, nil
4009 4010
}

4011
func (node *Proxy) DropRole(ctx context.Context, req *milvuspb.DropRoleRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
4012 4013
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropRole")
	defer sp.End()
4014 4015 4016 4017 4018

	log := log.Ctx(ctx)

	log.Debug("DropRole",
		zap.Any("req", req))
4019
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4020
		return errorutil.UnhealthyStatus(code), nil
4021 4022 4023 4024 4025
	}
	if err := ValidateRoleName(req.RoleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4026
		}, nil
4027
	}
4028 4029 4030 4031 4032
	if IsDefaultRole(req.RoleName) {
		errMsg := fmt.Sprintf("the role[%s] is a default role, which can't be droped", req.RoleName)
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    errMsg,
4033
		}, nil
4034
	}
4035 4036
	result, err := node.rootCoord.DropRole(ctx, req)
	if err != nil {
4037 4038 4039
		log.Error("fail to drop role",
			zap.String("role_name", req.RoleName),
			zap.Error(err))
4040 4041 4042
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4043
		}, nil
4044 4045
	}
	return result, nil
4046 4047
}

4048
func (node *Proxy) OperateUserRole(ctx context.Context, req *milvuspb.OperateUserRoleRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
4049 4050
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-OperateUserRole")
	defer sp.End()
4051 4052 4053 4054 4055

	log := log.Ctx(ctx)

	log.Debug("OperateUserRole",
		zap.Any("req", req))
4056
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4057
		return errorutil.UnhealthyStatus(code), nil
4058 4059 4060 4061 4062
	}
	if err := ValidateUsername(req.Username); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4063
		}, nil
4064 4065 4066 4067 4068
	}
	if err := ValidateRoleName(req.RoleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4069
		}, nil
4070 4071 4072 4073
	}

	result, err := node.rootCoord.OperateUserRole(ctx, req)
	if err != nil {
4074 4075
		logger.Error("fail to operate user role",
			zap.Error(err))
4076 4077 4078
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4079
		}, nil
4080 4081
	}
	return result, nil
4082 4083
}

4084
func (node *Proxy) SelectRole(ctx context.Context, req *milvuspb.SelectRoleRequest) (*milvuspb.SelectRoleResponse, error) {
E
Enwei Jiao 已提交
4085 4086
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-SelectRole")
	defer sp.End()
4087 4088 4089 4090

	log := log.Ctx(ctx)

	log.Debug("SelectRole", zap.Any("req", req))
4091
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4092
		return &milvuspb.SelectRoleResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4093 4094 4095 4096 4097 4098 4099 4100 4101
	}

	if req.Role != nil {
		if err := ValidateRoleName(req.Role.Name); err != nil {
			return &milvuspb.SelectRoleResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_IllegalArgument,
					Reason:    err.Error(),
				},
4102
			}, nil
4103 4104 4105 4106 4107
		}
	}

	result, err := node.rootCoord.SelectRole(ctx, req)
	if err != nil {
4108 4109
		log.Error("fail to select role",
			zap.Error(err))
4110 4111 4112 4113 4114
		return &milvuspb.SelectRoleResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4115
		}, nil
4116 4117
	}
	return result, nil
4118 4119
}

4120
func (node *Proxy) SelectUser(ctx context.Context, req *milvuspb.SelectUserRequest) (*milvuspb.SelectUserResponse, error) {
E
Enwei Jiao 已提交
4121 4122
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-SelectUser")
	defer sp.End()
4123 4124 4125 4126 4127

	log := log.Ctx(ctx)

	log.Debug("SelectUser",
		zap.Any("req", req))
4128
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4129
		return &milvuspb.SelectUserResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4130 4131 4132 4133 4134 4135 4136 4137 4138
	}

	if req.User != nil {
		if err := ValidateUsername(req.User.Name); err != nil {
			return &milvuspb.SelectUserResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_IllegalArgument,
					Reason:    err.Error(),
				},
4139
			}, nil
4140 4141 4142 4143 4144
		}
	}

	result, err := node.rootCoord.SelectUser(ctx, req)
	if err != nil {
4145 4146
		log.Error("fail to select user",
			zap.Error(err))
4147 4148 4149 4150 4151
		return &milvuspb.SelectUserResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4152
		}, nil
4153 4154
	}
	return result, nil
4155 4156
}

4157 4158 4159 4160 4161 4162 4163 4164 4165 4166 4167 4168 4169 4170 4171 4172 4173 4174 4175 4176 4177 4178 4179 4180 4181 4182 4183 4184 4185 4186
func (node *Proxy) validPrivilegeParams(req *milvuspb.OperatePrivilegeRequest) error {
	if req.Entity == nil {
		return fmt.Errorf("the entity in the request is nil")
	}
	if req.Entity.Grantor == nil {
		return fmt.Errorf("the grantor entity in the grant entity is nil")
	}
	if req.Entity.Grantor.Privilege == nil {
		return fmt.Errorf("the privilege entity in the grantor entity is nil")
	}
	if err := ValidatePrivilege(req.Entity.Grantor.Privilege.Name); err != nil {
		return err
	}
	if req.Entity.Object == nil {
		return fmt.Errorf("the resource entity in the grant entity is nil")
	}
	if err := ValidateObjectType(req.Entity.Object.Name); err != nil {
		return err
	}
	if err := ValidateObjectName(req.Entity.ObjectName); err != nil {
		return err
	}
	if req.Entity.Role == nil {
		return fmt.Errorf("the object entity in the grant entity is nil")
	}
	if err := ValidateRoleName(req.Entity.Role.Name); err != nil {
		return err
	}

	return nil
4187 4188
}

4189
func (node *Proxy) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
4190 4191
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-OperatePrivilege")
	defer sp.End()
4192 4193 4194 4195 4196

	log := log.Ctx(ctx)

	log.Debug("OperatePrivilege",
		zap.Any("req", req))
4197
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4198
		return errorutil.UnhealthyStatus(code), nil
4199 4200 4201 4202 4203
	}
	if err := node.validPrivilegeParams(req); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4204
		}, nil
4205 4206 4207 4208 4209 4210
	}
	curUser, err := GetCurUserFromContext(ctx)
	if err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4211
		}, nil
4212 4213 4214 4215
	}
	req.Entity.Grantor.User = &milvuspb.UserEntity{Name: curUser}
	result, err := node.rootCoord.OperatePrivilege(ctx, req)
	if err != nil {
4216 4217
		log.Error("fail to operate privilege",
			zap.Error(err))
4218 4219 4220
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4221
		}, nil
4222 4223
	}
	return result, nil
4224 4225
}

4226 4227 4228 4229 4230 4231 4232 4233 4234 4235 4236 4237 4238 4239 4240 4241 4242 4243 4244 4245 4246 4247 4248 4249 4250 4251 4252
// validGrantParams checks a SelectGrant request: the grant entity and its
// role are mandatory, while the object is optional and only validated (type,
// then object name) when present. The first failing check is returned; nil
// means the request is acceptable.
func (node *Proxy) validGrantParams(req *milvuspb.SelectGrantRequest) error {
	entity := req.Entity
	if entity == nil {
		return fmt.Errorf("the grant entity in the request is nil")
	}

	if object := entity.Object; object != nil {
		if typeErr := ValidateObjectType(object.Name); typeErr != nil {
			return typeErr
		}
		if nameErr := ValidateObjectName(entity.ObjectName); nameErr != nil {
			return nameErr
		}
	}

	role := entity.Role
	if role == nil {
		return fmt.Errorf("the role entity in the grant entity is nil")
	}
	if roleErr := ValidateRoleName(role.Name); roleErr != nil {
		return roleErr
	}

	return nil
}

func (node *Proxy) SelectGrant(ctx context.Context, req *milvuspb.SelectGrantRequest) (*milvuspb.SelectGrantResponse, error) {
E
Enwei Jiao 已提交
4253 4254
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-SelectGrant")
	defer sp.End()
4255 4256 4257 4258 4259

	log := log.Ctx(ctx)

	log.Debug("SelectGrant",
		zap.Any("req", req))
4260
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4261
		return &milvuspb.SelectGrantResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4262 4263 4264 4265 4266 4267 4268 4269
	}

	if err := node.validGrantParams(req); err != nil {
		return &milvuspb.SelectGrantResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_IllegalArgument,
				Reason:    err.Error(),
			},
4270
		}, nil
4271 4272 4273 4274
	}

	result, err := node.rootCoord.SelectGrant(ctx, req)
	if err != nil {
4275 4276
		log.Error("fail to select grant",
			zap.Error(err))
4277 4278 4279 4280 4281
		return &milvuspb.SelectGrantResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4282
		}, nil
4283 4284 4285 4286 4287
	}
	return result, nil
}

func (node *Proxy) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.RefreshPolicyInfoCacheRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
4288 4289
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-RefreshPolicyInfoCache")
	defer sp.End()
4290 4291 4292 4293 4294

	log := log.Ctx(ctx)

	log.Debug("RefreshPrivilegeInfoCache",
		zap.Any("req", req))
4295 4296 4297 4298 4299 4300 4301 4302 4303 4304
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
		return errorutil.UnhealthyStatus(code), errorutil.UnhealthyError()
	}

	if globalMetaCache != nil {
		err := globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{
			OpType: typeutil.CacheOpType(req.OpType),
			OpKey:  req.OpKey,
		})
		if err != nil {
4305 4306
			log.Error("fail to refresh policy info",
				zap.Error(err))
4307 4308 4309 4310 4311 4312
			return &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_RefreshPolicyInfoCacheFailure,
				Reason:    err.Error(),
			}, err
		}
	}
4313
	log.Debug("RefreshPrivilegeInfoCache success")
4314 4315 4316 4317

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
4318
}
4319 4320 4321 4322 4323 4324 4325 4326 4327 4328 4329 4330 4331 4332 4333 4334 4335

// SetRates limits the rates of requests.
func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesRequest) (*commonpb.Status, error) {
	resp := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	}
	if !node.checkHealthy() {
		resp = unhealthyStatus()
		return resp, nil
	}

	err := node.multiRateLimiter.globalRateLimiter.setRates(request.GetRates())
	// TODO: set multiple rate limiter rates
	if err != nil {
		resp.Reason = err.Error()
		return resp, nil
	}
4336
	node.multiRateLimiter.SetQuotaStates(request.GetStates(), request.GetCodes())
4337 4338 4339 4340 4341 4342 4343
	rateStrs := lo.FilterMap(request.GetRates(), func(r *internalpb.Rate, _ int) (string, bool) {
		return fmt.Sprintf("rateType:%s, rate:%s", r.GetRt().String(), ratelimitutil.Limit(r.GetR()).String()),
			ratelimitutil.Limit(r.GetR()) != ratelimitutil.Inf
	})
	if len(rateStrs) > 0 {
		log.RatedInfo(30, "current rates in proxy", zap.Int64("proxyNodeID", paramtable.GetNodeID()), zap.Strings("rates", rateStrs))
	}
4344 4345
	if len(request.GetStates()) != 0 {
		for i := range request.GetStates() {
4346
			log.Warn("Proxy set quota states", zap.String("state", request.GetStates()[i].String()), zap.String("reason", request.GetCodes()[i].String()))
4347 4348
		}
	}
4349 4350 4351
	resp.ErrorCode = commonpb.ErrorCode_Success
	return resp, nil
}
4352 4353 4354 4355

func (node *Proxy) CheckHealth(ctx context.Context, request *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
	if !node.checkHealthy() {
		reason := errorutil.UnHealthReason("proxy", node.session.ServerID, "proxy is unhealthy")
4356 4357 4358 4359
		return &milvuspb.CheckHealthResponse{
			Status:    unhealthyStatus(),
			IsHealthy: false,
			Reasons:   []string{reason}}, nil
4360 4361 4362 4363 4364 4365 4366 4367 4368 4369
	}

	group, ctx := errgroup.WithContext(ctx)
	errReasons := make([]string, 0)

	mu := &sync.Mutex{}
	fn := func(role string, resp *milvuspb.CheckHealthResponse, err error) error {
		mu.Lock()
		defer mu.Unlock()

E
Enwei Jiao 已提交
4370 4371
		ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-RefreshPolicyInfoCache")
		defer sp.End()
4372 4373 4374

		log := log.Ctx(ctx).With(zap.String("role", role))

4375
		if err != nil {
4376 4377
			log.Warn("check health fail",
				zap.Error(err))
4378 4379 4380 4381 4382
			errReasons = append(errReasons, fmt.Sprintf("check health fail for %s", role))
			return err
		}

		if !resp.IsHealthy {
4383
			log.Warn("check health fail")
4384 4385 4386 4387 4388 4389 4390 4391 4392 4393 4394 4395 4396 4397 4398 4399 4400 4401 4402 4403 4404 4405 4406
			errReasons = append(errReasons, resp.Reasons...)
		}
		return nil
	}

	group.Go(func() error {
		resp, err := node.rootCoord.CheckHealth(ctx, request)
		return fn("rootcoord", resp, err)
	})

	group.Go(func() error {
		resp, err := node.queryCoord.CheckHealth(ctx, request)
		return fn("querycoord", resp, err)
	})

	group.Go(func() error {
		resp, err := node.dataCoord.CheckHealth(ctx, request)
		return fn("datacoord", resp, err)
	})

	err := group.Wait()
	if err != nil || len(errReasons) != 0 {
		return &milvuspb.CheckHealthResponse{
4407 4408 4409
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_Success,
			},
4410 4411 4412 4413 4414
			IsHealthy: false,
			Reasons:   errReasons,
		}, nil
	}

4415
	states, reasons := node.multiRateLimiter.GetQuotaStates()
4416 4417 4418 4419 4420
	return &milvuspb.CheckHealthResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
4421 4422 4423
		QuotaStates: states,
		Reasons:     reasons,
		IsHealthy:   true,
4424
	}, nil
4425
}
W
wei liu 已提交
4426

J
jaime 已提交
4427
func (node *Proxy) RenameCollection(ctx context.Context, req *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) {
4428
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-RenameCollection")
J
jaime 已提交
4429 4430 4431 4432 4433 4434 4435 4436 4437 4438 4439 4440 4441 4442 4443 4444 4445 4446 4447
	defer sp.End()

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
		zap.String("oldName", req.GetOldName()),
		zap.String("newName", req.GetNewName()))

	log.Info("received rename collection request")
	var err error

	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	if err := validateCollectionName(req.GetNewName()); err != nil {
		log.Warn("validate new collection name fail", zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalCollectionName,
			Reason:    err.Error(),
J
jaime 已提交
4448
		}, nil
J
jaime 已提交
4449 4450 4451 4452 4453 4454 4455 4456 4457 4458 4459 4460 4461 4462 4463 4464 4465 4466 4467
	}

	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_RenameCollection),
		commonpbutil.WithMsgID(0),
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
	)
	resp, err := node.rootCoord.RenameCollection(ctx, req)
	if err != nil {
		log.Warn("failed to rename collection", zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, err
	}

	return resp, nil
}

W
wei liu 已提交
4468
func (node *Proxy) CreateResourceGroup(ctx context.Context, request *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) {
W
wei liu 已提交
4469 4470 4471 4472
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

W
wei liu 已提交
4473 4474 4475 4476 4477 4478 4479 4480
	method := "CreateResourceGroup"
	if err := ValidateResourceGroupName(request.GetResourceGroup()); err != nil {
		log.Warn("CreateResourceGroup failed",
			zap.Error(err),
		)
		return getErrResponse(err, method), nil
	}

W
wei liu 已提交
4481 4482 4483 4484 4485 4486 4487 4488 4489 4490 4491 4492 4493 4494 4495 4496 4497 4498 4499 4500 4501
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateResourceGroup")
	defer sp.End()
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()
	t := &CreateResourceGroupTask{
		ctx:                        ctx,
		Condition:                  NewTaskCondition(ctx),
		CreateResourceGroupRequest: request,
		queryCoord:                 node.queryCoord,
	}

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
	)

	log.Debug("CreateResourceGroup received")

	if err := node.sched.ddQueue.Enqueue(t); err != nil {
		log.Warn("CreateResourceGroup failed to enqueue",
			zap.Error(err))
W
wei liu 已提交
4502
		return getErrResponse(err, method), nil
W
wei liu 已提交
4503 4504 4505 4506 4507 4508 4509 4510 4511 4512 4513
	}

	log.Debug("CreateResourceGroup enqueued",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	if err := t.WaitToFinish(); err != nil {
		log.Warn("CreateResourceGroup failed to WaitToFinish",
			zap.Error(err),
			zap.Uint64("BeginTS", t.BeginTs()),
			zap.Uint64("EndTS", t.EndTs()))
W
wei liu 已提交
4514
		return getErrResponse(err, method), nil
W
wei liu 已提交
4515 4516 4517 4518 4519 4520 4521 4522 4523 4524
	}

	log.Debug("CreateResourceGroup done",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return t.result, nil
W
wei liu 已提交
4525 4526
}

W
wei liu 已提交
4527 4528 4529 4530
func getErrResponse(err error, method string) *commonpb.Status {
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()

	return &commonpb.Status{
W
wei liu 已提交
4531
		ErrorCode: commonpb.ErrorCode_IllegalArgument,
W
wei liu 已提交
4532 4533 4534 4535
		Reason:    err.Error(),
	}
}

W
wei liu 已提交
4536
func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.DropResourceGroupRequest) (*commonpb.Status, error) {
W
wei liu 已提交
4537 4538 4539 4540
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

W
wei liu 已提交
4541
	method := "DropResourceGroup"
W
wei liu 已提交
4542 4543 4544 4545 4546 4547 4548 4549 4550 4551 4552 4553 4554 4555 4556 4557 4558 4559 4560 4561 4562 4563
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropResourceGroup")
	defer sp.End()
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()
	t := &DropResourceGroupTask{
		ctx:                      ctx,
		Condition:                NewTaskCondition(ctx),
		DropResourceGroupRequest: request,
		queryCoord:               node.queryCoord,
	}

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
	)

	log.Debug("DropResourceGroup received")

	if err := node.sched.ddQueue.Enqueue(t); err != nil {
		log.Warn("DropResourceGroup failed to enqueue",
			zap.Error(err))

W
wei liu 已提交
4564
		return getErrResponse(err, method), nil
W
wei liu 已提交
4565 4566 4567 4568 4569 4570 4571 4572 4573 4574 4575
	}

	log.Debug("DropResourceGroup enqueued",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	if err := t.WaitToFinish(); err != nil {
		log.Warn("DropResourceGroup failed to WaitToFinish",
			zap.Error(err),
			zap.Uint64("BeginTS", t.BeginTs()),
			zap.Uint64("EndTS", t.EndTs()))
W
wei liu 已提交
4576
		return getErrResponse(err, method), nil
W
wei liu 已提交
4577 4578 4579 4580 4581 4582 4583 4584 4585 4586
	}

	log.Debug("DropResourceGroup done",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return t.result, nil
W
wei liu 已提交
4587 4588 4589
}

func (node *Proxy) TransferNode(ctx context.Context, request *milvuspb.TransferNodeRequest) (*commonpb.Status, error) {
W
wei liu 已提交
4590 4591 4592 4593
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

W
wei liu 已提交
4594 4595 4596 4597 4598 4599 4600 4601 4602 4603 4604 4605 4606 4607 4608
	method := "TransferNode"
	if err := ValidateResourceGroupName(request.GetSourceResourceGroup()); err != nil {
		log.Warn("TransferNode failed",
			zap.Error(err),
		)
		return getErrResponse(err, method), nil
	}

	if err := ValidateResourceGroupName(request.GetTargetResourceGroup()); err != nil {
		log.Warn("TransferNode failed",
			zap.Error(err),
		)
		return getErrResponse(err, method), nil
	}

W
wei liu 已提交
4609 4610 4611 4612 4613 4614 4615 4616 4617 4618 4619 4620 4621 4622 4623 4624 4625 4626 4627 4628 4629 4630
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-TransferNode")
	defer sp.End()
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()
	t := &TransferNodeTask{
		ctx:                 ctx,
		Condition:           NewTaskCondition(ctx),
		TransferNodeRequest: request,
		queryCoord:          node.queryCoord,
	}

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
	)

	log.Debug("TransferNode received")

	if err := node.sched.ddQueue.Enqueue(t); err != nil {
		log.Warn("TransferNode failed to enqueue",
			zap.Error(err))

W
wei liu 已提交
4631
		return getErrResponse(err, method), nil
W
wei liu 已提交
4632 4633 4634 4635 4636 4637 4638 4639 4640 4641 4642
	}

	log.Debug("TransferNode enqueued",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	if err := t.WaitToFinish(); err != nil {
		log.Warn("TransferNode failed to WaitToFinish",
			zap.Error(err),
			zap.Uint64("BeginTS", t.BeginTs()),
			zap.Uint64("EndTS", t.EndTs()))
W
wei liu 已提交
4643
		return getErrResponse(err, method), nil
W
wei liu 已提交
4644 4645 4646 4647 4648 4649 4650 4651 4652 4653
	}

	log.Debug("TransferNode done",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return t.result, nil
W
wei liu 已提交
4654 4655 4656
}

func (node *Proxy) TransferReplica(ctx context.Context, request *milvuspb.TransferReplicaRequest) (*commonpb.Status, error) {
W
wei liu 已提交
4657 4658 4659
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
W
wei liu 已提交
4660

W
wei liu 已提交
4661 4662 4663 4664 4665 4666 4667 4668 4669 4670 4671 4672 4673 4674 4675
	method := "TransferReplica"
	if err := ValidateResourceGroupName(request.GetSourceResourceGroup()); err != nil {
		log.Warn("TransferReplica failed",
			zap.Error(err),
		)
		return getErrResponse(err, method), nil
	}

	if err := ValidateResourceGroupName(request.GetTargetResourceGroup()); err != nil {
		log.Warn("TransferReplica failed",
			zap.Error(err),
		)
		return getErrResponse(err, method), nil
	}

W
wei liu 已提交
4676 4677 4678 4679 4680 4681 4682 4683 4684 4685 4686 4687 4688 4689 4690 4691 4692 4693 4694 4695 4696 4697
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-TransferReplica")
	defer sp.End()
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()
	t := &TransferReplicaTask{
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		TransferReplicaRequest: request,
		queryCoord:             node.queryCoord,
	}

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
	)

	log.Debug("TransferReplica received")

	if err := node.sched.ddQueue.Enqueue(t); err != nil {
		log.Warn("TransferReplica failed to enqueue",
			zap.Error(err))

W
wei liu 已提交
4698
		return getErrResponse(err, method), nil
W
wei liu 已提交
4699 4700 4701 4702 4703 4704 4705 4706 4707 4708 4709
	}

	log.Debug("TransferReplica enqueued",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	if err := t.WaitToFinish(); err != nil {
		log.Warn("TransferReplica failed to WaitToFinish",
			zap.Error(err),
			zap.Uint64("BeginTS", t.BeginTs()),
			zap.Uint64("EndTS", t.EndTs()))
W
wei liu 已提交
4710
		return getErrResponse(err, method), nil
W
wei liu 已提交
4711 4712 4713 4714 4715 4716 4717 4718 4719 4720
	}

	log.Debug("TransferReplica done",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return t.result, nil
W
wei liu 已提交
4721 4722
}

W
wei liu 已提交
4723 4724 4725 4726 4727 4728 4729 4730 4731 4732 4733 4734 4735 4736 4737 4738 4739 4740 4741 4742 4743 4744 4745 4746 4747 4748 4749 4750 4751 4752 4753 4754 4755 4756 4757 4758 4759 4760 4761 4762 4763 4764 4765 4766 4767 4768 4769 4770 4771 4772 4773 4774 4775 4776 4777 4778 4779 4780 4781 4782 4783 4784 4785 4786 4787 4788 4789
func (node *Proxy) ListResourceGroups(ctx context.Context, request *milvuspb.ListResourceGroupsRequest) (*milvuspb.ListResourceGroupsResponse, error) {
	if !node.checkHealthy() {
		return &milvuspb.ListResourceGroupsResponse{
			Status: unhealthyStatus(),
		}, nil
	}

	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ListResourceGroups")
	defer sp.End()
	method := "ListResourceGroups"
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()
	t := &ListResourceGroupsTask{
		ctx:                       ctx,
		Condition:                 NewTaskCondition(ctx),
		ListResourceGroupsRequest: request,
		queryCoord:                node.queryCoord,
	}

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
	)

	log.Debug("ListResourceGroups received")

	if err := node.sched.ddQueue.Enqueue(t); err != nil {
		log.Warn("ListResourceGroups failed to enqueue",
			zap.Error(err))

		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
			metrics.AbandonLabel).Inc()
		return &milvuspb.ListResourceGroupsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug("ListResourceGroups enqueued",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	if err := t.WaitToFinish(); err != nil {
		log.Warn("ListResourceGroups failed to WaitToFinish",
			zap.Error(err),
			zap.Uint64("BeginTS", t.BeginTs()),
			zap.Uint64("EndTS", t.EndTs()))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
			metrics.FailLabel).Inc()
		return &milvuspb.ListResourceGroupsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug("ListResourceGroups done",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return t.result, nil
W
wei liu 已提交
4790 4791 4792
}

func (node *Proxy) DescribeResourceGroup(ctx context.Context, request *milvuspb.DescribeResourceGroupRequest) (*milvuspb.DescribeResourceGroupResponse, error) {
W
wei liu 已提交
4793 4794 4795 4796 4797 4798
	if !node.checkHealthy() {
		return &milvuspb.DescribeResourceGroupResponse{
			Status: unhealthyStatus(),
		}, nil
	}

W
wei liu 已提交
4799 4800 4801 4802 4803 4804 4805 4806 4807 4808 4809 4810
	method := "DescribeResourceGroup"
	GetErrResponse := func(err error) *milvuspb.DescribeResourceGroupResponse {
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()

		return &milvuspb.DescribeResourceGroupResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
	}

W
wei liu 已提交
4811 4812 4813 4814 4815 4816 4817 4818 4819 4820 4821 4822 4823 4824 4825 4826 4827 4828 4829 4830 4831 4832
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DescribeResourceGroup")
	defer sp.End()
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()
	t := &DescribeResourceGroupTask{
		ctx:                          ctx,
		Condition:                    NewTaskCondition(ctx),
		DescribeResourceGroupRequest: request,
		queryCoord:                   node.queryCoord,
	}

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
	)

	log.Debug("DescribeResourceGroup received")

	if err := node.sched.ddQueue.Enqueue(t); err != nil {
		log.Warn("DescribeResourceGroup failed to enqueue",
			zap.Error(err))

W
wei liu 已提交
4833
		return GetErrResponse(err), nil
W
wei liu 已提交
4834 4835 4836 4837 4838 4839 4840 4841 4842 4843 4844
	}

	log.Debug("DescribeResourceGroup enqueued",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	if err := t.WaitToFinish(); err != nil {
		log.Warn("DescribeResourceGroup failed to WaitToFinish",
			zap.Error(err),
			zap.Uint64("BeginTS", t.BeginTs()),
			zap.Uint64("EndTS", t.EndTs()))
W
wei liu 已提交
4845
		return GetErrResponse(err), nil
W
wei liu 已提交
4846 4847 4848 4849 4850 4851 4852 4853 4854 4855
	}

	log.Debug("DescribeResourceGroup done",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return t.result, nil
W
wei liu 已提交
4856
}