impl.go 155.2 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16

C
Cai Yudong 已提交
17
package proxy
18 19 20

import (
	"context"
21
	"fmt"
C
cai.zhang 已提交
22
	"os"
23
	"strconv"
24 25
	"sync"

26
	"github.com/cockroachdb/errors"
J
jaime 已提交
27
	"github.com/golang/protobuf/proto"
28
	"github.com/samber/lo"
E
Enwei Jiao 已提交
29
	"go.opentelemetry.io/otel"
J
jaime 已提交
30
	"go.uber.org/zap"
31
	"golang.org/x/sync/errgroup"
32

S
SimFG 已提交
33 34
	"github.com/milvus-io/milvus-proto/go-api/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/milvuspb"
35
	"github.com/milvus-io/milvus-proto/go-api/msgpb"
S
smellthemoon 已提交
36
	"github.com/milvus-io/milvus-proto/go-api/schemapb"
37
	"github.com/milvus-io/milvus/internal/common"
X
Xiangyu Wang 已提交
38
	"github.com/milvus-io/milvus/internal/log"
39
	"github.com/milvus-io/milvus/internal/metrics"
J
jaime 已提交
40
	"github.com/milvus-io/milvus/internal/mq/msgstream"
X
Xiangyu Wang 已提交
41 42 43 44
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
45
	"github.com/milvus-io/milvus/internal/util"
46
	"github.com/milvus-io/milvus/internal/util/commonpbutil"
47
	"github.com/milvus-io/milvus/internal/util/crypto"
48
	"github.com/milvus-io/milvus/internal/util/errorutil"
49
	"github.com/milvus-io/milvus/internal/util/importutil"
50 51
	"github.com/milvus-io/milvus/internal/util/logutil"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
E
Enwei Jiao 已提交
52
	"github.com/milvus-io/milvus/internal/util/paramtable"
53
	"github.com/milvus-io/milvus/internal/util/ratelimitutil"
54
	"github.com/milvus-io/milvus/internal/util/timerecord"
X
Xiangyu Wang 已提交
55
	"github.com/milvus-io/milvus/internal/util/typeutil"
56 57
)

58 59
// moduleName is the module label attached to a request context through
// logutil.WithModule.
const moduleName = "Proxy"

60
// UpdateStateCode updates the state code of Proxy.
61
func (node *Proxy) UpdateStateCode(code commonpb.StateCode) {
62
	node.stateCode.Store(code)
Z
zhenshan.cao 已提交
63 64
}

65
// GetComponentStates get state of Proxy.
66 67
func (node *Proxy) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
	stats := &milvuspb.ComponentStates{
G
godchen 已提交
68 69 70 71
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
72
	code, ok := node.stateCode.Load().(commonpb.StateCode)
G
godchen 已提交
73 74 75 76 77 78
	if !ok {
		errMsg := "unexpected error in type assertion"
		stats.Status = &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    errMsg,
		}
G
godchen 已提交
79
		return stats, nil
G
godchen 已提交
80
	}
81 82 83 84
	nodeID := common.NotRegisteredID
	if node.session != nil && node.session.Registered() {
		nodeID = node.session.ServerID
	}
85
	info := &milvuspb.ComponentInfo{
86 87
		// NodeID:    Params.ProxyID, // will race with Proxy.Register()
		NodeID:    nodeID,
C
Cai Yudong 已提交
88
		Role:      typeutil.ProxyRole,
G
godchen 已提交
89 90 91 92 93 94
		StateCode: code,
	}
	stats.State = info
	return stats, nil
}

C
cxytz01 已提交
95
// GetStatisticsChannel gets statistics channel of Proxy.
C
Cai Yudong 已提交
96
func (node *Proxy) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
97 98 99 100 101 102 103 104 105
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: "",
	}, nil
}

106
// InvalidateCollectionMetaCache invalidate the meta cache of specific collection.
C
Cai Yudong 已提交
107
func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
108 109 110
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
111
	ctx = logutil.WithModule(ctx, moduleName)
E
Enwei Jiao 已提交
112 113 114

	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-InvalidateCollectionMetaCache")
	defer sp.End()
115
	log := log.Ctx(ctx).With(
116
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
117
		zap.String("db", request.DbName),
118 119
		zap.String("collectionName", request.CollectionName),
		zap.Int64("collectionID", request.CollectionID))
D
dragondriver 已提交
120

121 122
	log.Info("received request to invalidate collection meta cache")

123
	collectionName := request.CollectionName
124
	collectionID := request.CollectionID
X
Xiaofan 已提交
125 126

	var aliasName []string
N
neza2017 已提交
127
	if globalMetaCache != nil {
128 129 130 131
		if collectionName != "" {
			globalMetaCache.RemoveCollection(ctx, collectionName) // no need to return error, though collection may be not cached
		}
		if request.CollectionID != UniqueID(0) {
X
Xiaofan 已提交
132
			aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
133
		}
N
neza2017 已提交
134
	}
135 136
	if request.GetBase().GetMsgType() == commonpb.MsgType_DropCollection {
		// no need to handle error, since this Proxy may not create dml stream for the collection.
137 138
		node.chMgr.removeDMLStream(request.GetCollectionID())
		// clean up collection level metrics
E
Enwei Jiao 已提交
139
		metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), collectionName)
X
Xiaofan 已提交
140
		for _, alias := range aliasName {
E
Enwei Jiao 已提交
141
			metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), alias)
X
Xiaofan 已提交
142
		}
143
	}
144
	log.Info("complete to invalidate collection meta cache")
D
dragondriver 已提交
145

146
	return &commonpb.Status{
147
		ErrorCode: commonpb.ErrorCode_Success,
148 149
		Reason:    "",
	}, nil
150 151
}

152
// CreateCollection create a collection by the schema.
153
// TODO(dragondriver): add more detailed ut for ConsistencyLevel, should we support multiple consistency level in Proxy?
C
Cai Yudong 已提交
154
func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
155 156 157
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
158

E
Enwei Jiao 已提交
159 160
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateCollection")
	defer sp.End()
161 162 163
	method := "CreateCollection"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
164
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
165

166
	cct := &createCollectionTask{
S
sunby 已提交
167
		ctx:                     ctx,
168 169
		Condition:               NewTaskCondition(ctx),
		CreateCollectionRequest: request,
170
		rootCoord:               node.rootCoord,
171 172
	}

173 174 175
	// avoid data race
	lenOfSchema := len(request.Schema)

176
	log := log.Ctx(ctx).With(
177
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
178 179
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
180
		zap.Int("len(schema)", lenOfSchema),
181 182
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
183

184 185
	log.Debug(rpcReceived(method))

186 187 188
	if err := node.sched.ddQueue.Enqueue(cct); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
189
			zap.Error(err))
190

E
Enwei Jiao 已提交
191
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
192
		return &commonpb.Status{
193
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
194 195 196 197
			Reason:    err.Error(),
		}, nil
	}

198 199
	log.Debug(
		rpcEnqueued(method),
200 201
		zap.Uint64("BeginTs", cct.BeginTs()),
		zap.Uint64("EndTs", cct.EndTs()),
202
		zap.Uint64("timestamp", request.Base.Timestamp))
203

204 205 206
	if err := cct.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
207
			zap.Error(err),
208
			zap.Uint64("BeginTs", cct.BeginTs()),
209
			zap.Uint64("EndTs", cct.EndTs()))
D
dragondriver 已提交
210

E
Enwei Jiao 已提交
211
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
212
		return &commonpb.Status{
213
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
214 215 216 217
			Reason:    err.Error(),
		}, nil
	}

218 219
	log.Debug(
		rpcDone(method),
220
		zap.Uint64("BeginTs", cct.BeginTs()),
221
		zap.Uint64("EndTs", cct.EndTs()))
222

E
Enwei Jiao 已提交
223 224
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
225 226 227
	return cct.result, nil
}

228
// DropCollection drop a collection.
C
Cai Yudong 已提交
229
func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
230 231 232
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
233

E
Enwei Jiao 已提交
234 235
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropCollection")
	defer sp.End()
236 237
	method := "DropCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
238
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
239

240
	dct := &dropCollectionTask{
S
sunby 已提交
241
		ctx:                   ctx,
242 243
		Condition:             NewTaskCondition(ctx),
		DropCollectionRequest: request,
244
		rootCoord:             node.rootCoord,
245
		chMgr:                 node.chMgr,
S
sunby 已提交
246
		chTicker:              node.chTicker,
247 248
	}

249
	log := log.Ctx(ctx).With(
250
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
251 252
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
253

254 255
	log.Debug("DropCollection received")

256 257
	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DropCollection failed to enqueue",
258
			zap.Error(err))
259

E
Enwei Jiao 已提交
260
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
261
		return &commonpb.Status{
262
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
263 264 265 266
			Reason:    err.Error(),
		}, nil
	}

267 268
	log.Debug("DropCollection enqueued",
		zap.Uint64("BeginTs", dct.BeginTs()),
269
		zap.Uint64("EndTs", dct.EndTs()))
270 271 272

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DropCollection failed to WaitToFinish",
D
dragondriver 已提交
273
			zap.Error(err),
274
			zap.Uint64("BeginTs", dct.BeginTs()),
275
			zap.Uint64("EndTs", dct.EndTs()))
D
dragondriver 已提交
276

E
Enwei Jiao 已提交
277
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
278
		return &commonpb.Status{
279
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
280 281 282 283
			Reason:    err.Error(),
		}, nil
	}

284 285
	log.Debug("DropCollection done",
		zap.Uint64("BeginTs", dct.BeginTs()),
286
		zap.Uint64("EndTs", dct.EndTs()))
287

E
Enwei Jiao 已提交
288 289
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
290 291 292
	return dct.result, nil
}

293
// HasCollection check if the specific collection exists in Milvus.
C
Cai Yudong 已提交
294
func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
295 296 297 298 299
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
300

E
Enwei Jiao 已提交
301 302
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-HasCollection")
	defer sp.End()
303 304
	method := "HasCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
305
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
306
		metrics.TotalLabel).Inc()
307

308
	log := log.Ctx(ctx).With(
309
		zap.String("role", typeutil.ProxyRole),
310 311 312
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

313 314
	log.Debug("HasCollection received")

315
	hct := &hasCollectionTask{
S
sunby 已提交
316
		ctx:                  ctx,
317 318
		Condition:            NewTaskCondition(ctx),
		HasCollectionRequest: request,
319
		rootCoord:            node.rootCoord,
320 321
	}

322 323
	if err := node.sched.ddQueue.Enqueue(hct); err != nil {
		log.Warn("HasCollection failed to enqueue",
324
			zap.Error(err))
325

E
Enwei Jiao 已提交
326
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
327
			metrics.AbandonLabel).Inc()
328 329
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
330
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
331 332 333 334 335
				Reason:    err.Error(),
			},
		}, nil
	}

336 337
	log.Debug("HasCollection enqueued",
		zap.Uint64("BeginTS", hct.BeginTs()),
338
		zap.Uint64("EndTS", hct.EndTs()))
339 340 341

	if err := hct.WaitToFinish(); err != nil {
		log.Warn("HasCollection failed to WaitToFinish",
D
dragondriver 已提交
342
			zap.Error(err),
343
			zap.Uint64("BeginTS", hct.BeginTs()),
344
			zap.Uint64("EndTS", hct.EndTs()))
D
dragondriver 已提交
345

E
Enwei Jiao 已提交
346
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
347
			metrics.FailLabel).Inc()
348 349
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
350
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
351 352 353 354 355
				Reason:    err.Error(),
			},
		}, nil
	}

356 357
	log.Debug("HasCollection done",
		zap.Uint64("BeginTS", hct.BeginTs()),
358
		zap.Uint64("EndTS", hct.EndTs()))
359

E
Enwei Jiao 已提交
360
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
361
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
362
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
363 364 365
	return hct.result, nil
}

366
// LoadCollection load a collection into query nodes.
C
Cai Yudong 已提交
367
func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
368 369 370
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
371

E
Enwei Jiao 已提交
372 373
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-LoadCollection")
	defer sp.End()
374 375
	method := "LoadCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
376
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
377
		metrics.TotalLabel).Inc()
378
	lct := &loadCollectionTask{
S
sunby 已提交
379
		ctx:                   ctx,
380 381
		Condition:             NewTaskCondition(ctx),
		LoadCollectionRequest: request,
382
		queryCoord:            node.queryCoord,
383
		datacoord:             node.dataCoord,
384 385
	}

386
	log := log.Ctx(ctx).With(
387
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
388
		zap.String("db", request.DbName),
389 390
		zap.String("collection", request.CollectionName),
		zap.Bool("refreshMode", request.Refresh))
391

392 393
	log.Debug("LoadCollection received")

394 395
	if err := node.sched.ddQueue.Enqueue(lct); err != nil {
		log.Warn("LoadCollection failed to enqueue",
396
			zap.Error(err))
397

E
Enwei Jiao 已提交
398
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
399
			metrics.AbandonLabel).Inc()
400
		return &commonpb.Status{
401
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
402 403 404
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
405

406 407
	log.Debug("LoadCollection enqueued",
		zap.Uint64("BeginTS", lct.BeginTs()),
408
		zap.Uint64("EndTS", lct.EndTs()))
409 410 411

	if err := lct.WaitToFinish(); err != nil {
		log.Warn("LoadCollection failed to WaitToFinish",
D
dragondriver 已提交
412
			zap.Error(err),
413
			zap.Uint64("BeginTS", lct.BeginTs()),
414
			zap.Uint64("EndTS", lct.EndTs()))
E
Enwei Jiao 已提交
415
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
416
			metrics.FailLabel).Inc()
417
		return &commonpb.Status{
418
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
419 420 421 422
			Reason:    err.Error(),
		}, nil
	}

423 424
	log.Debug("LoadCollection done",
		zap.Uint64("BeginTS", lct.BeginTs()),
425
		zap.Uint64("EndTS", lct.EndTs()))
426

E
Enwei Jiao 已提交
427
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
428
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
429
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
430
	return lct.result, nil
431 432
}

433
// ReleaseCollection remove the loaded collection from query nodes.
C
Cai Yudong 已提交
434
func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
435 436 437
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
438

E
Enwei Jiao 已提交
439 440
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ReleaseCollection")
	defer sp.End()
441 442
	method := "ReleaseCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
443
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
444
		metrics.TotalLabel).Inc()
445
	rct := &releaseCollectionTask{
S
sunby 已提交
446
		ctx:                      ctx,
447 448
		Condition:                NewTaskCondition(ctx),
		ReleaseCollectionRequest: request,
449
		queryCoord:               node.queryCoord,
450
		chMgr:                    node.chMgr,
451 452
	}

453
	log := log.Ctx(ctx).With(
454
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
455 456
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
457

458 459
	log.Debug(rpcReceived(method))

460
	if err := node.sched.ddQueue.Enqueue(rct); err != nil {
461 462
		log.Warn(
			rpcFailedToEnqueue(method),
463
			zap.Error(err))
464

E
Enwei Jiao 已提交
465
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
466
			metrics.AbandonLabel).Inc()
467
		return &commonpb.Status{
468
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
469 470 471 472
			Reason:    err.Error(),
		}, nil
	}

473 474
	log.Debug(
		rpcEnqueued(method),
475
		zap.Uint64("BeginTS", rct.BeginTs()),
476
		zap.Uint64("EndTS", rct.EndTs()))
477 478

	if err := rct.WaitToFinish(); err != nil {
479 480
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
481
			zap.Error(err),
482
			zap.Uint64("BeginTS", rct.BeginTs()),
483
			zap.Uint64("EndTS", rct.EndTs()))
D
dragondriver 已提交
484

E
Enwei Jiao 已提交
485
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
486
			metrics.FailLabel).Inc()
487
		return &commonpb.Status{
488
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
489 490 491 492
			Reason:    err.Error(),
		}, nil
	}

493 494
	log.Debug(
		rpcDone(method),
495
		zap.Uint64("BeginTS", rct.BeginTs()),
496
		zap.Uint64("EndTS", rct.EndTs()))
497

E
Enwei Jiao 已提交
498
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
499
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
500
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
501
	return rct.result, nil
502 503
}

504
// DescribeCollection get the meta information of specific collection, such as schema, created timestamp and etc.
C
Cai Yudong 已提交
505
func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
506 507 508 509 510
	if !node.checkHealthy() {
		return &milvuspb.DescribeCollectionResponse{
			Status: unhealthyStatus(),
		}, nil
	}
511

E
Enwei Jiao 已提交
512 513
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DescribeCollection")
	defer sp.End()
514 515
	method := "DescribeCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
516
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
517
		metrics.TotalLabel).Inc()
518

519
	dct := &describeCollectionTask{
S
sunby 已提交
520
		ctx:                       ctx,
521 522
		Condition:                 NewTaskCondition(ctx),
		DescribeCollectionRequest: request,
523
		rootCoord:                 node.rootCoord,
524 525
	}

526
	log := log.Ctx(ctx).With(
527
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
528 529
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
530

531 532
	log.Debug("DescribeCollection received")

533 534
	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DescribeCollection failed to enqueue",
535
			zap.Error(err))
536

E
Enwei Jiao 已提交
537
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
538
			metrics.AbandonLabel).Inc()
539 540
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
541
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
542 543 544 545 546
				Reason:    err.Error(),
			},
		}, nil
	}

547 548
	log.Debug("DescribeCollection enqueued",
		zap.Uint64("BeginTS", dct.BeginTs()),
549
		zap.Uint64("EndTS", dct.EndTs()))
550 551 552

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DescribeCollection failed to WaitToFinish",
D
dragondriver 已提交
553
			zap.Error(err),
554
			zap.Uint64("BeginTS", dct.BeginTs()),
555
			zap.Uint64("EndTS", dct.EndTs()))
D
dragondriver 已提交
556

E
Enwei Jiao 已提交
557
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
558
			metrics.FailLabel).Inc()
559

560 561
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
562
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
563 564 565 566 567
				Reason:    err.Error(),
			},
		}, nil
	}

568 569
	log.Debug("DescribeCollection done",
		zap.Uint64("BeginTS", dct.BeginTs()),
570
		zap.Uint64("EndTS", dct.EndTs()))
571

E
Enwei Jiao 已提交
572
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
573
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
574
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
575 576 577
	return dct.result, nil
}

578 579 580 581 582 583 584 585 586
// GetStatistics get the statistics, such as `num_rows`.
// WARNING: It is an experimental API
func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStatisticsRequest) (*milvuspb.GetStatisticsResponse, error) {
	if !node.checkHealthy() {
		return &milvuspb.GetStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}

E
Enwei Jiao 已提交
587 588
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetStatistics")
	defer sp.End()
589 590
	method := "GetStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
591
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
592
		metrics.TotalLabel).Inc()
593 594 595 596 597 598 599 600 601 602
	g := &getStatisticsTask{
		request:   request,
		Condition: NewTaskCondition(ctx),
		ctx:       ctx,
		tr:        tr,
		dc:        node.dataCoord,
		qc:        node.queryCoord,
		shardMgr:  node.shardMgr,
	}

603
	log := log.Ctx(ctx).With(
604 605
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
606 607 608 609
		zap.String("collection", request.CollectionName))

	log.Debug(
		rpcReceived(method),
610 611 612 613 614 615 616 617
		zap.Strings("partitions", request.PartitionNames))

	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.Strings("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
618
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642
			metrics.AbandonLabel).Inc()

		return &milvuspb.GetStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.Strings("partitions", request.PartitionNames))

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.Uint64("BeginTS", g.BeginTs()),
			zap.Uint64("EndTS", g.EndTs()),
			zap.Strings("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
643
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
644 645 646 647 648 649 650 651 652 653 654 655 656
			metrics.FailLabel).Inc()

		return &milvuspb.GetStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", g.BeginTs()),
657
		zap.Uint64("EndTS", g.EndTs()))
658

E
Enwei Jiao 已提交
659
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
660
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
661
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
662 663 664
	return g.result, nil
}

665
// GetCollectionStatistics get the collection statistics, such as `num_rows`.
C
Cai Yudong 已提交
666
func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvuspb.GetCollectionStatisticsRequest) (*milvuspb.GetCollectionStatisticsResponse, error) {
667 668 669 670 671
	if !node.checkHealthy() {
		return &milvuspb.GetCollectionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
672

E
Enwei Jiao 已提交
673 674
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetCollectionStatistics")
	defer sp.End()
675 676
	method := "GetCollectionStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
677
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
678
		metrics.TotalLabel).Inc()
679
	g := &getCollectionStatisticsTask{
G
godchen 已提交
680 681 682
		ctx:                            ctx,
		Condition:                      NewTaskCondition(ctx),
		GetCollectionStatisticsRequest: request,
683
		dataCoord:                      node.dataCoord,
684 685
	}

686
	log := log.Ctx(ctx).With(
687
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
688 689
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
690

691 692
	log.Debug(rpcReceived(method))

693
	if err := node.sched.ddQueue.Enqueue(g); err != nil {
694 695
		log.Warn(
			rpcFailedToEnqueue(method),
696
			zap.Error(err))
697

E
Enwei Jiao 已提交
698
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
699
			metrics.AbandonLabel).Inc()
700

G
godchen 已提交
701
		return &milvuspb.GetCollectionStatisticsResponse{
702
			Status: &commonpb.Status{
703
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
704 705 706 707 708
				Reason:    err.Error(),
			},
		}, nil
	}

709 710
	log.Debug(
		rpcEnqueued(method),
711
		zap.Uint64("BeginTS", g.BeginTs()),
712
		zap.Uint64("EndTS", g.EndTs()))
713 714

	if err := g.WaitToFinish(); err != nil {
715 716
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
717
			zap.Error(err),
718
			zap.Uint64("BeginTS", g.BeginTs()),
719
			zap.Uint64("EndTS", g.EndTs()))
D
dragondriver 已提交
720

E
Enwei Jiao 已提交
721
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
722
			metrics.FailLabel).Inc()
723

G
godchen 已提交
724
		return &milvuspb.GetCollectionStatisticsResponse{
725
			Status: &commonpb.Status{
726
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
727 728 729 730 731
				Reason:    err.Error(),
			},
		}, nil
	}

732 733
	log.Debug(
		rpcDone(method),
734
		zap.Uint64("BeginTS", g.BeginTs()),
735
		zap.Uint64("EndTS", g.EndTs()))
736

E
Enwei Jiao 已提交
737
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
738
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
739
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
740
	return g.result, nil
741 742
}

743
// ShowCollections list all collections in Milvus.
C
Cai Yudong 已提交
744
func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
745 746 747 748 749
	if !node.checkHealthy() {
		return &milvuspb.ShowCollectionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
E
Enwei Jiao 已提交
750 751
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ShowCollections")
	defer sp.End()
752 753
	method := "ShowCollections"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
754
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
755

756
	sct := &showCollectionsTask{
G
godchen 已提交
757 758 759
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		ShowCollectionsRequest: request,
760
		queryCoord:             node.queryCoord,
761
		rootCoord:              node.rootCoord,
762 763
	}

764
	log := log.Ctx(ctx).With(
765
		zap.String("role", typeutil.ProxyRole),
766 767
		zap.String("DbName", request.DbName),
		zap.Uint64("TimeStamp", request.TimeStamp),
768 769 770 771
		zap.String("ShowType", request.Type.String()))

	log.Debug("ShowCollections received",
		zap.Any("CollectionNames", request.CollectionNames))
772

773
	err := node.sched.ddQueue.Enqueue(sct)
774
	if err != nil {
775 776
		log.Warn("ShowCollections failed to enqueue",
			zap.Error(err),
777
			zap.Any("CollectionNames", request.CollectionNames))
778

E
Enwei Jiao 已提交
779
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
G
godchen 已提交
780
		return &milvuspb.ShowCollectionsResponse{
781
			Status: &commonpb.Status{
782
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
783 784 785 786 787
				Reason:    err.Error(),
			},
		}, nil
	}

788
	log.Debug("ShowCollections enqueued",
789
		zap.Any("CollectionNames", request.CollectionNames))
D
dragondriver 已提交
790

791 792
	err = sct.WaitToFinish()
	if err != nil {
793 794
		log.Warn("ShowCollections failed to WaitToFinish",
			zap.Error(err),
795
			zap.Any("CollectionNames", request.CollectionNames))
796

E
Enwei Jiao 已提交
797
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
798

G
godchen 已提交
799
		return &milvuspb.ShowCollectionsResponse{
800
			Status: &commonpb.Status{
801
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
802 803 804 805 806
				Reason:    err.Error(),
			},
		}, nil
	}

807
	log.Debug("ShowCollections Done",
808 809
		zap.Int("len(CollectionNames)", len(request.CollectionNames)),
		zap.Int("num_collections", len(sct.result.CollectionNames)))
810

E
Enwei Jiao 已提交
811 812
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
813 814 815
	return sct.result, nil
}

J
jaime 已提交
816 817 818 819 820
func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

E
Enwei Jiao 已提交
821 822
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-AlterCollection")
	defer sp.End()
J
jaime 已提交
823 824 825
	method := "AlterCollection"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
826
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
J
jaime 已提交
827 828 829 830 831 832 833 834

	act := &alterCollectionTask{
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		AlterCollectionRequest: request,
		rootCoord:              node.rootCoord,
	}

835
	log := log.Ctx(ctx).With(
J
jaime 已提交
836 837 838 839
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

840 841 842
	log.Debug(
		rpcReceived(method))

J
jaime 已提交
843 844 845
	if err := node.sched.ddQueue.Enqueue(act); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
846
			zap.Error(err))
J
jaime 已提交
847

E
Enwei Jiao 已提交
848
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
J
jaime 已提交
849 850 851 852 853 854 855 856 857 858
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", act.BeginTs()),
		zap.Uint64("EndTs", act.EndTs()),
859
		zap.Uint64("timestamp", request.Base.Timestamp))
J
jaime 已提交
860 861 862 863 864 865

	if err := act.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.Uint64("BeginTs", act.BeginTs()),
866
			zap.Uint64("EndTs", act.EndTs()))
J
jaime 已提交
867

E
Enwei Jiao 已提交
868
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
J
jaime 已提交
869 870 871 872 873 874 875 876 877
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", act.BeginTs()),
878
		zap.Uint64("EndTs", act.EndTs()))
J
jaime 已提交
879

E
Enwei Jiao 已提交
880 881
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
J
jaime 已提交
882 883 884
	return act.result, nil
}

885
// CreatePartition create a partition in specific collection.
C
Cai Yudong 已提交
886
func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
887 888 889
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
890

E
Enwei Jiao 已提交
891 892
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreatePartition")
	defer sp.End()
893 894
	method := "CreatePartition"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
895
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
896

897
	cpt := &createPartitionTask{
S
sunby 已提交
898
		ctx:                    ctx,
899 900
		Condition:              NewTaskCondition(ctx),
		CreatePartitionRequest: request,
901
		rootCoord:              node.rootCoord,
902 903 904
		result:                 nil,
	}

905
	log := log.Ctx(ctx).With(
906
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
907 908 909
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
910

911 912
	log.Debug(rpcReceived("CreatePartition"))

913 914 915
	if err := node.sched.ddQueue.Enqueue(cpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue("CreatePartition"),
916
			zap.Error(err))
917

E
Enwei Jiao 已提交
918
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
919

920
		return &commonpb.Status{
921
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
922 923 924
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
925

926 927 928
	log.Debug(
		rpcEnqueued("CreatePartition"),
		zap.Uint64("BeginTS", cpt.BeginTs()),
929
		zap.Uint64("EndTS", cpt.EndTs()))
930 931 932 933

	if err := cpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish("CreatePartition"),
D
dragondriver 已提交
934
			zap.Error(err),
935
			zap.Uint64("BeginTS", cpt.BeginTs()),
936
			zap.Uint64("EndTS", cpt.EndTs()))
D
dragondriver 已提交
937

E
Enwei Jiao 已提交
938
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
939

940
		return &commonpb.Status{
941
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
942 943 944
			Reason:    err.Error(),
		}, nil
	}
945 946 947 948

	log.Debug(
		rpcDone("CreatePartition"),
		zap.Uint64("BeginTS", cpt.BeginTs()),
949
		zap.Uint64("EndTS", cpt.EndTs()))
950

E
Enwei Jiao 已提交
951 952
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
953 954 955
	return cpt.result, nil
}

956
// DropPartition drop a partition in specific collection.
C
Cai Yudong 已提交
957
func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
958 959 960
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
961

E
Enwei Jiao 已提交
962 963
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropPartition")
	defer sp.End()
964 965
	method := "DropPartition"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
966
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
967

968
	dpt := &dropPartitionTask{
S
sunby 已提交
969
		ctx:                  ctx,
970 971
		Condition:            NewTaskCondition(ctx),
		DropPartitionRequest: request,
972
		rootCoord:            node.rootCoord,
C
cai.zhang 已提交
973
		queryCoord:           node.queryCoord,
974 975 976
		result:               nil,
	}

977
	log := log.Ctx(ctx).With(
978
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
979 980 981
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
982

983 984
	log.Debug(rpcReceived(method))

985 986 987
	if err := node.sched.ddQueue.Enqueue(dpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
988
			zap.Error(err))
989

E
Enwei Jiao 已提交
990
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
991

992
		return &commonpb.Status{
993
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
994 995 996
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
997

998 999 1000
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", dpt.BeginTs()),
1001
		zap.Uint64("EndTS", dpt.EndTs()))
1002 1003 1004 1005

	if err := dpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1006
			zap.Error(err),
1007
			zap.Uint64("BeginTS", dpt.BeginTs()),
1008
			zap.Uint64("EndTS", dpt.EndTs()))
D
dragondriver 已提交
1009

E
Enwei Jiao 已提交
1010
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
1011

1012
		return &commonpb.Status{
1013
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1014 1015 1016
			Reason:    err.Error(),
		}, nil
	}
1017 1018 1019 1020

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", dpt.BeginTs()),
1021
		zap.Uint64("EndTS", dpt.EndTs()))
1022

E
Enwei Jiao 已提交
1023 1024
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1025 1026 1027
	return dpt.result, nil
}

1028
// HasPartition check if partition exist.
C
Cai Yudong 已提交
1029
func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
1030 1031 1032 1033 1034
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
D
dragondriver 已提交
1035

E
Enwei Jiao 已提交
1036 1037
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-HasPartition")
	defer sp.End()
1038 1039 1040
	method := "HasPartition"
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
E
Enwei Jiao 已提交
1041
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1042
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1043

1044
	hpt := &hasPartitionTask{
S
sunby 已提交
1045
		ctx:                 ctx,
1046 1047
		Condition:           NewTaskCondition(ctx),
		HasPartitionRequest: request,
1048
		rootCoord:           node.rootCoord,
1049 1050 1051
		result:              nil,
	}

1052
	log := log.Ctx(ctx).With(
1053
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1054 1055 1056
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
D
dragondriver 已提交
1057

1058 1059
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1060 1061 1062
	if err := node.sched.ddQueue.Enqueue(hpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1063
			zap.Error(err))
D
dragondriver 已提交
1064

E
Enwei Jiao 已提交
1065
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1066
			metrics.AbandonLabel).Inc()
1067

1068 1069
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1070
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1071 1072 1073 1074 1075
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1076

D
dragondriver 已提交
1077 1078 1079
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", hpt.BeginTs()),
1080
		zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1081 1082 1083 1084

	if err := hpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1085
			zap.Error(err),
D
dragondriver 已提交
1086
			zap.Uint64("BeginTS", hpt.BeginTs()),
1087
			zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1088

E
Enwei Jiao 已提交
1089
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1090
			metrics.FailLabel).Inc()
1091

1092 1093
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1094
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1095 1096 1097 1098 1099
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1100 1101 1102 1103

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", hpt.BeginTs()),
1104
		zap.Uint64("EndTS", hpt.EndTs()))
D
dragondriver 已提交
1105

E
Enwei Jiao 已提交
1106
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1107
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1108
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1109 1110 1111
	return hpt.result, nil
}

1112
// LoadPartitions load specific partitions into query nodes.
C
Cai Yudong 已提交
1113
func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitionsRequest) (*commonpb.Status, error) {
1114 1115 1116
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1117

E
Enwei Jiao 已提交
1118 1119
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-LoadPartitions")
	defer sp.End()
1120 1121
	method := "LoadPartitions"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1122
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1123
		metrics.TotalLabel).Inc()
1124
	lpt := &loadPartitionsTask{
G
godchen 已提交
1125 1126 1127
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		LoadPartitionsRequest: request,
1128
		queryCoord:            node.queryCoord,
1129
		datacoord:             node.dataCoord,
1130 1131
	}

1132
	log := log.Ctx(ctx).With(
1133
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1134 1135
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
1136 1137
		zap.Strings("partitions", request.PartitionNames),
		zap.Bool("refreshMode", request.Refresh))
1138

1139 1140
	log.Debug(rpcReceived(method))

1141 1142 1143
	if err := node.sched.ddQueue.Enqueue(lpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1144
			zap.Error(err))
1145

E
Enwei Jiao 已提交
1146
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1147
			metrics.AbandonLabel).Inc()
1148

1149
		return &commonpb.Status{
1150
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1151 1152 1153 1154
			Reason:    err.Error(),
		}, nil
	}

1155 1156 1157
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", lpt.BeginTs()),
1158
		zap.Uint64("EndTS", lpt.EndTs()))
1159 1160 1161 1162

	if err := lpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1163
			zap.Error(err),
1164
			zap.Uint64("BeginTS", lpt.BeginTs()),
1165
			zap.Uint64("EndTS", lpt.EndTs()))
D
dragondriver 已提交
1166

E
Enwei Jiao 已提交
1167
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1168
			metrics.FailLabel).Inc()
1169

1170
		return &commonpb.Status{
1171
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1172 1173 1174 1175
			Reason:    err.Error(),
		}, nil
	}

1176 1177 1178
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", lpt.BeginTs()),
1179
		zap.Uint64("EndTS", lpt.EndTs()))
1180

E
Enwei Jiao 已提交
1181
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1182
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1183
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1184
	return lpt.result, nil
1185 1186
}

1187
// ReleasePartitions release specific partitions from query nodes.
C
Cai Yudong 已提交
1188
func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionsRequest) (*commonpb.Status, error) {
1189 1190 1191
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1192

E
Enwei Jiao 已提交
1193 1194
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ReleasePartitions")
	defer sp.End()
1195

1196
	rpt := &releasePartitionsTask{
G
godchen 已提交
1197 1198 1199
		ctx:                      ctx,
		Condition:                NewTaskCondition(ctx),
		ReleasePartitionsRequest: request,
1200
		queryCoord:               node.queryCoord,
1201 1202
	}

1203
	method := "ReleasePartitions"
1204
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1205
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1206
		metrics.TotalLabel).Inc()
1207 1208

	log := log.Ctx(ctx).With(
1209
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1210 1211 1212
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1213

1214 1215
	log.Debug(rpcReceived(method))

1216 1217 1218
	if err := node.sched.ddQueue.Enqueue(rpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1219
			zap.Error(err))
1220

E
Enwei Jiao 已提交
1221
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1222
			metrics.AbandonLabel).Inc()
1223

1224
		return &commonpb.Status{
1225
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1226 1227 1228 1229
			Reason:    err.Error(),
		}, nil
	}

1230 1231 1232
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", rpt.BeginTs()),
1233
		zap.Uint64("EndTS", rpt.EndTs()))
1234 1235 1236 1237

	if err := rpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1238
			zap.Error(err),
1239
			zap.Uint64("BeginTS", rpt.BeginTs()),
1240
			zap.Uint64("EndTS", rpt.EndTs()))
D
dragondriver 已提交
1241

E
Enwei Jiao 已提交
1242
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1243
			metrics.FailLabel).Inc()
1244

1245
		return &commonpb.Status{
1246
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1247 1248 1249 1250
			Reason:    err.Error(),
		}, nil
	}

1251 1252 1253
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", rpt.BeginTs()),
1254
		zap.Uint64("EndTS", rpt.EndTs()))
1255

E
Enwei Jiao 已提交
1256
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1257
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1258
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1259
	return rpt.result, nil
1260 1261
}

1262
// GetPartitionStatistics get the statistics of partition, such as num_rows.
C
Cai Yudong 已提交
1263
func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb.GetPartitionStatisticsRequest) (*milvuspb.GetPartitionStatisticsResponse, error) {
1264 1265 1266 1267 1268
	if !node.checkHealthy() {
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1269

E
Enwei Jiao 已提交
1270 1271
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetPartitionStatistics")
	defer sp.End()
1272 1273
	method := "GetPartitionStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1274
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1275
		metrics.TotalLabel).Inc()
1276

1277
	g := &getPartitionStatisticsTask{
1278 1279 1280
		ctx:                           ctx,
		Condition:                     NewTaskCondition(ctx),
		GetPartitionStatisticsRequest: request,
1281
		dataCoord:                     node.dataCoord,
1282 1283
	}

1284
	log := log.Ctx(ctx).With(
1285
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1286 1287 1288
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1289

1290 1291
	log.Debug(rpcReceived(method))

1292 1293 1294
	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1295
			zap.Error(err))
1296

E
Enwei Jiao 已提交
1297
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1298
			metrics.AbandonLabel).Inc()
1299

1300 1301 1302 1303 1304 1305 1306 1307
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1308 1309 1310
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", g.BeginTs()),
1311
		zap.Uint64("EndTS", g.EndTs()))
1312 1313 1314 1315

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
1316
			zap.Error(err),
1317
			zap.Uint64("BeginTS", g.BeginTs()),
1318
			zap.Uint64("EndTS", g.EndTs()))
1319

E
Enwei Jiao 已提交
1320
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1321
			metrics.FailLabel).Inc()
1322

1323 1324 1325 1326 1327 1328 1329 1330
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1331 1332 1333
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", g.BeginTs()),
1334
		zap.Uint64("EndTS", g.EndTs()))
1335

E
Enwei Jiao 已提交
1336
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1337
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1338
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1339
	return g.result, nil
1340 1341
}

1342
// ShowPartitions list all partitions in the specific collection.
C
Cai Yudong 已提交
1343
func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
1344 1345 1346 1347 1348
	if !node.checkHealthy() {
		return &milvuspb.ShowPartitionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1349

E
Enwei Jiao 已提交
1350 1351
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ShowPartitions")
	defer sp.End()
1352

1353
	spt := &showPartitionsTask{
G
godchen 已提交
1354 1355 1356
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		ShowPartitionsRequest: request,
1357
		rootCoord:             node.rootCoord,
1358
		queryCoord:            node.queryCoord,
G
godchen 已提交
1359
		result:                nil,
1360 1361
	}

1362
	method := "ShowPartitions"
1363 1364
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
E
Enwei Jiao 已提交
1365
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1366
		metrics.TotalLabel).Inc()
1367

1368 1369
	log := log.Ctx(ctx).With(zap.String("role", typeutil.ProxyRole))

1370 1371
	log.Debug(
		rpcReceived(method),
G
godchen 已提交
1372
		zap.Any("request", request))
1373 1374 1375 1376 1377 1378 1379

	if err := node.sched.ddQueue.Enqueue(spt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.Any("request", request))

E
Enwei Jiao 已提交
1380
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1381
			metrics.AbandonLabel).Inc()
1382

G
godchen 已提交
1383
		return &milvuspb.ShowPartitionsResponse{
1384
			Status: &commonpb.Status{
1385
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1386 1387 1388 1389 1390
				Reason:    err.Error(),
			},
		}, nil
	}

1391 1392 1393 1394
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
1395 1396
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
1397 1398 1399 1400 1401
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

	if err := spt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1402
			zap.Error(err),
1403 1404 1405 1406 1407
			zap.Uint64("BeginTS", spt.BeginTs()),
			zap.Uint64("EndTS", spt.EndTs()),
			zap.String("db", spt.ShowPartitionsRequest.DbName),
			zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
			zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))
D
dragondriver 已提交
1408

E
Enwei Jiao 已提交
1409
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1410
			metrics.FailLabel).Inc()
1411

G
godchen 已提交
1412
		return &milvuspb.ShowPartitionsResponse{
1413
			Status: &commonpb.Status{
1414
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1415 1416 1417 1418
				Reason:    err.Error(),
			},
		}, nil
	}
1419 1420 1421 1422 1423 1424 1425 1426 1427

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

E
Enwei Jiao 已提交
1428
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1429
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1430
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1431 1432 1433
	return spt.result, nil
}

S
SimFG 已提交
1434 1435 1436 1437 1438 1439
func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.GetLoadingProgressRequest) (*milvuspb.GetLoadingProgressResponse, error) {
	if !node.checkHealthy() {
		return &milvuspb.GetLoadingProgressResponse{Status: unhealthyStatus()}, nil
	}
	method := "GetLoadingProgress"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1440 1441
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetLoadingProgress")
	defer sp.End()
E
Enwei Jiao 已提交
1442
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
1443 1444 1445
	log := log.Ctx(ctx)

	log.Debug(
S
SimFG 已提交
1446 1447 1448 1449
		rpcReceived(method),
		zap.Any("request", request))

	getErrResponse := func(err error) *milvuspb.GetLoadingProgressResponse {
J
Jiquan Long 已提交
1450
		log.Warn("fail to get loading progress",
1451
			zap.String("collection_name", request.CollectionName),
S
SimFG 已提交
1452 1453
			zap.Strings("partition_name", request.PartitionNames),
			zap.Error(err))
E
Enwei Jiao 已提交
1454
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
1455 1456 1457 1458 1459
		if errors.Is(err, ErrInsufficientMemory) {
			return &milvuspb.GetLoadingProgressResponse{
				Status: InSufficientMemoryStatus(request.GetCollectionName()),
			}
		}
S
SimFG 已提交
1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473
		return &milvuspb.GetLoadingProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
	}
	if err := validateCollectionName(request.CollectionName); err != nil {
		return getErrResponse(err), nil
	}
	collectionID, err := globalMetaCache.GetCollectionID(ctx, request.CollectionName)
	if err != nil {
		return getErrResponse(err), nil
	}
S
SimFG 已提交
1474

1475 1476 1477
	msgBase := commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
1478
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
1479
	)
S
SimFG 已提交
1480 1481 1482 1483 1484 1485 1486 1487 1488 1489
	if request.Base == nil {
		request.Base = msgBase
	} else {
		request.Base.MsgID = msgBase.MsgID
		request.Base.Timestamp = msgBase.Timestamp
		request.Base.SourceID = msgBase.SourceID
	}

	var progress int64
	if len(request.GetPartitionNames()) == 0 {
S
SimFG 已提交
1490
		if progress, err = getCollectionProgress(ctx, node.queryCoord, request.GetBase(), collectionID); err != nil {
S
SimFG 已提交
1491 1492 1493
			return getErrResponse(err), nil
		}
	} else {
S
SimFG 已提交
1494 1495
		if progress, err = getPartitionProgress(ctx, node.queryCoord, request.GetBase(),
			request.GetPartitionNames(), request.GetCollectionName(), collectionID); err != nil {
S
SimFG 已提交
1496 1497 1498 1499
			return getErrResponse(err), nil
		}
	}

1500
	log.Debug(
S
SimFG 已提交
1501 1502
		rpcDone(method),
		zap.Any("request", request))
E
Enwei Jiao 已提交
1503 1504
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
S
SimFG 已提交
1505 1506 1507 1508 1509 1510 1511 1512
	return &milvuspb.GetLoadingProgressResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		Progress: progress,
	}, nil
}

1513
func (node *Proxy) GetLoadState(ctx context.Context, request *milvuspb.GetLoadStateRequest) (*milvuspb.GetLoadStateResponse, error) {
S
SimFG 已提交
1514 1515 1516 1517 1518
	if !node.checkHealthy() {
		return &milvuspb.GetLoadStateResponse{Status: unhealthyStatus()}, nil
	}
	method := "GetLoadState"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1519 1520
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetLoadState")
	defer sp.End()
S
SimFG 已提交
1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
	log := log.Ctx(ctx)

	log.Debug(
		rpcReceived(method),
		zap.Any("request", request))

	getErrResponse := func(err error) *milvuspb.GetLoadStateResponse {
		log.Warn("fail to get load state",
			zap.String("collection_name", request.CollectionName),
			zap.Strings("partition_name", request.PartitionNames),
			zap.Error(err))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
		return &milvuspb.GetLoadStateResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
	}

	if err := validateCollectionName(request.CollectionName); err != nil {
		return getErrResponse(err), nil
	}

1546 1547
	// TODO(longjiquan): https://github.com/milvus-io/milvus/issues/21485, Remove `GetComponentStates` after error code
	// 	is ready to distinguish case whether the querycoord is not healthy or the collection is not even loaded.
S
SimFG 已提交
1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588
	if statesResp, err := node.queryCoord.GetComponentStates(ctx); err != nil {
		return getErrResponse(err), nil
	} else if statesResp.State == nil || statesResp.State.StateCode != commonpb.StateCode_Healthy {
		return getErrResponse(fmt.Errorf("the querycoord server isn't healthy, state: %v", statesResp.State)), nil
	}

	successResponse := &milvuspb.GetLoadStateResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
	defer func() {
		log.Debug(
			rpcDone(method),
			zap.Any("request", request))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
		metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	}()

	collectionID, err := globalMetaCache.GetCollectionID(ctx, request.CollectionName)
	if err != nil {
		successResponse.State = commonpb.LoadState_LoadStateNotExist
		return successResponse, nil
	}

	msgBase := commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
	)
	if request.Base == nil {
		request.Base = msgBase
	} else {
		request.Base.MsgID = msgBase.MsgID
		request.Base.Timestamp = msgBase.Timestamp
		request.Base.SourceID = msgBase.SourceID
	}

	var progress int64
	if len(request.GetPartitionNames()) == 0 {
		if progress, err = getCollectionProgress(ctx, node.queryCoord, request.GetBase(), collectionID); err != nil {
1589 1590 1591 1592 1593
			if errors.Is(err, ErrInsufficientMemory) {
				return &milvuspb.GetLoadStateResponse{
					Status: InSufficientMemoryStatus(request.GetCollectionName()),
				}, nil
			}
S
SimFG 已提交
1594 1595 1596 1597 1598 1599
			successResponse.State = commonpb.LoadState_LoadStateNotLoad
			return successResponse, nil
		}
	} else {
		if progress, err = getPartitionProgress(ctx, node.queryCoord, request.GetBase(),
			request.GetPartitionNames(), request.GetCollectionName(), collectionID); err != nil {
1600 1601 1602 1603 1604
			if errors.Is(err, ErrInsufficientMemory) {
				return &milvuspb.GetLoadStateResponse{
					Status: InSufficientMemoryStatus(request.GetCollectionName()),
				}, nil
			}
S
SimFG 已提交
1605 1606 1607 1608 1609 1610 1611 1612 1613 1614
			successResponse.State = commonpb.LoadState_LoadStateNotLoad
			return successResponse, nil
		}
	}
	if progress >= 100 {
		successResponse.State = commonpb.LoadState_LoadStateLoaded
	} else {
		successResponse.State = commonpb.LoadState_LoadStateLoading
	}
	return successResponse, nil
1615 1616
}

1617
// CreateIndex create index for collection.
C
Cai Yudong 已提交
1618
func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
1619 1620 1621
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1622

E
Enwei Jiao 已提交
1623 1624
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateIndex")
	defer sp.End()
D
dragondriver 已提交
1625

1626
	cit := &createIndexTask{
Z
zhenshan.cao 已提交
1627 1628 1629 1630
		ctx:        ctx,
		Condition:  NewTaskCondition(ctx),
		req:        request,
		rootCoord:  node.rootCoord,
1631
		datacoord:  node.dataCoord,
1632
		queryCoord: node.queryCoord,
1633 1634
	}

D
dragondriver 已提交
1635
	method := "CreateIndex"
1636
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1637
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1638
		metrics.TotalLabel).Inc()
1639 1640

	log := log.Ctx(ctx).With(
1641
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1642 1643 1644 1645
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))
D
dragondriver 已提交
1646

1647 1648
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1649 1650 1651
	if err := node.sched.ddQueue.Enqueue(cit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1652
			zap.Error(err))
D
dragondriver 已提交
1653

E
Enwei Jiao 已提交
1654
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1655
			metrics.AbandonLabel).Inc()
1656

1657
		return &commonpb.Status{
1658
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1659 1660 1661 1662
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1663 1664 1665
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", cit.BeginTs()),
1666
		zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1667 1668 1669 1670

	if err := cit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1671
			zap.Error(err),
D
dragondriver 已提交
1672
			zap.Uint64("BeginTs", cit.BeginTs()),
1673
			zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1674

E
Enwei Jiao 已提交
1675
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1676
			metrics.FailLabel).Inc()
1677

1678
		return &commonpb.Status{
1679
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1680 1681 1682 1683
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1684 1685 1686
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", cit.BeginTs()),
1687
		zap.Uint64("EndTs", cit.EndTs()))
D
dragondriver 已提交
1688

E
Enwei Jiao 已提交
1689
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1690
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1691
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1692 1693 1694
	return cit.result, nil
}

1695
// DescribeIndex get the meta information of index, such as index state, index id and etc.
C
Cai Yudong 已提交
1696
func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
1697 1698 1699 1700 1701
	if !node.checkHealthy() {
		return &milvuspb.DescribeIndexResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1702

E
Enwei Jiao 已提交
1703 1704
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DescribeIndex")
	defer sp.End()
1705

1706
	dit := &describeIndexTask{
S
sunby 已提交
1707
		ctx:                  ctx,
1708 1709
		Condition:            NewTaskCondition(ctx),
		DescribeIndexRequest: request,
1710
		datacoord:            node.dataCoord,
1711 1712
	}

1713 1714
	method := "DescribeIndex"
	// avoid data race
1715
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1716
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1717
		metrics.TotalLabel).Inc()
1718 1719

	log := log.Ctx(ctx).With(
1720
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1721 1722 1723
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
1724 1725 1726
		zap.String("index name", request.IndexName))

	log.Debug(rpcReceived(method))
1727 1728 1729 1730

	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1731
			zap.Error(err))
1732

E
Enwei Jiao 已提交
1733
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1734
			metrics.AbandonLabel).Inc()
1735

1736 1737
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
1738
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1739 1740 1741 1742 1743
				Reason:    err.Error(),
			},
		}, nil
	}

1744 1745 1746
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1747
		zap.Uint64("EndTs", dit.EndTs()))
1748 1749 1750 1751

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1752
			zap.Error(err),
1753
			zap.Uint64("BeginTs", dit.BeginTs()),
1754
			zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1755

Z
zhenshan.cao 已提交
1756 1757 1758 1759
		errCode := commonpb.ErrorCode_UnexpectedError
		if dit.result != nil {
			errCode = dit.result.Status.GetErrorCode()
		}
E
Enwei Jiao 已提交
1760
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1761
			metrics.FailLabel).Inc()
1762

1763 1764
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
Z
zhenshan.cao 已提交
1765
				ErrorCode: errCode,
1766 1767 1768 1769 1770
				Reason:    err.Error(),
			},
		}, nil
	}

1771 1772 1773
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1774
		zap.Uint64("EndTs", dit.EndTs()))
1775

E
Enwei Jiao 已提交
1776
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1777
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1778
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1779 1780 1781
	return dit.result, nil
}

1782
// DropIndex drop the index of collection.
C
Cai Yudong 已提交
1783
func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
1784 1785 1786
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1787

E
Enwei Jiao 已提交
1788 1789
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropIndex")
	defer sp.End()
D
dragondriver 已提交
1790

1791
	dit := &dropIndexTask{
S
sunby 已提交
1792
		ctx:              ctx,
B
BossZou 已提交
1793 1794
		Condition:        NewTaskCondition(ctx),
		DropIndexRequest: request,
1795
		dataCoord:        node.dataCoord,
1796
		queryCoord:       node.queryCoord,
B
BossZou 已提交
1797
	}
G
godchen 已提交
1798

D
dragondriver 已提交
1799
	method := "DropIndex"
1800
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1801
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1802
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1803

1804
	log := log.Ctx(ctx).With(
1805
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1806 1807 1808 1809 1810
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

1811 1812
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
1813 1814 1815
	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1816
			zap.Error(err))
E
Enwei Jiao 已提交
1817
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1818
			metrics.AbandonLabel).Inc()
D
dragondriver 已提交
1819

B
BossZou 已提交
1820
		return &commonpb.Status{
1821
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1822 1823 1824
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1825

D
dragondriver 已提交
1826 1827 1828
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1829
		zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1830 1831 1832 1833

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1834
			zap.Error(err),
D
dragondriver 已提交
1835
			zap.Uint64("BeginTs", dit.BeginTs()),
1836
			zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1837

E
Enwei Jiao 已提交
1838
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1839
			metrics.FailLabel).Inc()
1840

B
BossZou 已提交
1841
		return &commonpb.Status{
1842
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1843 1844 1845
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1846 1847 1848 1849

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dit.BeginTs()),
1850
		zap.Uint64("EndTs", dit.EndTs()))
D
dragondriver 已提交
1851

E
Enwei Jiao 已提交
1852
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1853
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1854
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
B
BossZou 已提交
1855 1856 1857
	return dit.result, nil
}

1858 1859
// GetIndexBuildProgress gets index build progress with filed_name and index_name.
// IndexRows is the num of indexed rows. And TotalRows is the total number of segment rows.
1860
// Deprecated: use DescribeIndex instead
C
Cai Yudong 已提交
1861
func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb.GetIndexBuildProgressRequest) (*milvuspb.GetIndexBuildProgressResponse, error) {
1862 1863 1864 1865 1866
	if !node.checkHealthy() {
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1867

E
Enwei Jiao 已提交
1868 1869
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetIndexBuildProgress")
	defer sp.End()
1870

1871
	gibpt := &getIndexBuildProgressTask{
1872 1873 1874
		ctx:                          ctx,
		Condition:                    NewTaskCondition(ctx),
		GetIndexBuildProgressRequest: request,
1875
		rootCoord:                    node.rootCoord,
1876
		dataCoord:                    node.dataCoord,
1877 1878
	}

1879
	method := "GetIndexBuildProgress"
1880
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1881
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1882
		metrics.TotalLabel).Inc()
1883 1884

	log := log.Ctx(ctx).With(
1885
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1886 1887 1888 1889
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1890

1891 1892
	log.Debug(rpcReceived(method))

1893 1894 1895
	if err := node.sched.ddQueue.Enqueue(gibpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1896
			zap.Error(err))
E
Enwei Jiao 已提交
1897
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1898
			metrics.AbandonLabel).Inc()
1899

1900 1901 1902 1903 1904 1905 1906 1907
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1908 1909 1910
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
1911
		zap.Uint64("EndTs", gibpt.EndTs()))
1912 1913 1914 1915

	if err := gibpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
1916
			zap.Error(err),
1917
			zap.Uint64("BeginTs", gibpt.BeginTs()),
1918
			zap.Uint64("EndTs", gibpt.EndTs()))
E
Enwei Jiao 已提交
1919
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1920
			metrics.FailLabel).Inc()
1921 1922 1923 1924 1925 1926 1927 1928

		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
1929 1930 1931 1932

	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
1933
		zap.Uint64("EndTs", gibpt.EndTs()))
1934

E
Enwei Jiao 已提交
1935
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1936
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1937
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1938
	return gibpt.result, nil
1939 1940
}

1941
// GetIndexState get the build-state of index.
1942
// Deprecated: use DescribeIndex instead
C
Cai Yudong 已提交
1943
func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndexStateRequest) (*milvuspb.GetIndexStateResponse, error) {
1944 1945 1946 1947 1948
	if !node.checkHealthy() {
		return &milvuspb.GetIndexStateResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1949

E
Enwei Jiao 已提交
1950 1951
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Insert")
	defer sp.End()
1952

1953
	dipt := &getIndexStateTask{
G
godchen 已提交
1954 1955 1956
		ctx:                  ctx,
		Condition:            NewTaskCondition(ctx),
		GetIndexStateRequest: request,
1957
		dataCoord:            node.dataCoord,
1958
		rootCoord:            node.rootCoord,
1959 1960
	}

1961
	method := "GetIndexState"
1962
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1963
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1964
		metrics.TotalLabel).Inc()
1965 1966

	log := log.Ctx(ctx).With(
1967
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1968 1969 1970 1971
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1972

1973 1974
	log.Debug(rpcReceived(method))

1975 1976 1977
	if err := node.sched.ddQueue.Enqueue(dipt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
1978
			zap.Error(err))
1979

E
Enwei Jiao 已提交
1980
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1981
			metrics.AbandonLabel).Inc()
1982

G
godchen 已提交
1983
		return &milvuspb.GetIndexStateResponse{
1984
			Status: &commonpb.Status{
1985
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1986 1987 1988 1989 1990
				Reason:    err.Error(),
			},
		}, nil
	}

1991 1992 1993
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dipt.BeginTs()),
1994
		zap.Uint64("EndTs", dipt.EndTs()))
1995 1996 1997 1998

	if err := dipt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1999
			zap.Error(err),
2000
			zap.Uint64("BeginTs", dipt.BeginTs()),
2001
			zap.Uint64("EndTs", dipt.EndTs()))
E
Enwei Jiao 已提交
2002
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2003
			metrics.FailLabel).Inc()
2004

G
godchen 已提交
2005
		return &milvuspb.GetIndexStateResponse{
2006
			Status: &commonpb.Status{
2007
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2008 2009 2010 2011 2012
				Reason:    err.Error(),
			},
		}, nil
	}

2013 2014 2015
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dipt.BeginTs()),
2016
		zap.Uint64("EndTs", dipt.EndTs()))
2017

E
Enwei Jiao 已提交
2018
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2019
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2020
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2021 2022 2023
	return dipt.result, nil
}

2024
// Insert insert records into collection.
C
Cai Yudong 已提交
2025
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
E
Enwei Jiao 已提交
2026 2027
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Insert")
	defer sp.End()
X
Xiangyu Wang 已提交
2028

2029 2030 2031 2032 2033
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}
2034 2035
	method := "Insert"
	tr := timerecord.NewTimeRecorder(method)
S
smellthemoon 已提交
2036
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Add(float64(proto.Size(request)))
E
Enwei Jiao 已提交
2037
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
S
smellthemoon 已提交
2038

2039
	it := &insertTask{
2040 2041
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
X
xige-16 已提交
2042
		// req:       request,
2043
		insertMsg: &msgstream.InsertMsg{
2044 2045 2046
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
2047
			InsertRequest: msgpb.InsertRequest{
2048 2049 2050
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Insert),
					commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
2051
					commonpbutil.WithSourceID(paramtable.GetNodeID()),
2052
				),
2053 2054
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
X
xige-16 已提交
2055 2056
				FieldsData:     request.FieldsData,
				NumRows:        uint64(request.NumRows),
2057
				Version:        msgpb.InsertDataVersion_ColumnBased,
2058
				// RowData: transfer column based request to this
2059 2060
			},
		},
2061
		idAllocator:   node.rowIDAllocator,
2062 2063 2064
		segIDAssigner: node.segAssigner,
		chMgr:         node.chMgr,
		chTicker:      node.chTicker,
2065
	}
2066

2067 2068
	if len(it.insertMsg.PartitionName) <= 0 {
		it.insertMsg.PartitionName = Params.CommonCfg.DefaultPartitionName.GetValue()
2069 2070
	}

X
Xiangyu Wang 已提交
2071
	constructFailedResponse := func(err error) *milvuspb.MutationResult {
X
xige-16 已提交
2072
		numRows := request.NumRows
2073 2074 2075 2076
		errIndex := make([]uint32, numRows)
		for i := uint32(0); i < numRows; i++ {
			errIndex[i] = i
		}
2077

X
Xiangyu Wang 已提交
2078 2079 2080 2081 2082 2083 2084
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			ErrIndex: errIndex,
		}
2085 2086
	}

X
Xiangyu Wang 已提交
2087
	log.Debug("Enqueue insert request in Proxy",
2088
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2089 2090 2091 2092 2093
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.Int("len(FieldsData)", len(request.FieldsData)),
		zap.Int("len(HashKeys)", len(request.HashKeys)),
2094
		zap.Uint32("NumRows", request.NumRows))
D
dragondriver 已提交
2095

X
Xiangyu Wang 已提交
2096
	if err := node.sched.dmQueue.Enqueue(it); err != nil {
J
Jiquan Long 已提交
2097
		log.Warn("Failed to enqueue insert task: " + err.Error())
E
Enwei Jiao 已提交
2098
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2099
			metrics.AbandonLabel).Inc()
X
Xiangyu Wang 已提交
2100
		return constructFailedResponse(err), nil
2101
	}
D
dragondriver 已提交
2102

X
Xiangyu Wang 已提交
2103
	log.Debug("Detail of insert request in Proxy",
2104
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2105 2106 2107 2108 2109
		zap.Uint64("BeginTS", it.BeginTs()),
		zap.Uint64("EndTS", it.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
2110
		zap.Uint32("NumRows", request.NumRows))
X
Xiangyu Wang 已提交
2111 2112

	if err := it.WaitToFinish(); err != nil {
2113
		log.Warn("Failed to execute insert task in task scheduler: " + err.Error())
E
Enwei Jiao 已提交
2114
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2115
			metrics.FailLabel).Inc()
X
Xiangyu Wang 已提交
2116 2117 2118 2119 2120
		return constructFailedResponse(err), nil
	}

	if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
		setErrorIndex := func() {
X
xige-16 已提交
2121
			numRows := request.NumRows
X
Xiangyu Wang 已提交
2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132
			errIndex := make([]uint32, numRows)
			for i := uint32(0); i < numRows; i++ {
				errIndex[i] = i
			}
			it.result.ErrIndex = errIndex
		}

		setErrorIndex()
	}

	// InsertCnt always equals to the number of entities in the request
X
xige-16 已提交
2133
	it.result.InsertCnt = int64(request.NumRows)
D
dragondriver 已提交
2134

S
smellthemoon 已提交
2135 2136 2137
	receiveSize := proto.Size(it.insertMsg)
	rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(receiveSize))

E
Enwei Jiao 已提交
2138
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2139
		metrics.SuccessLabel).Inc()
2140
	successCnt := it.result.InsertCnt - int64(len(it.result.ErrIndex))
E
Enwei Jiao 已提交
2141 2142 2143
	metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(successCnt))
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
2144 2145 2146
	return it.result, nil
}

2147
// Delete delete records from collection, then these records cannot be searched.
G
groot 已提交
2148
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {
E
Enwei Jiao 已提交
2149 2150
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Delete")
	defer sp.End()
2151 2152 2153
	log := log.Ctx(ctx)
	log.Debug("Start processing delete request in Proxy")
	defer log.Debug("Finish processing delete request in Proxy")
2154

S
smellthemoon 已提交
2155
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Add(float64(proto.Size(request)))
2156

G
groot 已提交
2157 2158 2159 2160 2161 2162
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}

2163 2164 2165
	method := "Delete"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
2166
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2167
		metrics.TotalLabel).Inc()
2168
	dt := &deleteTask{
X
xige-16 已提交
2169 2170 2171
		ctx:        ctx,
		Condition:  NewTaskCondition(ctx),
		deleteExpr: request.Expr,
2172
		deleteMsg: &BaseDeleteTask{
G
godchen 已提交
2173 2174 2175
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
2176
			DeleteRequest: msgpb.DeleteRequest{
2177 2178 2179 2180
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Delete),
					commonpbutil.WithMsgID(0),
				),
X
xige-16 已提交
2181
				DbName:         request.DbName,
G
godchen 已提交
2182 2183 2184
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
				// RowData: transfer column based request to this
C
Cai Yudong 已提交
2185 2186
			},
		},
S
smellthemoon 已提交
2187 2188 2189
		idAllocator: node.rowIDAllocator,
		chMgr:       node.chMgr,
		chTicker:    node.chTicker,
G
groot 已提交
2190 2191
	}

2192
	log.Debug("Enqueue delete request in Proxy",
2193
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2194 2195 2196 2197
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.String("expr", request.Expr))
2198 2199 2200

	// MsgID will be set by Enqueue()
	if err := node.sched.dmQueue.Enqueue(dt); err != nil {
2201
		log.Error("Failed to enqueue delete task: " + err.Error())
E
Enwei Jiao 已提交
2202
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2203
			metrics.AbandonLabel).Inc()
2204

G
groot 已提交
2205 2206 2207 2208 2209 2210 2211 2212
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

2213
	log.Debug("Detail of delete request in Proxy",
2214
		zap.String("role", typeutil.ProxyRole),
2215
		zap.Uint64("timestamp", dt.deleteMsg.Base.Timestamp),
G
groot 已提交
2216 2217 2218
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
2219
		zap.String("expr", request.Expr))
G
groot 已提交
2220

2221
	if err := dt.WaitToFinish(); err != nil {
2222
		log.Error("Failed to execute delete task in task scheduler: " + err.Error())
E
Enwei Jiao 已提交
2223
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2224
			metrics.FailLabel).Inc()
G
groot 已提交
2225 2226 2227 2228 2229 2230 2231 2232
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

S
smellthemoon 已提交
2233 2234 2235
	receiveSize := proto.Size(dt.deleteMsg)
	rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(receiveSize))

E
Enwei Jiao 已提交
2236
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2237
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2238 2239
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
G
groot 已提交
2240 2241 2242
	return dt.result, nil
}

S
smellthemoon 已提交
2243 2244
// Upsert upsert records into collection.
func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {
E
Enwei Jiao 已提交
2245 2246
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert")
	defer sp.End()
S
smellthemoon 已提交
2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.Uint32("NumRows", request.NumRows),
	)
	log.Debug("Start processing upsert request in Proxy")

	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}
	method := "Upsert"
	tr := timerecord.NewTimeRecorder(method)

	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Add(float64(proto.Size(request)))
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()

	it := &upsertTask{
		baseMsg: msgstream.BaseMsg{
			HashValues: request.HashKeys,
		},
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),

		req: &milvuspb.UpsertRequest{
			Base: commonpbutil.NewMsgBase(
2277
				commonpbutil.WithMsgType(commonpb.MsgType_Upsert),
S
smellthemoon 已提交
2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
			),
			CollectionName: request.CollectionName,
			PartitionName:  request.PartitionName,
			FieldsData:     request.FieldsData,
			NumRows:        request.NumRows,
		},

		result: &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_Success,
			},
			IDs: &schemapb.IDs{
				IdField: nil,
			},
		},

		idAllocator:   node.rowIDAllocator,
		segIDAssigner: node.segAssigner,
		chMgr:         node.chMgr,
		chTicker:      node.chTicker,
	}

	if len(it.req.PartitionName) <= 0 {
		it.req.PartitionName = Params.CommonCfg.DefaultPartitionName.GetValue()
	}

	constructFailedResponse := func(err error, errCode commonpb.ErrorCode) *milvuspb.MutationResult {
		numRows := request.NumRows
		errIndex := make([]uint32, numRows)
		for i := uint32(0); i < numRows; i++ {
			errIndex[i] = i
		}

		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: errCode,
				Reason:    err.Error(),
			},
			ErrIndex: errIndex,
		}
	}

	log.Debug("Enqueue upsert request in Proxy",
		zap.Int("len(FieldsData)", len(request.FieldsData)),
		zap.Int("len(HashKeys)", len(request.HashKeys)))

	if err := node.sched.dmQueue.Enqueue(it); err != nil {
		log.Info("Failed to enqueue upsert task",
			zap.Error(err))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
			metrics.AbandonLabel).Inc()
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug("Detail of upsert request in Proxy",
		zap.Uint64("BeginTS", it.BeginTs()),
		zap.Uint64("EndTS", it.EndTs()))

	if err := it.WaitToFinish(); err != nil {
		log.Info("Failed to execute insert task in task scheduler",
			zap.Error(err))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
			metrics.FailLabel).Inc()
S
smellthemoon 已提交
2347 2348 2349 2350 2351
		// Not every error case changes the status internally
		// change status there to handle it
		if it.result.Status.ErrorCode == commonpb.ErrorCode_Success {
			it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		}
S
smellthemoon 已提交
2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381
		return constructFailedResponse(err, it.result.Status.ErrorCode), nil
	}

	if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
		setErrorIndex := func() {
			numRows := request.NumRows
			errIndex := make([]uint32, numRows)
			for i := uint32(0); i < numRows; i++ {
				errIndex[i] = i
			}
			it.result.ErrIndex = errIndex
		}
		setErrorIndex()
	}

	insertReceiveSize := proto.Size(it.upsertMsg.InsertMsg)
	deleteReceiveSize := proto.Size(it.upsertMsg.DeleteMsg)

	rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(deleteReceiveSize))
	rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(insertReceiveSize))

	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))

	log.Debug("Finish processing upsert request in Proxy")
	return it.result, nil
}

2382
// Search search the most similar records of requests.
C
Cai Yudong 已提交
2383
func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
2384
	receiveSize := proto.Size(request)
E
Enwei Jiao 已提交
2385
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel).Add(float64(receiveSize))
2386 2387 2388

	rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(request.GetNq()))

2389 2390 2391 2392 2393
	if !node.checkHealthy() {
		return &milvuspb.SearchResults{
			Status: unhealthyStatus(),
		}, nil
	}
2394 2395
	method := "Search"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2396
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2397
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
2398

E
Enwei Jiao 已提交
2399 2400
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Search")
	defer sp.End()
D
dragondriver 已提交
2401

2402
	qt := &searchTask{
S
sunby 已提交
2403
		ctx:       ctx,
2404
		Condition: NewTaskCondition(ctx),
G
godchen 已提交
2405
		SearchRequest: &internalpb.SearchRequest{
2406 2407
			Base: commonpbutil.NewMsgBase(
				commonpbutil.WithMsgType(commonpb.MsgType_Search),
E
Enwei Jiao 已提交
2408
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
2409
			),
E
Enwei Jiao 已提交
2410
			ReqID: paramtable.GetNodeID(),
2411
		},
2412 2413 2414 2415
		request:  request,
		qc:       node.queryCoord,
		tr:       timerecord.NewTimeRecorder("search"),
		shardMgr: node.shardMgr,
2416 2417
	}

2418 2419 2420
	travelTs := request.TravelTimestamp
	guaranteeTs := request.GuaranteeTimestamp

2421
	log := log.Ctx(ctx).With(
2422
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2423 2424 2425 2426 2427
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2428 2429 2430 2431
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2432

2433 2434 2435
	log.Debug(
		rpcReceived(method))

2436
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
2437
		log.Warn(
2438
			rpcFailedToEnqueue(method),
2439
			zap.Error(err))
D
dragondriver 已提交
2440

E
Enwei Jiao 已提交
2441
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2442
			metrics.AbandonLabel).Inc()
2443

2444 2445
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2446
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2447 2448 2449 2450
				Reason:    err.Error(),
			},
		}, nil
	}
Z
Zach 已提交
2451
	tr.CtxRecord(ctx, "search request enqueue")
2452

2453
	log.Debug(
2454
		rpcEnqueued(method),
2455
		zap.Uint64("timestamp", qt.Base.Timestamp))
D
dragondriver 已提交
2456

2457
	if err := qt.WaitToFinish(); err != nil {
2458
		log.Warn(
2459
			rpcFailedToWaitToFinish(method),
2460
			zap.Error(err))
2461

E
Enwei Jiao 已提交
2462
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2463
			metrics.FailLabel).Inc()
2464

2465 2466
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2467
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2468 2469 2470 2471 2472
				Reason:    err.Error(),
			},
		}, nil
	}

Z
Zach 已提交
2473
	span := tr.CtxRecord(ctx, "wait search result")
E
Enwei Jiao 已提交
2474
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2475
		metrics.SearchLabel).Observe(float64(span.Milliseconds()))
2476
	tr.CtxRecord(ctx, "wait search result")
2477
	log.Debug(rpcDone(method))
D
dragondriver 已提交
2478

E
Enwei Jiao 已提交
2479
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2480
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2481
	metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(qt.result.GetResults().GetNumQueries()))
C
cai.zhang 已提交
2482
	searchDur := tr.ElapseSpan().Milliseconds()
E
Enwei Jiao 已提交
2483
	metrics.ProxySQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2484
		metrics.SearchLabel).Observe(float64(searchDur))
E
Enwei Jiao 已提交
2485
	metrics.ProxyCollectionSQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2486
		metrics.SearchLabel, request.CollectionName).Observe(float64(searchDur))
2487 2488
	if qt.result != nil {
		sentSize := proto.Size(qt.result)
E
Enwei Jiao 已提交
2489
		metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
2490
		rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
2491
	}
2492 2493 2494
	return qt.result, nil
}

2495
// Flush notify data nodes to persist the data of collection.
2496 2497 2498 2499 2500 2501 2502
func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*milvuspb.FlushResponse, error) {
	resp := &milvuspb.FlushResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    "",
		},
	}
2503
	if !node.checkHealthy() {
2504 2505
		resp.Status.Reason = "proxy is not healthy"
		return resp, nil
2506
	}
D
dragondriver 已提交
2507

E
Enwei Jiao 已提交
2508 2509
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Flush")
	defer sp.End()
D
dragondriver 已提交
2510

2511
	ft := &flushTask{
T
ThreadDao 已提交
2512 2513 2514
		ctx:          ctx,
		Condition:    NewTaskCondition(ctx),
		FlushRequest: request,
2515
		dataCoord:    node.dataCoord,
2516 2517
	}

D
dragondriver 已提交
2518
	method := "Flush"
2519
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2520
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2521

2522
	log := log.Ctx(ctx).With(
2523
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2524 2525
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))
D
dragondriver 已提交
2526

2527 2528
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2529 2530 2531
	if err := node.sched.ddQueue.Enqueue(ft); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2532
			zap.Error(err))
D
dragondriver 已提交
2533

E
Enwei Jiao 已提交
2534
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
2535

2536 2537
		resp.Status.Reason = err.Error()
		return resp, nil
2538 2539
	}

D
dragondriver 已提交
2540 2541 2542
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", ft.BeginTs()),
2543
		zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2544 2545 2546 2547

	if err := ft.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2548
			zap.Error(err),
D
dragondriver 已提交
2549
			zap.Uint64("BeginTs", ft.BeginTs()),
2550
			zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2551

E
Enwei Jiao 已提交
2552
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2553

D
dragondriver 已提交
2554
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
2555 2556
		resp.Status.Reason = err.Error()
		return resp, nil
2557 2558
	}

D
dragondriver 已提交
2559 2560 2561
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", ft.BeginTs()),
2562
		zap.Uint64("EndTs", ft.EndTs()))
D
dragondriver 已提交
2563

E
Enwei Jiao 已提交
2564 2565
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2566
	return ft.result, nil
2567 2568
}

2569
// Query get the records by primary keys.
C
Cai Yudong 已提交
2570
func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*milvuspb.QueryResults, error) {
2571
	receiveSize := proto.Size(request)
E
Enwei Jiao 已提交
2572
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.QueryLabel).Add(float64(receiveSize))
2573 2574 2575

	rateCol.Add(internalpb.RateType_DQLQuery.String(), 1)

2576 2577 2578 2579 2580
	if !node.checkHealthy() {
		return &milvuspb.QueryResults{
			Status: unhealthyStatus(),
		}, nil
	}
2581

E
Enwei Jiao 已提交
2582 2583
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Query")
	defer sp.End()
2584
	tr := timerecord.NewTimeRecorder("Query")
D
dragondriver 已提交
2585

2586
	qt := &queryTask{
2587 2588 2589
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
		RetrieveRequest: &internalpb.RetrieveRequest{
2590 2591
			Base: commonpbutil.NewMsgBase(
				commonpbutil.WithMsgType(commonpb.MsgType_Retrieve),
E
Enwei Jiao 已提交
2592
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
2593
			),
E
Enwei Jiao 已提交
2594
			ReqID: paramtable.GetNodeID(),
2595
		},
2596 2597
		request:          request,
		qc:               node.queryCoord,
2598
		queryShardPolicy: mergeRoundRobinPolicy,
2599
		shardMgr:         node.shardMgr,
2600 2601
	}

D
dragondriver 已提交
2602 2603
	method := "Query"

E
Enwei Jiao 已提交
2604
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2605 2606
		metrics.TotalLabel).Inc()

2607
	log := log.Ctx(ctx).With(
2608
		zap.String("role", typeutil.ProxyRole),
2609 2610
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
2611 2612 2613 2614
		zap.Strings("partitions", request.PartitionNames))

	log.Debug(
		rpcReceived(method),
2615 2616 2617 2618
		zap.String("expr", request.Expr),
		zap.Strings("OutputFields", request.OutputFields),
		zap.Uint64("travel_timestamp", request.TravelTimestamp),
		zap.Uint64("guarantee_timestamp", request.GuaranteeTimestamp))
G
godchen 已提交
2619

D
dragondriver 已提交
2620
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
2621
		log.Warn(
D
dragondriver 已提交
2622
			rpcFailedToEnqueue(method),
2623
			zap.Error(err))
D
dragondriver 已提交
2624

E
Enwei Jiao 已提交
2625
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2626
			metrics.AbandonLabel).Inc()
2627

2628 2629 2630 2631 2632 2633
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
2634
	}
Z
Zach 已提交
2635
	tr.CtxRecord(ctx, "query request enqueue")
2636

2637
	log.Debug(rpcEnqueued(method))
D
dragondriver 已提交
2638 2639

	if err := qt.WaitToFinish(); err != nil {
2640
		log.Warn(
D
dragondriver 已提交
2641
			rpcFailedToWaitToFinish(method),
2642
			zap.Error(err))
2643

E
Enwei Jiao 已提交
2644
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2645
			metrics.FailLabel).Inc()
2646

2647 2648 2649 2650 2651 2652 2653
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
Z
Zach 已提交
2654
	span := tr.CtxRecord(ctx, "wait query result")
E
Enwei Jiao 已提交
2655
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2656
		metrics.QueryLabel).Observe(float64(span.Milliseconds()))
2657

2658
	log.Debug(rpcDone(method))
D
dragondriver 已提交
2659

E
Enwei Jiao 已提交
2660
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2661 2662
		metrics.SuccessLabel).Inc()

E
Enwei Jiao 已提交
2663
	metrics.ProxySQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2664
		metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
E
Enwei Jiao 已提交
2665
	metrics.ProxyCollectionSQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2666
		metrics.QueryLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
2667 2668

	ret := &milvuspb.QueryResults{
2669 2670
		Status:     qt.result.Status,
		FieldsData: qt.result.FieldsData,
2671 2672
	}
	sentSize := proto.Size(qt.result)
2673
	rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
E
Enwei Jiao 已提交
2674
	metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
2675
	return ret, nil
2676
}
2677

2678
// CreateAlias create alias for collection, then you can search the collection with alias.
Y
Yusup 已提交
2679 2680 2681 2682
func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2683

E
Enwei Jiao 已提交
2684 2685
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateAlias")
	defer sp.End()
D
dragondriver 已提交
2686

Y
Yusup 已提交
2687 2688 2689 2690 2691 2692 2693
	cat := &CreateAliasTask{
		ctx:                ctx,
		Condition:          NewTaskCondition(ctx),
		CreateAliasRequest: request,
		rootCoord:          node.rootCoord,
	}

D
dragondriver 已提交
2694
	method := "CreateAlias"
2695
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2696
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2697

2698
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2699 2700 2701 2702 2703
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

2704 2705
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2706 2707 2708
	if err := node.sched.ddQueue.Enqueue(cat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2709
			zap.Error(err))
D
dragondriver 已提交
2710

E
Enwei Jiao 已提交
2711
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
2712

Y
Yusup 已提交
2713 2714 2715 2716 2717 2718
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2719 2720 2721
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", cat.BeginTs()),
2722
		zap.Uint64("EndTs", cat.EndTs()))
D
dragondriver 已提交
2723 2724 2725 2726

	if err := cat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2727
			zap.Error(err),
D
dragondriver 已提交
2728
			zap.Uint64("BeginTs", cat.BeginTs()),
2729
			zap.Uint64("EndTs", cat.EndTs()))
E
Enwei Jiao 已提交
2730
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
Y
Yusup 已提交
2731 2732 2733 2734 2735 2736 2737

		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2738 2739 2740
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", cat.BeginTs()),
2741
		zap.Uint64("EndTs", cat.EndTs()))
D
dragondriver 已提交
2742

E
Enwei Jiao 已提交
2743 2744
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2745 2746 2747
	return cat.result, nil
}

2748
// DropAlias alter the alias of collection.
Y
Yusup 已提交
2749 2750 2751 2752
func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2753

E
Enwei Jiao 已提交
2754 2755
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropAlias")
	defer sp.End()
D
dragondriver 已提交
2756

Y
Yusup 已提交
2757 2758 2759 2760 2761 2762 2763
	dat := &DropAliasTask{
		ctx:              ctx,
		Condition:        NewTaskCondition(ctx),
		DropAliasRequest: request,
		rootCoord:        node.rootCoord,
	}

D
dragondriver 已提交
2764
	method := "DropAlias"
2765
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2766
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2767

2768
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2769 2770 2771 2772
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias))

2773 2774
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2775 2776 2777
	if err := node.sched.ddQueue.Enqueue(dat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2778
			zap.Error(err))
E
Enwei Jiao 已提交
2779
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2780

Y
Yusup 已提交
2781 2782 2783 2784 2785 2786
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2787 2788 2789
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", dat.BeginTs()),
2790
		zap.Uint64("EndTs", dat.EndTs()))
D
dragondriver 已提交
2791 2792 2793 2794

	if err := dat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2795
			zap.Error(err),
D
dragondriver 已提交
2796
			zap.Uint64("BeginTs", dat.BeginTs()),
2797
			zap.Uint64("EndTs", dat.EndTs()))
Y
Yusup 已提交
2798

E
Enwei Jiao 已提交
2799
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2800

Y
Yusup 已提交
2801 2802 2803 2804 2805 2806
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2807 2808 2809
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", dat.BeginTs()),
2810
		zap.Uint64("EndTs", dat.EndTs()))
D
dragondriver 已提交
2811

E
Enwei Jiao 已提交
2812 2813
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2814 2815 2816
	return dat.result, nil
}

2817
// AlterAlias alter alias of collection.
Y
Yusup 已提交
2818 2819 2820 2821
func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2822

E
Enwei Jiao 已提交
2823 2824
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-AlterAlias")
	defer sp.End()
D
dragondriver 已提交
2825

Y
Yusup 已提交
2826 2827 2828 2829 2830 2831 2832
	aat := &AlterAliasTask{
		ctx:               ctx,
		Condition:         NewTaskCondition(ctx),
		AlterAliasRequest: request,
		rootCoord:         node.rootCoord,
	}

D
dragondriver 已提交
2833
	method := "AlterAlias"
2834
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2835
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2836

2837
	log := log.Ctx(ctx).With(
D
dragondriver 已提交
2838 2839 2840 2841 2842
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

2843 2844
	log.Debug(rpcReceived(method))

D
dragondriver 已提交
2845 2846 2847
	if err := node.sched.ddQueue.Enqueue(aat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
2848
			zap.Error(err))
E
Enwei Jiao 已提交
2849
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2850

Y
Yusup 已提交
2851 2852 2853 2854 2855 2856
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2857 2858 2859
	log.Debug(
		rpcEnqueued(method),
		zap.Uint64("BeginTs", aat.BeginTs()),
2860
		zap.Uint64("EndTs", aat.EndTs()))
D
dragondriver 已提交
2861 2862 2863 2864

	if err := aat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2865
			zap.Error(err),
D
dragondriver 已提交
2866
			zap.Uint64("BeginTs", aat.BeginTs()),
2867
			zap.Uint64("EndTs", aat.EndTs()))
Y
Yusup 已提交
2868

E
Enwei Jiao 已提交
2869
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2870

Y
Yusup 已提交
2871 2872 2873 2874 2875 2876
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2877 2878 2879
	log.Debug(
		rpcDone(method),
		zap.Uint64("BeginTs", aat.BeginTs()),
2880
		zap.Uint64("EndTs", aat.EndTs()))
D
dragondriver 已提交
2881

E
Enwei Jiao 已提交
2882 2883
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2884 2885 2886
	return aat.result, nil
}

2887
// CalcDistance calculates the distances between vectors.
2888
func (node *Proxy) CalcDistance(ctx context.Context, request *milvuspb.CalcDistanceRequest) (*milvuspb.CalcDistanceResults, error) {
2889 2890 2891 2892 2893
	if !node.checkHealthy() {
		return &milvuspb.CalcDistanceResults{
			Status: unhealthyStatus(),
		}, nil
	}
2894

E
Enwei Jiao 已提交
2895 2896
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CalcDistance")
	defer sp.End()
2897

2898 2899
	query := func(ids *milvuspb.VectorIDs) (*milvuspb.QueryResults, error) {
		outputFields := []string{ids.FieldName}
2900

2901 2902 2903 2904 2905
		queryRequest := &milvuspb.QueryRequest{
			DbName:         "",
			CollectionName: ids.CollectionName,
			PartitionNames: ids.PartitionNames,
			OutputFields:   outputFields,
2906 2907
		}

2908
		qt := &queryTask{
2909 2910 2911
			ctx:       ctx,
			Condition: NewTaskCondition(ctx),
			RetrieveRequest: &internalpb.RetrieveRequest{
2912 2913
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Retrieve),
E
Enwei Jiao 已提交
2914
					commonpbutil.WithSourceID(paramtable.GetNodeID()),
2915
				),
E
Enwei Jiao 已提交
2916
				ReqID: paramtable.GetNodeID(),
2917
			},
2918 2919 2920 2921
			request: queryRequest,
			qc:      node.queryCoord,
			ids:     ids.IdArray,

2922
			queryShardPolicy: mergeRoundRobinPolicy,
2923
			shardMgr:         node.shardMgr,
2924 2925
		}

2926
		log := log.Ctx(ctx).With(
G
groot 已提交
2927 2928
			zap.String("collection", queryRequest.CollectionName),
			zap.Any("partitions", queryRequest.PartitionNames),
2929
			zap.Any("OutputFields", queryRequest.OutputFields))
G
groot 已提交
2930

2931
		err := node.sched.dqQueue.Enqueue(qt)
2932
		if err != nil {
2933 2934
			log.Error("CalcDistance queryTask failed to enqueue",
				zap.Error(err))
2935

2936 2937 2938 2939 2940
			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
2941
			}, err
2942
		}
2943

2944
		log.Debug("CalcDistance queryTask enqueued")
2945 2946 2947

		err = qt.WaitToFinish()
		if err != nil {
2948 2949
			log.Error("CalcDistance queryTask failed to WaitToFinish",
				zap.Error(err))
2950 2951 2952 2953 2954 2955

			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
2956
			}, err
2957
		}
2958

2959
		log.Debug("CalcDistance queryTask Done")
2960 2961

		return &milvuspb.QueryResults{
2962 2963
			Status:     qt.result.Status,
			FieldsData: qt.result.FieldsData,
2964 2965 2966
		}, nil
	}

G
groot 已提交
2967 2968
	// calcDistanceTask is not a standard task, no need to enqueue
	task := &calcDistanceTask{
E
Enwei Jiao 已提交
2969
		traceID:   sp.SpanContext().TraceID().String(),
G
groot 已提交
2970
		queryFunc: query,
2971 2972
	}

G
groot 已提交
2973
	return task.Execute(ctx, request)
2974 2975
}

2976
// GetDdChannel returns the used channel for dd operations.
C
Cai Yudong 已提交
2977
func (node *Proxy) GetDdChannel(ctx context.Context, request *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) {
2978 2979
	panic("implement me")
}
X
XuanYang-cn 已提交
2980

2981
// GetPersistentSegmentInfo get the information of sealed segment.
C
Cai Yudong 已提交
2982
func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.GetPersistentSegmentInfoRequest) (*milvuspb.GetPersistentSegmentInfoResponse, error) {
E
Enwei Jiao 已提交
2983 2984
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetPersistentSegmentInfo")
	defer sp.End()
2985 2986 2987

	log := log.Ctx(ctx)

D
dragondriver 已提交
2988
	log.Debug("GetPersistentSegmentInfo",
2989
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2990 2991 2992
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
2993
	resp := &milvuspb.GetPersistentSegmentInfoResponse{
X
XuanYang-cn 已提交
2994
		Status: &commonpb.Status{
2995
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
2996 2997
		},
	}
2998 2999 3000 3001
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3002 3003
	method := "GetPersistentSegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3004
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3005
		metrics.TotalLabel).Inc()
3006 3007 3008

	// list segments
	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
X
XuanYang-cn 已提交
3009
	if err != nil {
E
Enwei Jiao 已提交
3010
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021
		resp.Status.Reason = fmt.Errorf("getCollectionID failed, err:%w", err).Error()
		return resp, nil
	}

	getSegmentsByStatesResponse, err := node.dataCoord.GetSegmentsByStates(ctx, &datapb.GetSegmentsByStatesRequest{
		CollectionID: collectionID,
		// -1 means list all partition segemnts
		PartitionID: -1,
		States:      []commonpb.SegmentState{commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed, commonpb.SegmentState_Sealed},
	})
	if err != nil {
E
Enwei Jiao 已提交
3022
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3023
		resp.Status.Reason = fmt.Errorf("getSegmentsOfCollection, err:%w", err).Error()
X
XuanYang-cn 已提交
3024 3025
		return resp, nil
	}
3026 3027

	// get Segment info
3028
	infoResp, err := node.dataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
3029 3030 3031
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3032
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3033
		),
3034
		SegmentIDs: getSegmentsByStatesResponse.Segments,
X
XuanYang-cn 已提交
3035 3036
	})
	if err != nil {
E
Enwei Jiao 已提交
3037
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3038
			metrics.FailLabel).Inc()
3039 3040
		log.Warn("GetPersistentSegmentInfo fail",
			zap.Error(err))
3041
		resp.Status.Reason = fmt.Errorf("dataCoord:GetSegmentInfo, err:%w", err).Error()
X
XuanYang-cn 已提交
3042 3043
		return resp, nil
	}
3044 3045 3046
	log.Debug("GetPersistentSegmentInfo",
		zap.Int("len(infos)", len(infoResp.Infos)),
		zap.Any("status", infoResp.Status))
3047
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
E
Enwei Jiao 已提交
3048
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3049
			metrics.FailLabel).Inc()
X
XuanYang-cn 已提交
3050 3051 3052 3053 3054 3055
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	persistentInfos := make([]*milvuspb.PersistentSegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		persistentInfos[i] = &milvuspb.PersistentSegmentInfo{
S
sunby 已提交
3056
			SegmentID:    info.ID,
X
XuanYang-cn 已提交
3057 3058
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
S
sunby 已提交
3059
			NumRows:      info.NumOfRows,
X
XuanYang-cn 已提交
3060 3061 3062
			State:        info.State,
		}
	}
E
Enwei Jiao 已提交
3063
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3064
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
3065
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3066
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
X
XuanYang-cn 已提交
3067 3068 3069 3070
	resp.Infos = persistentInfos
	return resp, nil
}

J
jingkl 已提交
3071
// GetQuerySegmentInfo gets segment information from QueryCoord.
C
Cai Yudong 已提交
3072
func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQuerySegmentInfoRequest) (*milvuspb.GetQuerySegmentInfoResponse, error) {
E
Enwei Jiao 已提交
3073 3074
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetQuerySegmentInfo")
	defer sp.End()
3075 3076 3077

	log := log.Ctx(ctx)

D
dragondriver 已提交
3078
	log.Debug("GetQuerySegmentInfo",
3079
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3080 3081 3082
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
3083
	resp := &milvuspb.GetQuerySegmentInfoResponse{
Z
zhenshan.cao 已提交
3084
		Status: &commonpb.Status{
3085
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
Z
zhenshan.cao 已提交
3086 3087
		},
	}
3088 3089 3090 3091
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3092

3093 3094
	method := "GetQuerySegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3095
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3096 3097
		metrics.TotalLabel).Inc()

3098 3099
	collID, err := globalMetaCache.GetCollectionID(ctx, req.CollectionName)
	if err != nil {
E
Enwei Jiao 已提交
3100
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3101 3102 3103
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3104
	infoResp, err := node.queryCoord.GetSegmentInfo(ctx, &querypb.GetSegmentInfoRequest{
3105 3106 3107
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3108
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3109
		),
3110
		CollectionID: collID,
Z
zhenshan.cao 已提交
3111 3112
	})
	if err != nil {
E
Enwei Jiao 已提交
3113
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3114 3115
		log.Error("Failed to get segment info from QueryCoord",
			zap.Error(err))
Z
zhenshan.cao 已提交
3116 3117 3118
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3119 3120 3121
	log.Debug("GetQuerySegmentInfo",
		zap.Any("infos", infoResp.Infos),
		zap.Any("status", infoResp.Status))
3122
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
E
Enwei Jiao 已提交
3123
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3124 3125
		log.Error("Failed to get segment info from QueryCoord",
			zap.String("errMsg", infoResp.Status.Reason))
Z
zhenshan.cao 已提交
3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	queryInfos := make([]*milvuspb.QuerySegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		queryInfos[i] = &milvuspb.QuerySegmentInfo{
			SegmentID:    info.SegmentID,
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
			NumRows:      info.NumRows,
			MemSize:      info.MemSize,
			IndexName:    info.IndexName,
			IndexID:      info.IndexID,
X
xige-16 已提交
3139
			State:        info.SegmentState,
3140
			NodeIds:      info.NodeIds,
Z
zhenshan.cao 已提交
3141 3142
		}
	}
3143

E
Enwei Jiao 已提交
3144 3145
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3146
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
Z
zhenshan.cao 已提交
3147 3148 3149 3150
	resp.Infos = queryInfos
	return resp, nil
}

J
jingkl 已提交
3151
// Dummy handles dummy request
C
Cai Yudong 已提交
3152
func (node *Proxy) Dummy(ctx context.Context, req *milvuspb.DummyRequest) (*milvuspb.DummyResponse, error) {
3153 3154 3155 3156 3157 3158
	failedResponse := &milvuspb.DummyResponse{
		Response: `{"status": "fail"}`,
	}

	// TODO(wxyu): change name RequestType to Request
	drt, err := parseDummyRequestType(req.RequestType)
3159

E
Enwei Jiao 已提交
3160 3161
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Dummy")
	defer sp.End()
3162 3163 3164

	log := log.Ctx(ctx)

3165
	if err != nil {
3166 3167
		log.Warn("Failed to parse dummy request type",
			zap.Error(err))
3168 3169 3170
		return failedResponse, nil
	}

3171 3172
	if drt.RequestType == "query" {
		drr, err := parseDummyQueryRequest(req.RequestType)
3173
		if err != nil {
3174 3175
			log.Warn("Failed to parse dummy query request",
				zap.Error(err))
3176 3177 3178
			return failedResponse, nil
		}

3179
		request := &milvuspb.QueryRequest{
3180 3181 3182
			DbName:         drr.DbName,
			CollectionName: drr.CollectionName,
			PartitionNames: drr.PartitionNames,
3183
			OutputFields:   drr.OutputFields,
X
Xiangyu Wang 已提交
3184 3185
		}

3186
		_, err = node.Query(ctx, request)
3187
		if err != nil {
3188 3189
			log.Warn("Failed to execute dummy query",
				zap.Error(err))
3190 3191
			return failedResponse, err
		}
X
Xiangyu Wang 已提交
3192 3193 3194 3195 3196 3197

		return &milvuspb.DummyResponse{
			Response: `{"status": "success"}`,
		}, nil
	}

3198 3199
	log.Debug("cannot find specify dummy request type")
	return failedResponse, nil
X
Xiangyu Wang 已提交
3200 3201
}

J
jingkl 已提交
3202
// RegisterLink registers a link
C
Cai Yudong 已提交
3203
func (node *Proxy) RegisterLink(ctx context.Context, req *milvuspb.RegisterLinkRequest) (*milvuspb.RegisterLinkResponse, error) {
3204
	code := node.stateCode.Load().(commonpb.StateCode)
3205

E
Enwei Jiao 已提交
3206 3207
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-RegisterLink")
	defer sp.End()
3208 3209

	log := log.Ctx(ctx).With(
3210
		zap.String("role", typeutil.ProxyRole),
C
Cai Yudong 已提交
3211
		zap.Any("state code of proxy", code))
D
dragondriver 已提交
3212

3213 3214
	log.Debug("RegisterLink")

3215
	if code != commonpb.StateCode_Healthy {
3216 3217 3218
		return &milvuspb.RegisterLinkResponse{
			Address: nil,
			Status: &commonpb.Status{
3219
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3220
				Reason:    "proxy not healthy",
3221 3222 3223
			},
		}, nil
	}
E
Enwei Jiao 已提交
3224
	//metrics.ProxyLinkedSDKs.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Inc()
3225 3226 3227
	return &milvuspb.RegisterLinkResponse{
		Address: nil,
		Status: &commonpb.Status{
3228
			ErrorCode: commonpb.ErrorCode_Success,
3229
			Reason:    os.Getenv(metricsinfo.DeployModeEnvKey),
3230 3231 3232
		},
	}, nil
}
3233

3234
// GetMetrics gets the metrics of proxy
3235 3236
// TODO(dragondriver): cache the Metrics and set a retention to the cache
func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
E
Enwei Jiao 已提交
3237 3238
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetMetrics")
	defer sp.End()
3239 3240 3241

	log := log.Ctx(ctx)

3242 3243
	log.RatedDebug(60, "Proxy.GetMetrics",
		zap.Int64("nodeID", paramtable.GetNodeID()),
3244 3245 3246 3247
		zap.String("req", req.Request))

	if !node.checkHealthy() {
		log.Warn("Proxy.GetMetrics failed",
3248
			zap.Int64("nodeID", paramtable.GetNodeID()),
3249
			zap.String("req", req.Request),
E
Enwei Jiao 已提交
3250
			zap.Error(errProxyIsUnhealthy(paramtable.GetNodeID())))
3251 3252 3253 3254

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
E
Enwei Jiao 已提交
3255
				Reason:    msgProxyIsUnhealthy(paramtable.GetNodeID()),
3256 3257 3258 3259 3260 3261 3262 3263
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetMetrics failed to parse metric type",
3264
			zap.Int64("nodeID", paramtable.GetNodeID()),
3265 3266 3267 3268 3269 3270 3271 3272 3273 3274 3275 3276
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

3277 3278 3279
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3280
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
3281
	)
3282
	if metricType == metricsinfo.SystemInfoMetrics {
3283 3284 3285
		metrics, err := node.metricsCacheManager.GetSystemInfoMetrics()
		if err != nil {
			metrics, err = getSystemInfoMetrics(ctx, req, node)
3286
		}
3287

3288 3289
		log.RatedDebug(60, "Proxy.GetMetrics",
			zap.Int64("nodeID", paramtable.GetNodeID()),
3290
			zap.String("req", req.Request),
3291
			zap.String("metricType", metricType),
3292 3293 3294
			zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
			zap.Error(err))

3295 3296
		node.metricsCacheManager.UpdateSystemInfoMetrics(metrics)

G
godchen 已提交
3297
		return metrics, nil
3298 3299
	}

3300 3301
	log.RatedWarn(60, "Proxy.GetMetrics failed, request metric type is not implemented yet",
		zap.Int64("nodeID", paramtable.GetNodeID()),
3302
		zap.String("req", req.Request),
3303
		zap.String("metricType", metricType))
3304 3305 3306 3307 3308 3309 3310 3311 3312 3313

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}

3314 3315 3316
// GetProxyMetrics gets the metrics of proxy, it's an internal interface which is different from GetMetrics interface,
// because it only obtains the metrics of Proxy, not including the topological metrics of Query cluster and Data cluster.
func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
E
Enwei Jiao 已提交
3317 3318
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetProxyMetrics")
	defer sp.End()
3319 3320

	log := log.Ctx(ctx).With(
3321
		zap.Int64("nodeID", paramtable.GetNodeID()),
3322 3323
		zap.String("req", req.Request))

3324 3325
	if !node.checkHealthy() {
		log.Warn("Proxy.GetProxyMetrics failed",
E
Enwei Jiao 已提交
3326
			zap.Error(errProxyIsUnhealthy(paramtable.GetNodeID())))
3327 3328 3329 3330

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
E
Enwei Jiao 已提交
3331
				Reason:    msgProxyIsUnhealthy(paramtable.GetNodeID()),
3332 3333 3334 3335 3336 3337 3338 3339 3340 3341 3342 3343 3344 3345 3346 3347 3348
			},
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetProxyMetrics failed to parse metric type",
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

3349 3350 3351
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3352
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
3353
	)
3354 3355 3356 3357 3358 3359 3360 3361 3362 3363 3364 3365 3366 3367 3368

	if metricType == metricsinfo.SystemInfoMetrics {
		proxyMetrics, err := getProxyMetrics(ctx, req, node)
		if err != nil {
			log.Warn("Proxy.GetProxyMetrics failed to getProxyMetrics",
				zap.Error(err))

			return &milvuspb.GetMetricsResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

3369 3370
		//log.Debug("Proxy.GetProxyMetrics",
		//	zap.String("metricType", metricType))
3371 3372 3373 3374

		return proxyMetrics, nil
	}

J
Jiquan Long 已提交
3375
	log.Warn("Proxy.GetProxyMetrics failed, request metric type is not implemented yet",
3376
		zap.String("metricType", metricType))
3377 3378 3379 3380 3381 3382 3383 3384 3385

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
	}, nil
}

B
bigsheeper 已提交
3386 3387
// LoadBalance would do a load balancing operation between query nodes
func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
3388 3389
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-LoadBalance")
	defer sp.End()
3390 3391 3392

	log := log.Ctx(ctx)

B
bigsheeper 已提交
3393
	log.Debug("Proxy.LoadBalance",
E
Enwei Jiao 已提交
3394
		zap.Int64("proxy_id", paramtable.GetNodeID()),
B
bigsheeper 已提交
3395 3396 3397 3398 3399 3400 3401 3402 3403
		zap.Any("req", req))

	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	}
3404 3405 3406

	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
	if err != nil {
J
Jiquan Long 已提交
3407
		log.Warn("failed to get collection id",
3408 3409
			zap.String("collection name", req.GetCollectionName()),
			zap.Error(err))
3410 3411 3412
		status.Reason = err.Error()
		return status, nil
	}
B
bigsheeper 已提交
3413
	infoResp, err := node.queryCoord.LoadBalance(ctx, &querypb.LoadBalanceRequest{
3414 3415 3416
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_LoadBalanceSegments),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
3417
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3418
		),
B
bigsheeper 已提交
3419 3420
		SourceNodeIDs:    []int64{req.SrcNodeID},
		DstNodeIDs:       req.DstNodeIDs,
X
xige-16 已提交
3421
		BalanceReason:    querypb.TriggerCondition_GrpcRequest,
B
bigsheeper 已提交
3422
		SealedSegmentIDs: req.SealedSegmentIDs,
3423
		CollectionID:     collectionID,
B
bigsheeper 已提交
3424 3425
	})
	if err != nil {
J
Jiquan Long 已提交
3426
		log.Warn("Failed to LoadBalance from Query Coordinator",
3427 3428
			zap.Any("req", req),
			zap.Error(err))
B
bigsheeper 已提交
3429 3430 3431 3432
		status.Reason = err.Error()
		return status, nil
	}
	if infoResp.ErrorCode != commonpb.ErrorCode_Success {
J
Jiquan Long 已提交
3433
		log.Warn("Failed to LoadBalance from Query Coordinator",
3434
			zap.String("errMsg", infoResp.Reason))
B
bigsheeper 已提交
3435 3436 3437
		status.Reason = infoResp.Reason
		return status, nil
	}
3438 3439 3440
	log.Debug("LoadBalance Done",
		zap.Any("req", req),
		zap.Any("status", infoResp))
B
bigsheeper 已提交
3441 3442 3443 3444
	status.ErrorCode = commonpb.ErrorCode_Success
	return status, nil
}

3445 3446
// GetReplicas gets replica info
func (node *Proxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
E
Enwei Jiao 已提交
3447 3448
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetReplicas")
	defer sp.End()
3449 3450 3451 3452 3453 3454

	log := log.Ctx(ctx)

	log.Debug("received get replicas request",
		zap.Int64("collection", req.GetCollectionID()),
		zap.Bool("with shard nodes", req.GetWithShardNodes()))
3455 3456 3457 3458 3459 3460
	resp := &milvuspb.GetReplicasResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

S
smellthemoon 已提交
3461 3462
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_GetReplicas),
E
Enwei Jiao 已提交
3463
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
S
smellthemoon 已提交
3464
	)
3465

W
wei liu 已提交
3466 3467 3468 3469
	if req.GetCollectionName() != "" {
		req.CollectionID, _ = globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
	}

3470 3471
	resp, err := node.queryCoord.GetReplicas(ctx, req)
	if err != nil {
3472 3473
		log.Error("Failed to get replicas from Query Coordinator",
			zap.Error(err))
3474 3475 3476 3477
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3478 3479 3480
	log.Debug("received get replicas response",
		zap.Any("resp", resp),
		zap.Error(err))
3481 3482 3483
	return resp, nil
}

3484
// GetCompactionState gets the compaction state of multiple segments
3485
func (node *Proxy) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
E
Enwei Jiao 已提交
3486 3487
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetCompactionState")
	defer sp.End()
3488 3489 3490 3491 3492

	log := log.Ctx(ctx).With(
		zap.Int64("compactionID", req.GetCompactionID()))

	log.Debug("received GetCompactionState request")
3493 3494 3495 3496 3497 3498 3499
	resp := &milvuspb.GetCompactionStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionState(ctx, req)
3500 3501 3502
	log.Debug("received GetCompactionState response",
		zap.Any("resp", resp),
		zap.Error(err))
3503 3504 3505
	return resp, err
}

3506
// ManualCompaction invokes compaction on specified collection
3507
func (node *Proxy) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
E
Enwei Jiao 已提交
3508 3509
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ManualCompaction")
	defer sp.End()
3510 3511 3512 3513 3514

	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", req.GetCollectionID()))

	log.Info("received ManualCompaction request")
3515 3516 3517 3518 3519 3520 3521
	resp := &milvuspb.ManualCompactionResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.ManualCompaction(ctx, req)
3522 3523 3524
	log.Info("received ManualCompaction response",
		zap.Any("resp", resp),
		zap.Error(err))
3525 3526 3527
	return resp, err
}

3528
// GetCompactionStateWithPlans returns the compactions states with the given plan ID
3529
func (node *Proxy) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
E
Enwei Jiao 已提交
3530 3531
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetCompactionStateWithPlans")
	defer sp.End()
3532 3533 3534 3535 3536

	log := log.Ctx(ctx).With(
		zap.Int64("compactionID", req.GetCompactionID()))

	log.Debug("received GetCompactionStateWithPlans request")
3537 3538 3539 3540 3541 3542 3543
	resp := &milvuspb.GetCompactionPlansResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionStateWithPlans(ctx, req)
3544 3545 3546
	log.Debug("received GetCompactionStateWithPlans response",
		zap.Any("resp", resp),
		zap.Error(err))
3547 3548 3549
	return resp, err
}

B
Bingyi Sun 已提交
3550 3551
// GetFlushState gets the flush state of multiple segments
func (node *Proxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
E
Enwei Jiao 已提交
3552 3553
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetFlushState")
	defer sp.End()
3554 3555 3556 3557 3558

	log := log.Ctx(ctx)

	log.Debug("received get flush state request",
		zap.Any("request", req))
3559
	var err error
B
Bingyi Sun 已提交
3560 3561 3562
	resp := &milvuspb.GetFlushStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
J
Jiquan Long 已提交
3563
		log.Warn("unable to get flush state because of closed server")
B
Bingyi Sun 已提交
3564 3565 3566
		return resp, nil
	}

3567
	resp, err = node.dataCoord.GetFlushState(ctx, req)
X
Xiaofan 已提交
3568
	if err != nil {
3569 3570
		log.Warn("failed to get flush state response",
			zap.Error(err))
X
Xiaofan 已提交
3571 3572
		return nil, err
	}
3573 3574
	log.Debug("received get flush state response",
		zap.Any("response", resp))
B
Bingyi Sun 已提交
3575 3576 3577
	return resp, err
}

C
Cai Yudong 已提交
3578 3579
// checkHealthy checks proxy state is Healthy
func (node *Proxy) checkHealthy() bool {
3580 3581
	code := node.stateCode.Load().(commonpb.StateCode)
	return code == commonpb.StateCode_Healthy
3582 3583
}

3584 3585 3586
func (node *Proxy) checkHealthyAndReturnCode() (commonpb.StateCode, bool) {
	code := node.stateCode.Load().(commonpb.StateCode)
	return code, code == commonpb.StateCode_Healthy
3587 3588
}

3589
// unhealthyStatus returns the proxy not healthy status
3590 3591 3592
func unhealthyStatus() *commonpb.Status {
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3593
		Reason:    "proxy not healthy",
3594 3595
	}
}
G
groot 已提交
3596 3597 3598

// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
E
Enwei Jiao 已提交
3599 3600
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Import")
	defer sp.End()
3601 3602 3603

	log := log.Ctx(ctx)

3604 3605
	log.Info("received import request",
		zap.String("collection name", req.GetCollectionName()),
G
groot 已提交
3606 3607
		zap.String("partition name", req.GetPartitionName()),
		zap.Strings("files", req.GetFiles()))
3608 3609 3610 3611 3612 3613
	resp := &milvuspb.ImportResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
	}
G
groot 已提交
3614 3615 3616 3617
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3618

3619 3620
	err := importutil.ValidateOptions(req.GetOptions())
	if err != nil {
3621 3622
		log.Error("failed to execute import request",
			zap.Error(err))
3623 3624 3625 3626 3627
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = "request options is not illegal    \n" + err.Error() + "    \nIllegal option format    \n" + importutil.OptionFormat
		return resp, nil
	}

3628 3629
	method := "Import"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3630
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3631 3632
		metrics.TotalLabel).Inc()

3633
	// Call rootCoord to finish import.
3634 3635
	respFromRC, err := node.rootCoord.Import(ctx, req)
	if err != nil {
E
Enwei Jiao 已提交
3636
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3637 3638
		log.Error("failed to execute bulk insert request",
			zap.Error(err))
3639 3640 3641 3642
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3643

E
Enwei Jiao 已提交
3644 3645
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3646
	return respFromRC, nil
G
groot 已提交
3647 3648
}

3649
// GetImportState checks import task state from RootCoord.
G
groot 已提交
3650
func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
E
Enwei Jiao 已提交
3651 3652
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-GetImportState")
	defer sp.End()
3653 3654 3655 3656 3657

	log := log.Ctx(ctx)

	log.Debug("received get import state request",
		zap.Int64("taskID", req.GetTask()))
G
groot 已提交
3658 3659 3660 3661 3662
	resp := &milvuspb.GetImportStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3663 3664
	method := "GetImportState"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3665
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3666
		metrics.TotalLabel).Inc()
G
groot 已提交
3667 3668

	resp, err := node.rootCoord.GetImportState(ctx, req)
3669
	if err != nil {
E
Enwei Jiao 已提交
3670
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3671 3672
		log.Error("failed to execute get import state",
			zap.Error(err))
3673 3674 3675 3676 3677
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}

3678 3679 3680
	log.Debug("successfully received get import state response",
		zap.Int64("taskID", req.GetTask()),
		zap.Any("resp", resp), zap.Error(err))
E
Enwei Jiao 已提交
3681 3682
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3683
	return resp, nil
G
groot 已提交
3684 3685 3686 3687
}

// ListImportTasks get id array of all import tasks from rootcoord
func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
E
Enwei Jiao 已提交
3688 3689
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ListImportTasks")
	defer sp.End()
3690 3691 3692

	log := log.Ctx(ctx)

J
Jiquan Long 已提交
3693
	log.Debug("received list import tasks request")
G
groot 已提交
3694 3695 3696 3697 3698
	resp := &milvuspb.ListImportTasksResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3699 3700
	method := "ListImportTasks"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3701
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3702
		metrics.TotalLabel).Inc()
G
groot 已提交
3703
	resp, err := node.rootCoord.ListImportTasks(ctx, req)
3704
	if err != nil {
E
Enwei Jiao 已提交
3705
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3706 3707
		log.Error("failed to execute list import tasks",
			zap.Error(err))
3708 3709
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
X
XuanYang-cn 已提交
3710 3711 3712
		return resp, nil
	}

3713 3714 3715
	log.Debug("successfully received list import tasks response",
		zap.String("collection", req.CollectionName),
		zap.Any("tasks", resp.Tasks))
E
Enwei Jiao 已提交
3716 3717
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
X
XuanYang-cn 已提交
3718 3719 3720
	return resp, err
}

3721 3722
// InvalidateCredentialCache invalidate the credential cache of specified username.
func (node *Proxy) InvalidateCredentialCache(ctx context.Context, request *proxypb.InvalidateCredCacheRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
3723 3724
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-InvalidateCredentialCache")
	defer sp.End()
3725 3726

	log := log.Ctx(ctx).With(
3727 3728
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))
3729 3730

	log.Debug("received request to invalidate credential cache")
3731
	if !node.checkHealthy() {
3732
		return unhealthyStatus(), nil
3733
	}
3734 3735 3736 3737 3738

	username := request.Username
	if globalMetaCache != nil {
		globalMetaCache.RemoveCredential(username) // no need to return error, though credential may be not cached
	}
3739
	log.Debug("complete to invalidate credential cache")
3740 3741 3742 3743 3744 3745 3746 3747 3748

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

// UpdateCredentialCache update the credential cache of specified username.
func (node *Proxy) UpdateCredentialCache(ctx context.Context, request *proxypb.UpdateCredCacheRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
3749 3750
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-UpdateCredentialCache")
	defer sp.End()
3751 3752

	log := log.Ctx(ctx).With(
3753 3754
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))
3755 3756

	log.Debug("received request to update credential cache")
3757
	if !node.checkHealthy() {
3758
		return unhealthyStatus(), nil
3759
	}
3760 3761

	credInfo := &internalpb.CredentialInfo{
3762 3763
		Username:       request.Username,
		Sha256Password: request.Password,
3764 3765 3766 3767
	}
	if globalMetaCache != nil {
		globalMetaCache.UpdateCredential(credInfo) // no need to return error, though credential may be not cached
	}
3768
	log.Debug("complete to update credential cache")
3769 3770 3771 3772 3773 3774 3775 3776

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

func (node *Proxy) CreateCredential(ctx context.Context, req *milvuspb.CreateCredentialRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
3777 3778
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateCredential")
	defer sp.End()
3779 3780 3781 3782 3783 3784

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("CreateCredential",
		zap.String("role", typeutil.ProxyRole))
3785
	if !node.checkHealthy() {
3786
		return unhealthyStatus(), nil
3787
	}
3788 3789 3790 3791 3792 3793 3794 3795 3796 3797
	// validate params
	username := req.Username
	if err := ValidateUsername(username); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	rawPassword, err := crypto.Base64Decode(req.Password)
	if err != nil {
3798 3799
		log.Error("decode password fail",
			zap.Error(err))
3800 3801 3802 3803 3804 3805
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "decode password fail key:" + req.Username,
		}, nil
	}
	if err = ValidatePassword(rawPassword); err != nil {
3806 3807
		log.Error("illegal password",
			zap.Error(err))
3808 3809 3810 3811 3812 3813 3814
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	encryptedPassword, err := crypto.PasswordEncrypt(rawPassword)
	if err != nil {
3815 3816
		log.Error("encrypt password fail",
			zap.Error(err))
3817 3818 3819 3820 3821
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "encrypt password fail key:" + req.Username,
		}, nil
	}
3822

3823 3824 3825
	credInfo := &internalpb.CredentialInfo{
		Username:          req.Username,
		EncryptedPassword: encryptedPassword,
3826
		Sha256Password:    crypto.SHA256(rawPassword, req.Username),
3827 3828 3829
	}
	result, err := node.rootCoord.CreateCredential(ctx, credInfo)
	if err != nil { // for error like conntext timeout etc.
3830 3831
		log.Error("create credential fail",
			zap.Error(err))
3832 3833 3834 3835 3836 3837 3838 3839
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

C
codeman 已提交
3840
func (node *Proxy) UpdateCredential(ctx context.Context, req *milvuspb.UpdateCredentialRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
3841 3842
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-UpdateCredential")
	defer sp.End()
3843 3844 3845 3846 3847 3848

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("UpdateCredential",
		zap.String("role", typeutil.ProxyRole))
3849
	if !node.checkHealthy() {
3850
		return unhealthyStatus(), nil
3851
	}
C
codeman 已提交
3852 3853
	rawOldPassword, err := crypto.Base64Decode(req.OldPassword)
	if err != nil {
3854 3855
		log.Error("decode old password fail",
			zap.Error(err))
C
codeman 已提交
3856 3857 3858 3859 3860 3861
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode old password fail when updating:" + req.Username,
		}, nil
	}
	rawNewPassword, err := crypto.Base64Decode(req.NewPassword)
3862
	if err != nil {
3863 3864
		log.Error("decode password fail",
			zap.Error(err))
3865 3866 3867 3868 3869
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
3870 3871
	// valid new password
	if err = ValidatePassword(rawNewPassword); err != nil {
3872 3873
		log.Error("illegal password",
			zap.Error(err))
3874 3875 3876 3877 3878
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
3879 3880

	if !passwordVerify(ctx, req.Username, rawOldPassword, globalMetaCache) {
C
codeman 已提交
3881 3882 3883 3884 3885 3886 3887
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "old password is not correct:" + req.Username,
		}, nil
	}
	// update meta data
	encryptedPassword, err := crypto.PasswordEncrypt(rawNewPassword)
3888
	if err != nil {
3889 3890
		log.Error("encrypt password fail",
			zap.Error(err))
3891 3892 3893 3894 3895
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "encrypt password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
3896
	updateCredReq := &internalpb.CredentialInfo{
3897
		Username:          req.Username,
3898
		Sha256Password:    crypto.SHA256(rawNewPassword, req.Username),
3899 3900
		EncryptedPassword: encryptedPassword,
	}
C
codeman 已提交
3901
	result, err := node.rootCoord.UpdateCredential(ctx, updateCredReq)
3902
	if err != nil { // for error like conntext timeout etc.
3903 3904
		log.Error("update credential fail",
			zap.Error(err))
3905 3906 3907 3908 3909 3910 3911 3912 3913
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
3914 3915
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DeleteCredential")
	defer sp.End()
3916 3917 3918 3919 3920 3921

	log := log.Ctx(ctx).With(
		zap.String("username", req.Username))

	log.Debug("DeleteCredential",
		zap.String("role", typeutil.ProxyRole))
3922
	if !node.checkHealthy() {
3923
		return unhealthyStatus(), nil
3924 3925
	}

3926 3927 3928 3929 3930 3931
	if req.Username == util.UserRoot {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_DeleteCredentialFailure,
			Reason:    "user root cannot be deleted",
		}, nil
	}
3932 3933
	result, err := node.rootCoord.DeleteCredential(ctx, req)
	if err != nil { // for error like conntext timeout etc.
3934 3935
		log.Error("delete credential fail",
			zap.Error(err))
3936 3937 3938 3939 3940 3941 3942 3943 3944
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
E
Enwei Jiao 已提交
3945 3946
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ListCredUsers")
	defer sp.End()
3947 3948 3949 3950 3951

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole))

	log.Debug("ListCredUsers")
3952
	if !node.checkHealthy() {
3953
		return &milvuspb.ListCredUsersResponse{Status: unhealthyStatus()}, nil
3954
	}
3955
	rootCoordReq := &milvuspb.ListCredUsersRequest{
3956 3957 3958
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_ListCredUsernames),
		),
3959 3960
	}
	resp, err := node.rootCoord.ListCredUsers(ctx, rootCoordReq)
3961 3962 3963 3964 3965 3966 3967 3968 3969 3970 3971 3972
	if err != nil {
		return &milvuspb.ListCredUsersResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
	return &milvuspb.ListCredUsersResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
3973
		Usernames: resp.Usernames,
3974 3975
	}, nil
}
3976

3977
func (node *Proxy) CreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
3978 3979
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateRole")
	defer sp.End()
3980 3981 3982 3983 3984

	log := log.Ctx(ctx)

	log.Debug("CreateRole",
		zap.Any("req", req))
3985
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
3986
		return errorutil.UnhealthyStatus(code), nil
3987 3988 3989 3990 3991 3992 3993 3994 3995 3996
	}

	var roleName string
	if req.Entity != nil {
		roleName = req.Entity.Name
	}
	if err := ValidateRoleName(roleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
3997
		}, nil
3998 3999 4000 4001
	}

	result, err := node.rootCoord.CreateRole(ctx, req)
	if err != nil {
4002 4003
		log.Error("fail to create role",
			zap.Error(err))
4004 4005 4006
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4007
		}, nil
4008 4009
	}
	return result, nil
4010 4011
}

4012
func (node *Proxy) DropRole(ctx context.Context, req *milvuspb.DropRoleRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
4013 4014
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropRole")
	defer sp.End()
4015 4016 4017 4018 4019

	log := log.Ctx(ctx)

	log.Debug("DropRole",
		zap.Any("req", req))
4020
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4021
		return errorutil.UnhealthyStatus(code), nil
4022 4023 4024 4025 4026
	}
	if err := ValidateRoleName(req.RoleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4027
		}, nil
4028
	}
4029 4030 4031 4032 4033
	if IsDefaultRole(req.RoleName) {
		errMsg := fmt.Sprintf("the role[%s] is a default role, which can't be droped", req.RoleName)
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    errMsg,
4034
		}, nil
4035
	}
4036 4037
	result, err := node.rootCoord.DropRole(ctx, req)
	if err != nil {
4038 4039 4040
		log.Error("fail to drop role",
			zap.String("role_name", req.RoleName),
			zap.Error(err))
4041 4042 4043
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4044
		}, nil
4045 4046
	}
	return result, nil
4047 4048
}

4049
func (node *Proxy) OperateUserRole(ctx context.Context, req *milvuspb.OperateUserRoleRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
4050 4051
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-OperateUserRole")
	defer sp.End()
4052 4053 4054 4055 4056

	log := log.Ctx(ctx)

	log.Debug("OperateUserRole",
		zap.Any("req", req))
4057
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4058
		return errorutil.UnhealthyStatus(code), nil
4059 4060 4061 4062 4063
	}
	if err := ValidateUsername(req.Username); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4064
		}, nil
4065 4066 4067 4068 4069
	}
	if err := ValidateRoleName(req.RoleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4070
		}, nil
4071 4072 4073 4074
	}

	result, err := node.rootCoord.OperateUserRole(ctx, req)
	if err != nil {
4075 4076
		logger.Error("fail to operate user role",
			zap.Error(err))
4077 4078 4079
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4080
		}, nil
4081 4082
	}
	return result, nil
4083 4084
}

4085
func (node *Proxy) SelectRole(ctx context.Context, req *milvuspb.SelectRoleRequest) (*milvuspb.SelectRoleResponse, error) {
E
Enwei Jiao 已提交
4086 4087
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-SelectRole")
	defer sp.End()
4088 4089 4090 4091

	log := log.Ctx(ctx)

	log.Debug("SelectRole", zap.Any("req", req))
4092
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4093
		return &milvuspb.SelectRoleResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4094 4095 4096 4097 4098 4099 4100 4101 4102
	}

	if req.Role != nil {
		if err := ValidateRoleName(req.Role.Name); err != nil {
			return &milvuspb.SelectRoleResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_IllegalArgument,
					Reason:    err.Error(),
				},
4103
			}, nil
4104 4105 4106 4107 4108
		}
	}

	result, err := node.rootCoord.SelectRole(ctx, req)
	if err != nil {
4109 4110
		log.Error("fail to select role",
			zap.Error(err))
4111 4112 4113 4114 4115
		return &milvuspb.SelectRoleResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4116
		}, nil
4117 4118
	}
	return result, nil
4119 4120
}

4121
func (node *Proxy) SelectUser(ctx context.Context, req *milvuspb.SelectUserRequest) (*milvuspb.SelectUserResponse, error) {
E
Enwei Jiao 已提交
4122 4123
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-SelectUser")
	defer sp.End()
4124 4125 4126 4127 4128

	log := log.Ctx(ctx)

	log.Debug("SelectUser",
		zap.Any("req", req))
4129
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4130
		return &milvuspb.SelectUserResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4131 4132 4133 4134 4135 4136 4137 4138 4139
	}

	if req.User != nil {
		if err := ValidateUsername(req.User.Name); err != nil {
			return &milvuspb.SelectUserResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_IllegalArgument,
					Reason:    err.Error(),
				},
4140
			}, nil
4141 4142 4143 4144 4145
		}
	}

	result, err := node.rootCoord.SelectUser(ctx, req)
	if err != nil {
4146 4147
		log.Error("fail to select user",
			zap.Error(err))
4148 4149 4150 4151 4152
		return &milvuspb.SelectUserResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4153
		}, nil
4154 4155
	}
	return result, nil
4156 4157
}

4158 4159 4160 4161 4162 4163 4164 4165 4166 4167 4168 4169 4170 4171 4172 4173 4174 4175 4176 4177 4178 4179 4180 4181 4182 4183 4184 4185 4186 4187
func (node *Proxy) validPrivilegeParams(req *milvuspb.OperatePrivilegeRequest) error {
	if req.Entity == nil {
		return fmt.Errorf("the entity in the request is nil")
	}
	if req.Entity.Grantor == nil {
		return fmt.Errorf("the grantor entity in the grant entity is nil")
	}
	if req.Entity.Grantor.Privilege == nil {
		return fmt.Errorf("the privilege entity in the grantor entity is nil")
	}
	if err := ValidatePrivilege(req.Entity.Grantor.Privilege.Name); err != nil {
		return err
	}
	if req.Entity.Object == nil {
		return fmt.Errorf("the resource entity in the grant entity is nil")
	}
	if err := ValidateObjectType(req.Entity.Object.Name); err != nil {
		return err
	}
	if err := ValidateObjectName(req.Entity.ObjectName); err != nil {
		return err
	}
	if req.Entity.Role == nil {
		return fmt.Errorf("the object entity in the grant entity is nil")
	}
	if err := ValidateRoleName(req.Entity.Role.Name); err != nil {
		return err
	}

	return nil
4188 4189
}

4190
func (node *Proxy) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
4191 4192
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-OperatePrivilege")
	defer sp.End()
4193 4194 4195 4196 4197

	log := log.Ctx(ctx)

	log.Debug("OperatePrivilege",
		zap.Any("req", req))
4198
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4199
		return errorutil.UnhealthyStatus(code), nil
4200 4201 4202 4203 4204
	}
	if err := node.validPrivilegeParams(req); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4205
		}, nil
4206 4207 4208 4209 4210 4211
	}
	curUser, err := GetCurUserFromContext(ctx)
	if err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4212
		}, nil
4213 4214 4215 4216
	}
	req.Entity.Grantor.User = &milvuspb.UserEntity{Name: curUser}
	result, err := node.rootCoord.OperatePrivilege(ctx, req)
	if err != nil {
4217 4218
		log.Error("fail to operate privilege",
			zap.Error(err))
4219 4220 4221
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4222
		}, nil
4223 4224
	}
	return result, nil
4225 4226
}

4227 4228 4229 4230 4231 4232 4233 4234 4235 4236 4237 4238 4239 4240 4241 4242 4243 4244 4245 4246 4247 4248 4249 4250 4251 4252 4253
// validGrantParams checks a SelectGrantRequest before it is sent on.
//
// The entity and its role are required. The object is optional, but when it
// is present both the object type and the object name are validated. The
// first failure is returned; nil means the request passed every check.
func (node *Proxy) validGrantParams(req *milvuspb.SelectGrantRequest) error {
	entity := req.Entity
	if entity == nil {
		return fmt.Errorf("the grant entity in the request is nil")
	}

	if object := entity.Object; object != nil {
		if err := ValidateObjectType(object.Name); err != nil {
			return err
		}
		if err := ValidateObjectName(entity.ObjectName); err != nil {
			return err
		}
	}

	role := entity.Role
	if role == nil {
		return fmt.Errorf("the role entity in the grant entity is nil")
	}
	return ValidateRoleName(role.Name)
}

func (node *Proxy) SelectGrant(ctx context.Context, req *milvuspb.SelectGrantRequest) (*milvuspb.SelectGrantResponse, error) {
E
Enwei Jiao 已提交
4254 4255
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-SelectGrant")
	defer sp.End()
4256 4257 4258 4259 4260

	log := log.Ctx(ctx)

	log.Debug("SelectGrant",
		zap.Any("req", req))
4261
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4262
		return &milvuspb.SelectGrantResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4263 4264 4265 4266 4267 4268 4269 4270
	}

	if err := node.validGrantParams(req); err != nil {
		return &milvuspb.SelectGrantResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_IllegalArgument,
				Reason:    err.Error(),
			},
4271
		}, nil
4272 4273 4274 4275
	}

	result, err := node.rootCoord.SelectGrant(ctx, req)
	if err != nil {
4276 4277
		log.Error("fail to select grant",
			zap.Error(err))
4278 4279 4280 4281 4282
		return &milvuspb.SelectGrantResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4283
		}, nil
4284 4285 4286 4287 4288
	}
	return result, nil
}

func (node *Proxy) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.RefreshPolicyInfoCacheRequest) (*commonpb.Status, error) {
E
Enwei Jiao 已提交
4289 4290
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-RefreshPolicyInfoCache")
	defer sp.End()
4291 4292 4293 4294 4295

	log := log.Ctx(ctx)

	log.Debug("RefreshPrivilegeInfoCache",
		zap.Any("req", req))
4296 4297 4298 4299 4300 4301 4302 4303 4304 4305
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
		return errorutil.UnhealthyStatus(code), errorutil.UnhealthyError()
	}

	if globalMetaCache != nil {
		err := globalMetaCache.RefreshPolicyInfo(typeutil.CacheOp{
			OpType: typeutil.CacheOpType(req.OpType),
			OpKey:  req.OpKey,
		})
		if err != nil {
4306 4307
			log.Error("fail to refresh policy info",
				zap.Error(err))
4308 4309 4310 4311 4312 4313
			return &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_RefreshPolicyInfoCacheFailure,
				Reason:    err.Error(),
			}, err
		}
	}
4314
	log.Debug("RefreshPrivilegeInfoCache success")
4315 4316 4317 4318

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
4319
}
4320 4321 4322 4323 4324 4325 4326 4327 4328 4329 4330 4331 4332 4333 4334 4335 4336

// SetRates limits the rates of requests.
func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesRequest) (*commonpb.Status, error) {
	resp := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	}
	if !node.checkHealthy() {
		resp = unhealthyStatus()
		return resp, nil
	}

	err := node.multiRateLimiter.globalRateLimiter.setRates(request.GetRates())
	// TODO: set multiple rate limiter rates
	if err != nil {
		resp.Reason = err.Error()
		return resp, nil
	}
4337
	node.multiRateLimiter.SetQuotaStates(request.GetStates(), request.GetCodes())
4338 4339 4340 4341 4342 4343 4344
	rateStrs := lo.FilterMap(request.GetRates(), func(r *internalpb.Rate, _ int) (string, bool) {
		return fmt.Sprintf("rateType:%s, rate:%s", r.GetRt().String(), ratelimitutil.Limit(r.GetR()).String()),
			ratelimitutil.Limit(r.GetR()) != ratelimitutil.Inf
	})
	if len(rateStrs) > 0 {
		log.RatedInfo(30, "current rates in proxy", zap.Int64("proxyNodeID", paramtable.GetNodeID()), zap.Strings("rates", rateStrs))
	}
4345 4346
	if len(request.GetStates()) != 0 {
		for i := range request.GetStates() {
4347
			log.Warn("Proxy set quota states", zap.String("state", request.GetStates()[i].String()), zap.String("reason", request.GetCodes()[i].String()))
4348 4349
		}
	}
4350 4351 4352
	resp.ErrorCode = commonpb.ErrorCode_Success
	return resp, nil
}
4353 4354 4355 4356

func (node *Proxy) CheckHealth(ctx context.Context, request *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
	if !node.checkHealthy() {
		reason := errorutil.UnHealthReason("proxy", node.session.ServerID, "proxy is unhealthy")
4357 4358 4359 4360
		return &milvuspb.CheckHealthResponse{
			Status:    unhealthyStatus(),
			IsHealthy: false,
			Reasons:   []string{reason}}, nil
4361 4362 4363 4364 4365 4366 4367 4368 4369 4370
	}

	group, ctx := errgroup.WithContext(ctx)
	errReasons := make([]string, 0)

	mu := &sync.Mutex{}
	fn := func(role string, resp *milvuspb.CheckHealthResponse, err error) error {
		mu.Lock()
		defer mu.Unlock()

E
Enwei Jiao 已提交
4371 4372
		ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-RefreshPolicyInfoCache")
		defer sp.End()
4373 4374 4375

		log := log.Ctx(ctx).With(zap.String("role", role))

4376
		if err != nil {
4377 4378
			log.Warn("check health fail",
				zap.Error(err))
4379 4380 4381 4382 4383
			errReasons = append(errReasons, fmt.Sprintf("check health fail for %s", role))
			return err
		}

		if !resp.IsHealthy {
4384
			log.Warn("check health fail")
4385 4386 4387 4388 4389 4390 4391 4392 4393 4394 4395 4396 4397 4398 4399 4400 4401 4402 4403 4404 4405 4406 4407
			errReasons = append(errReasons, resp.Reasons...)
		}
		return nil
	}

	group.Go(func() error {
		resp, err := node.rootCoord.CheckHealth(ctx, request)
		return fn("rootcoord", resp, err)
	})

	group.Go(func() error {
		resp, err := node.queryCoord.CheckHealth(ctx, request)
		return fn("querycoord", resp, err)
	})

	group.Go(func() error {
		resp, err := node.dataCoord.CheckHealth(ctx, request)
		return fn("datacoord", resp, err)
	})

	err := group.Wait()
	if err != nil || len(errReasons) != 0 {
		return &milvuspb.CheckHealthResponse{
4408 4409 4410
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_Success,
			},
4411 4412 4413 4414 4415
			IsHealthy: false,
			Reasons:   errReasons,
		}, nil
	}

4416
	states, reasons := node.multiRateLimiter.GetQuotaStates()
4417 4418 4419 4420 4421
	return &milvuspb.CheckHealthResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
4422 4423 4424
		QuotaStates: states,
		Reasons:     reasons,
		IsHealthy:   true,
4425
	}, nil
4426
}
W
wei liu 已提交
4427

J
jaime 已提交
4428
func (node *Proxy) RenameCollection(ctx context.Context, req *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) {
4429
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-RenameCollection")
J
jaime 已提交
4430 4431 4432 4433 4434 4435 4436 4437 4438 4439 4440 4441 4442 4443 4444 4445 4446 4447 4448
	defer sp.End()

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
		zap.String("oldName", req.GetOldName()),
		zap.String("newName", req.GetNewName()))

	log.Info("received rename collection request")
	var err error

	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	if err := validateCollectionName(req.GetNewName()); err != nil {
		log.Warn("validate new collection name fail", zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalCollectionName,
			Reason:    err.Error(),
J
jaime 已提交
4449
		}, nil
J
jaime 已提交
4450 4451 4452 4453 4454 4455 4456 4457 4458 4459 4460 4461 4462 4463 4464 4465 4466 4467 4468
	}

	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_RenameCollection),
		commonpbutil.WithMsgID(0),
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
	)
	resp, err := node.rootCoord.RenameCollection(ctx, req)
	if err != nil {
		log.Warn("failed to rename collection", zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, err
	}

	return resp, nil
}

W
wei liu 已提交
4469
func (node *Proxy) CreateResourceGroup(ctx context.Context, request *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) {
W
wei liu 已提交
4470 4471 4472 4473
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

W
wei liu 已提交
4474 4475 4476 4477 4478 4479 4480 4481
	method := "CreateResourceGroup"
	if err := ValidateResourceGroupName(request.GetResourceGroup()); err != nil {
		log.Warn("CreateResourceGroup failed",
			zap.Error(err),
		)
		return getErrResponse(err, method), nil
	}

W
wei liu 已提交
4482 4483 4484 4485 4486 4487 4488 4489 4490 4491 4492 4493 4494 4495 4496 4497 4498 4499 4500 4501 4502
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-CreateResourceGroup")
	defer sp.End()
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()
	t := &CreateResourceGroupTask{
		ctx:                        ctx,
		Condition:                  NewTaskCondition(ctx),
		CreateResourceGroupRequest: request,
		queryCoord:                 node.queryCoord,
	}

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
	)

	log.Debug("CreateResourceGroup received")

	if err := node.sched.ddQueue.Enqueue(t); err != nil {
		log.Warn("CreateResourceGroup failed to enqueue",
			zap.Error(err))
W
wei liu 已提交
4503
		return getErrResponse(err, method), nil
W
wei liu 已提交
4504 4505 4506 4507 4508 4509 4510 4511 4512 4513 4514
	}

	log.Debug("CreateResourceGroup enqueued",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	if err := t.WaitToFinish(); err != nil {
		log.Warn("CreateResourceGroup failed to WaitToFinish",
			zap.Error(err),
			zap.Uint64("BeginTS", t.BeginTs()),
			zap.Uint64("EndTS", t.EndTs()))
W
wei liu 已提交
4515
		return getErrResponse(err, method), nil
W
wei liu 已提交
4516 4517 4518 4519 4520 4521 4522 4523 4524 4525
	}

	log.Debug("CreateResourceGroup done",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return t.result, nil
W
wei liu 已提交
4526 4527
}

W
wei liu 已提交
4528 4529 4530 4531
func getErrResponse(err error, method string) *commonpb.Status {
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()

	return &commonpb.Status{
W
wei liu 已提交
4532
		ErrorCode: commonpb.ErrorCode_IllegalArgument,
W
wei liu 已提交
4533 4534 4535 4536
		Reason:    err.Error(),
	}
}

W
wei liu 已提交
4537
func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.DropResourceGroupRequest) (*commonpb.Status, error) {
W
wei liu 已提交
4538 4539 4540 4541
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

W
wei liu 已提交
4542
	method := "DropResourceGroup"
W
wei liu 已提交
4543 4544 4545 4546 4547 4548 4549 4550 4551 4552 4553 4554 4555 4556 4557 4558 4559 4560 4561 4562 4563 4564
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropResourceGroup")
	defer sp.End()
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()
	t := &DropResourceGroupTask{
		ctx:                      ctx,
		Condition:                NewTaskCondition(ctx),
		DropResourceGroupRequest: request,
		queryCoord:               node.queryCoord,
	}

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
	)

	log.Debug("DropResourceGroup received")

	if err := node.sched.ddQueue.Enqueue(t); err != nil {
		log.Warn("DropResourceGroup failed to enqueue",
			zap.Error(err))

W
wei liu 已提交
4565
		return getErrResponse(err, method), nil
W
wei liu 已提交
4566 4567 4568 4569 4570 4571 4572 4573 4574 4575 4576
	}

	log.Debug("DropResourceGroup enqueued",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	if err := t.WaitToFinish(); err != nil {
		log.Warn("DropResourceGroup failed to WaitToFinish",
			zap.Error(err),
			zap.Uint64("BeginTS", t.BeginTs()),
			zap.Uint64("EndTS", t.EndTs()))
W
wei liu 已提交
4577
		return getErrResponse(err, method), nil
W
wei liu 已提交
4578 4579 4580 4581 4582 4583 4584 4585 4586 4587
	}

	log.Debug("DropResourceGroup done",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return t.result, nil
W
wei liu 已提交
4588 4589 4590
}

func (node *Proxy) TransferNode(ctx context.Context, request *milvuspb.TransferNodeRequest) (*commonpb.Status, error) {
W
wei liu 已提交
4591 4592 4593 4594
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

W
wei liu 已提交
4595 4596 4597 4598 4599 4600 4601 4602 4603 4604 4605 4606 4607 4608 4609
	method := "TransferNode"
	if err := ValidateResourceGroupName(request.GetSourceResourceGroup()); err != nil {
		log.Warn("TransferNode failed",
			zap.Error(err),
		)
		return getErrResponse(err, method), nil
	}

	if err := ValidateResourceGroupName(request.GetTargetResourceGroup()); err != nil {
		log.Warn("TransferNode failed",
			zap.Error(err),
		)
		return getErrResponse(err, method), nil
	}

W
wei liu 已提交
4610 4611 4612 4613 4614 4615 4616 4617 4618 4619 4620 4621 4622 4623 4624 4625 4626 4627 4628 4629 4630 4631
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-TransferNode")
	defer sp.End()
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()
	t := &TransferNodeTask{
		ctx:                 ctx,
		Condition:           NewTaskCondition(ctx),
		TransferNodeRequest: request,
		queryCoord:          node.queryCoord,
	}

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
	)

	log.Debug("TransferNode received")

	if err := node.sched.ddQueue.Enqueue(t); err != nil {
		log.Warn("TransferNode failed to enqueue",
			zap.Error(err))

W
wei liu 已提交
4632
		return getErrResponse(err, method), nil
W
wei liu 已提交
4633 4634 4635 4636 4637 4638 4639 4640 4641 4642 4643
	}

	log.Debug("TransferNode enqueued",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	if err := t.WaitToFinish(); err != nil {
		log.Warn("TransferNode failed to WaitToFinish",
			zap.Error(err),
			zap.Uint64("BeginTS", t.BeginTs()),
			zap.Uint64("EndTS", t.EndTs()))
W
wei liu 已提交
4644
		return getErrResponse(err, method), nil
W
wei liu 已提交
4645 4646 4647 4648 4649 4650 4651 4652 4653 4654
	}

	log.Debug("TransferNode done",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return t.result, nil
W
wei liu 已提交
4655 4656 4657
}

func (node *Proxy) TransferReplica(ctx context.Context, request *milvuspb.TransferReplicaRequest) (*commonpb.Status, error) {
W
wei liu 已提交
4658 4659 4660
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
W
wei liu 已提交
4661

W
wei liu 已提交
4662 4663 4664 4665 4666 4667 4668 4669 4670 4671 4672 4673 4674 4675 4676
	method := "TransferReplica"
	if err := ValidateResourceGroupName(request.GetSourceResourceGroup()); err != nil {
		log.Warn("TransferReplica failed",
			zap.Error(err),
		)
		return getErrResponse(err, method), nil
	}

	if err := ValidateResourceGroupName(request.GetTargetResourceGroup()); err != nil {
		log.Warn("TransferReplica failed",
			zap.Error(err),
		)
		return getErrResponse(err, method), nil
	}

W
wei liu 已提交
4677 4678 4679 4680 4681 4682 4683 4684 4685 4686 4687 4688 4689 4690 4691 4692 4693 4694 4695 4696 4697 4698
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-TransferReplica")
	defer sp.End()
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()
	t := &TransferReplicaTask{
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		TransferReplicaRequest: request,
		queryCoord:             node.queryCoord,
	}

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
	)

	log.Debug("TransferReplica received")

	if err := node.sched.ddQueue.Enqueue(t); err != nil {
		log.Warn("TransferReplica failed to enqueue",
			zap.Error(err))

W
wei liu 已提交
4699
		return getErrResponse(err, method), nil
W
wei liu 已提交
4700 4701 4702 4703 4704 4705 4706 4707 4708 4709 4710
	}

	log.Debug("TransferReplica enqueued",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	if err := t.WaitToFinish(); err != nil {
		log.Warn("TransferReplica failed to WaitToFinish",
			zap.Error(err),
			zap.Uint64("BeginTS", t.BeginTs()),
			zap.Uint64("EndTS", t.EndTs()))
W
wei liu 已提交
4711
		return getErrResponse(err, method), nil
W
wei liu 已提交
4712 4713 4714 4715 4716 4717 4718 4719 4720 4721
	}

	log.Debug("TransferReplica done",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return t.result, nil
W
wei liu 已提交
4722 4723
}

W
wei liu 已提交
4724 4725 4726 4727 4728 4729 4730 4731 4732 4733 4734 4735 4736 4737 4738 4739 4740 4741 4742 4743 4744 4745 4746 4747 4748 4749 4750 4751 4752 4753 4754 4755 4756 4757 4758 4759 4760 4761 4762 4763 4764 4765 4766 4767 4768 4769 4770 4771 4772 4773 4774 4775 4776 4777 4778 4779 4780 4781 4782 4783 4784 4785 4786 4787 4788 4789 4790
func (node *Proxy) ListResourceGroups(ctx context.Context, request *milvuspb.ListResourceGroupsRequest) (*milvuspb.ListResourceGroupsResponse, error) {
	if !node.checkHealthy() {
		return &milvuspb.ListResourceGroupsResponse{
			Status: unhealthyStatus(),
		}, nil
	}

	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-ListResourceGroups")
	defer sp.End()
	method := "ListResourceGroups"
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()
	t := &ListResourceGroupsTask{
		ctx:                       ctx,
		Condition:                 NewTaskCondition(ctx),
		ListResourceGroupsRequest: request,
		queryCoord:                node.queryCoord,
	}

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
	)

	log.Debug("ListResourceGroups received")

	if err := node.sched.ddQueue.Enqueue(t); err != nil {
		log.Warn("ListResourceGroups failed to enqueue",
			zap.Error(err))

		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
			metrics.AbandonLabel).Inc()
		return &milvuspb.ListResourceGroupsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug("ListResourceGroups enqueued",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	if err := t.WaitToFinish(); err != nil {
		log.Warn("ListResourceGroups failed to WaitToFinish",
			zap.Error(err),
			zap.Uint64("BeginTS", t.BeginTs()),
			zap.Uint64("EndTS", t.EndTs()))
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
			metrics.FailLabel).Inc()
		return &milvuspb.ListResourceGroupsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug("ListResourceGroups done",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return t.result, nil
W
wei liu 已提交
4791 4792 4793
}

func (node *Proxy) DescribeResourceGroup(ctx context.Context, request *milvuspb.DescribeResourceGroupRequest) (*milvuspb.DescribeResourceGroupResponse, error) {
W
wei liu 已提交
4794 4795 4796 4797 4798 4799
	if !node.checkHealthy() {
		return &milvuspb.DescribeResourceGroupResponse{
			Status: unhealthyStatus(),
		}, nil
	}

W
wei liu 已提交
4800 4801 4802 4803 4804 4805 4806 4807 4808 4809 4810 4811
	method := "DescribeResourceGroup"
	GetErrResponse := func(err error) *milvuspb.DescribeResourceGroupResponse {
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()

		return &milvuspb.DescribeResourceGroupResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
	}

W
wei liu 已提交
4812 4813 4814 4815 4816 4817 4818 4819 4820 4821 4822 4823 4824 4825 4826 4827 4828 4829 4830 4831 4832 4833
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DescribeResourceGroup")
	defer sp.End()
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()
	t := &DescribeResourceGroupTask{
		ctx:                          ctx,
		Condition:                    NewTaskCondition(ctx),
		DescribeResourceGroupRequest: request,
		queryCoord:                   node.queryCoord,
	}

	log := log.Ctx(ctx).With(
		zap.String("role", typeutil.ProxyRole),
	)

	log.Debug("DescribeResourceGroup received")

	if err := node.sched.ddQueue.Enqueue(t); err != nil {
		log.Warn("DescribeResourceGroup failed to enqueue",
			zap.Error(err))

W
wei liu 已提交
4834
		return GetErrResponse(err), nil
W
wei liu 已提交
4835 4836 4837 4838 4839 4840 4841 4842 4843 4844 4845
	}

	log.Debug("DescribeResourceGroup enqueued",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	if err := t.WaitToFinish(); err != nil {
		log.Warn("DescribeResourceGroup failed to WaitToFinish",
			zap.Error(err),
			zap.Uint64("BeginTS", t.BeginTs()),
			zap.Uint64("EndTS", t.EndTs()))
W
wei liu 已提交
4846
		return GetErrResponse(err), nil
W
wei liu 已提交
4847 4848 4849 4850 4851 4852 4853 4854 4855 4856
	}

	log.Debug("DescribeResourceGroup done",
		zap.Uint64("BeginTS", t.BeginTs()),
		zap.Uint64("EndTS", t.EndTs()))

	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
	return t.result, nil
W
wei liu 已提交
4857
}