// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proxy
18 19 20

import (
	"context"
21
	"errors"
22
	"fmt"
C
cai.zhang 已提交
23
	"os"
24
	"strconv"
25 26 27
	"sync"

	"golang.org/x/sync/errgroup"
28

29
	"github.com/golang/protobuf/proto"
S
SimFG 已提交
30 31
	"github.com/milvus-io/milvus-proto/go-api/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/milvuspb"
32
	"github.com/milvus-io/milvus/internal/common"
X
Xiangyu Wang 已提交
33
	"github.com/milvus-io/milvus/internal/log"
34
	"github.com/milvus-io/milvus/internal/metrics"
J
jaime 已提交
35
	"github.com/milvus-io/milvus/internal/mq/msgstream"
X
Xiangyu Wang 已提交
36 37 38 39
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
40
	"github.com/milvus-io/milvus/internal/util"
41
	"github.com/milvus-io/milvus/internal/util/commonpbutil"
42
	"github.com/milvus-io/milvus/internal/util/crypto"
43
	"github.com/milvus-io/milvus/internal/util/errorutil"
44
	"github.com/milvus-io/milvus/internal/util/importutil"
45 46
	"github.com/milvus-io/milvus/internal/util/logutil"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
E
Enwei Jiao 已提交
47
	"github.com/milvus-io/milvus/internal/util/paramtable"
48
	"github.com/milvus-io/milvus/internal/util/timerecord"
49
	"github.com/milvus-io/milvus/internal/util/trace"
X
Xiangyu Wang 已提交
50
	"github.com/milvus-io/milvus/internal/util/typeutil"
51 52
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
53 54
)

55 56
// moduleName is the module label this file attaches to a logging context,
// e.g. through logutil.WithModule in InvalidateCollectionMetaCache.
const moduleName = "Proxy"

57
// UpdateStateCode updates the state code of Proxy.
58
func (node *Proxy) UpdateStateCode(code commonpb.StateCode) {
59
	node.stateCode.Store(code)
Z
zhenshan.cao 已提交
60 61
}

62
// GetComponentStates get state of Proxy.
63 64
func (node *Proxy) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
	stats := &milvuspb.ComponentStates{
G
godchen 已提交
65 66 67 68
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
69
	code, ok := node.stateCode.Load().(commonpb.StateCode)
G
godchen 已提交
70 71 72 73 74 75
	if !ok {
		errMsg := "unexpected error in type assertion"
		stats.Status = &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    errMsg,
		}
G
godchen 已提交
76
		return stats, nil
G
godchen 已提交
77
	}
78 79 80 81
	nodeID := common.NotRegisteredID
	if node.session != nil && node.session.Registered() {
		nodeID = node.session.ServerID
	}
82
	info := &milvuspb.ComponentInfo{
83 84
		// NodeID:    Params.ProxyID, // will race with Proxy.Register()
		NodeID:    nodeID,
C
Cai Yudong 已提交
85
		Role:      typeutil.ProxyRole,
G
godchen 已提交
86 87 88 89 90 91
		StateCode: code,
	}
	stats.State = info
	return stats, nil
}

C
cxytz01 已提交
92
// GetStatisticsChannel gets statistics channel of Proxy.
C
Cai Yudong 已提交
93
func (node *Proxy) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
94 95 96 97 98 99 100 101 102
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: "",
	}, nil
}

103
// InvalidateCollectionMetaCache invalidate the meta cache of specific collection.
C
Cai Yudong 已提交
104
func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
105 106 107 108
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

109
	ctx = logutil.WithModule(ctx, moduleName)
110
	logutil.Logger(ctx).Info("received request to invalidate collection meta cache",
111
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
112
		zap.String("db", request.DbName),
113 114
		zap.String("collectionName", request.CollectionName),
		zap.Int64("collectionID", request.CollectionID))
D
dragondriver 已提交
115

116
	collectionName := request.CollectionName
117
	collectionID := request.CollectionID
X
Xiaofan 已提交
118 119

	var aliasName []string
N
neza2017 已提交
120
	if globalMetaCache != nil {
121 122 123 124
		if collectionName != "" {
			globalMetaCache.RemoveCollection(ctx, collectionName) // no need to return error, though collection may be not cached
		}
		if request.CollectionID != UniqueID(0) {
X
Xiaofan 已提交
125
			aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
126
		}
N
neza2017 已提交
127
	}
128 129
	if request.GetBase().GetMsgType() == commonpb.MsgType_DropCollection {
		// no need to handle error, since this Proxy may not create dml stream for the collection.
130 131
		node.chMgr.removeDMLStream(request.GetCollectionID())
		// clean up collection level metrics
E
Enwei Jiao 已提交
132
		metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), collectionName)
X
Xiaofan 已提交
133
		for _, alias := range aliasName {
E
Enwei Jiao 已提交
134
			metrics.CleanupCollectionMetrics(paramtable.GetNodeID(), alias)
X
Xiaofan 已提交
135
		}
136
	}
137
	logutil.Logger(ctx).Info("complete to invalidate collection meta cache",
138
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
139
		zap.String("db", request.DbName),
140 141
		zap.String("collection", collectionName),
		zap.Int64("collectionID", collectionID))
D
dragondriver 已提交
142

143
	return &commonpb.Status{
144
		ErrorCode: commonpb.ErrorCode_Success,
145 146
		Reason:    "",
	}, nil
147 148
}

149
// CreateCollection create a collection by the schema.
150
// TODO(dragondriver): add more detailed ut for ConsistencyLevel, should we support multiple consistency level in Proxy?
C
Cai Yudong 已提交
151
func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
152 153 154
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
155 156 157 158

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
159 160 161
	method := "CreateCollection"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
162
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
163

164
	cct := &createCollectionTask{
S
sunby 已提交
165
		ctx:                     ctx,
166 167
		Condition:               NewTaskCondition(ctx),
		CreateCollectionRequest: request,
168
		rootCoord:               node.rootCoord,
169 170
	}

171 172 173
	// avoid data race
	lenOfSchema := len(request.Schema)

174 175
	log.Debug(
		rpcReceived(method),
176
		zap.String("traceID", traceID),
177
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
178 179
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
180
		zap.Int("len(schema)", lenOfSchema),
181 182
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
183

184 185 186
	if err := node.sched.ddQueue.Enqueue(cct); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
187 188
			zap.Error(err),
			zap.String("traceID", traceID),
189
			zap.String("role", typeutil.ProxyRole),
190 191 192
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Int("len(schema)", lenOfSchema),
193 194
			zap.Int32("shards_num", request.ShardsNum),
			zap.String("consistency_level", request.ConsistencyLevel.String()))
195

E
Enwei Jiao 已提交
196
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
197
		return &commonpb.Status{
198
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
199 200 201 202
			Reason:    err.Error(),
		}, nil
	}

203 204
	log.Debug(
		rpcEnqueued(method),
205
		zap.String("traceID", traceID),
206
		zap.String("role", typeutil.ProxyRole),
207 208 209
		zap.Int64("MsgID", cct.ID()),
		zap.Uint64("BeginTs", cct.BeginTs()),
		zap.Uint64("EndTs", cct.EndTs()),
D
dragondriver 已提交
210 211
		zap.Uint64("timestamp", request.Base.Timestamp),
		zap.String("db", request.DbName),
212 213
		zap.String("collection", request.CollectionName),
		zap.Int("len(schema)", lenOfSchema),
214 215
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
216

217 218 219
	if err := cct.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
220
			zap.Error(err),
221
			zap.String("traceID", traceID),
222
			zap.String("role", typeutil.ProxyRole),
223 224 225
			zap.Int64("MsgID", cct.ID()),
			zap.Uint64("BeginTs", cct.BeginTs()),
			zap.Uint64("EndTs", cct.EndTs()),
D
dragondriver 已提交
226 227
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
228
			zap.Int("len(schema)", lenOfSchema),
229 230
			zap.Int32("shards_num", request.ShardsNum),
			zap.String("consistency_level", request.ConsistencyLevel.String()))
D
dragondriver 已提交
231

E
Enwei Jiao 已提交
232
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
233
		return &commonpb.Status{
234
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
235 236 237 238
			Reason:    err.Error(),
		}, nil
	}

239 240
	log.Debug(
		rpcDone(method),
241
		zap.String("traceID", traceID),
242
		zap.String("role", typeutil.ProxyRole),
243 244 245 246 247 248
		zap.Int64("MsgID", cct.ID()),
		zap.Uint64("BeginTs", cct.BeginTs()),
		zap.Uint64("EndTs", cct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Int("len(schema)", lenOfSchema),
249 250
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
251

E
Enwei Jiao 已提交
252 253
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
254 255 256
	return cct.result, nil
}

257
// DropCollection drop a collection.
C
Cai Yudong 已提交
258
func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
259 260 261
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
262 263 264 265

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
266 267
	method := "DropCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
268
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
269

270
	dct := &dropCollectionTask{
S
sunby 已提交
271
		ctx:                   ctx,
272 273
		Condition:             NewTaskCondition(ctx),
		DropCollectionRequest: request,
274
		rootCoord:             node.rootCoord,
275
		chMgr:                 node.chMgr,
S
sunby 已提交
276
		chTicker:              node.chTicker,
277 278
	}

279 280
	log.Debug("DropCollection received",
		zap.String("traceID", traceID),
281
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
282 283
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
284 285 286 287 288

	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DropCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
289
			zap.String("role", typeutil.ProxyRole),
290 291 292
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
293
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
294
		return &commonpb.Status{
295
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
296 297 298 299
			Reason:    err.Error(),
		}, nil
	}

300 301
	log.Debug("DropCollection enqueued",
		zap.String("traceID", traceID),
302
		zap.String("role", typeutil.ProxyRole),
303 304 305
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTs", dct.BeginTs()),
		zap.Uint64("EndTs", dct.EndTs()),
D
dragondriver 已提交
306 307
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
308 309 310

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DropCollection failed to WaitToFinish",
D
dragondriver 已提交
311
			zap.Error(err),
312
			zap.String("traceID", traceID),
313
			zap.String("role", typeutil.ProxyRole),
314 315 316
			zap.Int64("MsgID", dct.ID()),
			zap.Uint64("BeginTs", dct.BeginTs()),
			zap.Uint64("EndTs", dct.EndTs()),
D
dragondriver 已提交
317 318 319
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
320
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
321
		return &commonpb.Status{
322
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
323 324 325 326
			Reason:    err.Error(),
		}, nil
	}

327 328
	log.Debug("DropCollection done",
		zap.String("traceID", traceID),
329
		zap.String("role", typeutil.ProxyRole),
330 331 332 333 334 335
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTs", dct.BeginTs()),
		zap.Uint64("EndTs", dct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
336 337
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
338 339 340
	return dct.result, nil
}

341
// HasCollection check if the specific collection exists in Milvus.
C
Cai Yudong 已提交
342
func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
343 344 345 346 347
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
348 349 350 351

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
352 353
	method := "HasCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
354
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
355
		metrics.TotalLabel).Inc()
356 357 358

	log.Debug("HasCollection received",
		zap.String("traceID", traceID),
359
		zap.String("role", typeutil.ProxyRole),
360 361 362
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

363
	hct := &hasCollectionTask{
S
sunby 已提交
364
		ctx:                  ctx,
365 366
		Condition:            NewTaskCondition(ctx),
		HasCollectionRequest: request,
367
		rootCoord:            node.rootCoord,
368 369
	}

370 371 372 373
	if err := node.sched.ddQueue.Enqueue(hct); err != nil {
		log.Warn("HasCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
374
			zap.String("role", typeutil.ProxyRole),
375 376 377
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
378
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
379
			metrics.AbandonLabel).Inc()
380 381
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
382
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
383 384 385 386 387
				Reason:    err.Error(),
			},
		}, nil
	}

388 389
	log.Debug("HasCollection enqueued",
		zap.String("traceID", traceID),
390
		zap.String("role", typeutil.ProxyRole),
391 392 393
		zap.Int64("MsgID", hct.ID()),
		zap.Uint64("BeginTS", hct.BeginTs()),
		zap.Uint64("EndTS", hct.EndTs()),
D
dragondriver 已提交
394 395
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
396 397 398

	if err := hct.WaitToFinish(); err != nil {
		log.Warn("HasCollection failed to WaitToFinish",
D
dragondriver 已提交
399
			zap.Error(err),
400
			zap.String("traceID", traceID),
401
			zap.String("role", typeutil.ProxyRole),
402 403 404
			zap.Int64("MsgID", hct.ID()),
			zap.Uint64("BeginTS", hct.BeginTs()),
			zap.Uint64("EndTS", hct.EndTs()),
D
dragondriver 已提交
405 406 407
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
408
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
409
			metrics.FailLabel).Inc()
410 411
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
412
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
413 414 415 416 417
				Reason:    err.Error(),
			},
		}, nil
	}

418 419
	log.Debug("HasCollection done",
		zap.String("traceID", traceID),
420
		zap.String("role", typeutil.ProxyRole),
421 422 423 424 425 426
		zap.Int64("MsgID", hct.ID()),
		zap.Uint64("BeginTS", hct.BeginTs()),
		zap.Uint64("EndTS", hct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
427
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
428
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
429
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
430 431 432
	return hct.result, nil
}

433
// LoadCollection load a collection into query nodes.
C
Cai Yudong 已提交
434
func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
435 436 437
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
438 439 440 441

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
442 443
	method := "LoadCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
444
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
445
		metrics.TotalLabel).Inc()
446
	lct := &loadCollectionTask{
S
sunby 已提交
447
		ctx:                   ctx,
448 449
		Condition:             NewTaskCondition(ctx),
		LoadCollectionRequest: request,
450
		queryCoord:            node.queryCoord,
C
cai.zhang 已提交
451
		indexCoord:            node.indexCoord,
452 453
	}

454 455
	log.Debug("LoadCollection received",
		zap.String("traceID", traceID),
456
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
457 458
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
459 460 461 462 463

	if err := node.sched.ddQueue.Enqueue(lct); err != nil {
		log.Warn("LoadCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
464
			zap.String("role", typeutil.ProxyRole),
465 466 467
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
468
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
469
			metrics.AbandonLabel).Inc()
470
		return &commonpb.Status{
471
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
472 473 474
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
475

476 477
	log.Debug("LoadCollection enqueued",
		zap.String("traceID", traceID),
478
		zap.String("role", typeutil.ProxyRole),
479 480 481
		zap.Int64("MsgID", lct.ID()),
		zap.Uint64("BeginTS", lct.BeginTs()),
		zap.Uint64("EndTS", lct.EndTs()),
D
dragondriver 已提交
482 483
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
484 485 486

	if err := lct.WaitToFinish(); err != nil {
		log.Warn("LoadCollection failed to WaitToFinish",
D
dragondriver 已提交
487
			zap.Error(err),
488
			zap.String("traceID", traceID),
489
			zap.String("role", typeutil.ProxyRole),
490 491 492
			zap.Int64("MsgID", lct.ID()),
			zap.Uint64("BeginTS", lct.BeginTs()),
			zap.Uint64("EndTS", lct.EndTs()),
D
dragondriver 已提交
493 494
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))
E
Enwei Jiao 已提交
495
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
496
			metrics.FailLabel).Inc()
497
		return &commonpb.Status{
498
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
499 500 501 502
			Reason:    err.Error(),
		}, nil
	}

503 504
	log.Debug("LoadCollection done",
		zap.String("traceID", traceID),
505
		zap.String("role", typeutil.ProxyRole),
506 507 508 509 510 511
		zap.Int64("MsgID", lct.ID()),
		zap.Uint64("BeginTS", lct.BeginTs()),
		zap.Uint64("EndTS", lct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
512
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
513
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
514
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
515
	return lct.result, nil
516 517
}

518
// ReleaseCollection remove the loaded collection from query nodes.
C
Cai Yudong 已提交
519
func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
520 521 522
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
523

524
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleaseCollection")
525 526
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
527 528
	method := "ReleaseCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
529
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
530
		metrics.TotalLabel).Inc()
531
	rct := &releaseCollectionTask{
S
sunby 已提交
532
		ctx:                      ctx,
533 534
		Condition:                NewTaskCondition(ctx),
		ReleaseCollectionRequest: request,
535
		queryCoord:               node.queryCoord,
536
		chMgr:                    node.chMgr,
537 538
	}

539 540
	log.Debug(
		rpcReceived(method),
541
		zap.String("traceID", traceID),
542
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
543 544
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
545 546

	if err := node.sched.ddQueue.Enqueue(rct); err != nil {
547 548
		log.Warn(
			rpcFailedToEnqueue(method),
549 550
			zap.Error(err),
			zap.String("traceID", traceID),
551
			zap.String("role", typeutil.ProxyRole),
552 553 554
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
555
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
556
			metrics.AbandonLabel).Inc()
557
		return &commonpb.Status{
558
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
559 560 561 562
			Reason:    err.Error(),
		}, nil
	}

563 564
	log.Debug(
		rpcEnqueued(method),
565
		zap.String("traceID", traceID),
566
		zap.String("role", typeutil.ProxyRole),
567 568 569
		zap.Int64("MsgID", rct.ID()),
		zap.Uint64("BeginTS", rct.BeginTs()),
		zap.Uint64("EndTS", rct.EndTs()),
D
dragondriver 已提交
570 571
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
572 573

	if err := rct.WaitToFinish(); err != nil {
574 575
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
576
			zap.Error(err),
577
			zap.String("traceID", traceID),
578
			zap.String("role", typeutil.ProxyRole),
579 580 581
			zap.Int64("MsgID", rct.ID()),
			zap.Uint64("BeginTS", rct.BeginTs()),
			zap.Uint64("EndTS", rct.EndTs()),
D
dragondriver 已提交
582 583 584
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
585
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
586
			metrics.FailLabel).Inc()
587
		return &commonpb.Status{
588
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
589 590 591 592
			Reason:    err.Error(),
		}, nil
	}

593 594
	log.Debug(
		rpcDone(method),
595
		zap.String("traceID", traceID),
596
		zap.String("role", typeutil.ProxyRole),
597 598 599 600 601 602
		zap.Int64("MsgID", rct.ID()),
		zap.Uint64("BeginTS", rct.BeginTs()),
		zap.Uint64("EndTS", rct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
603
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
604
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
605
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
606
	return rct.result, nil
607 608
}

609
// DescribeCollection get the meta information of specific collection, such as schema, created timestamp and etc.
C
Cai Yudong 已提交
610
func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
611 612 613 614 615
	if !node.checkHealthy() {
		return &milvuspb.DescribeCollectionResponse{
			Status: unhealthyStatus(),
		}, nil
	}
616

617
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeCollection")
618 619
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
620 621
	method := "DescribeCollection"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
622
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
623
		metrics.TotalLabel).Inc()
624

625
	dct := &describeCollectionTask{
S
sunby 已提交
626
		ctx:                       ctx,
627 628
		Condition:                 NewTaskCondition(ctx),
		DescribeCollectionRequest: request,
629
		rootCoord:                 node.rootCoord,
630 631
	}

632 633
	log.Debug("DescribeCollection received",
		zap.String("traceID", traceID),
634
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
635 636
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
637 638 639 640 641

	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DescribeCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
642
			zap.String("role", typeutil.ProxyRole),
643 644 645
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
646
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
647
			metrics.AbandonLabel).Inc()
648 649
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
650
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
651 652 653 654 655
				Reason:    err.Error(),
			},
		}, nil
	}

656 657
	log.Debug("DescribeCollection enqueued",
		zap.String("traceID", traceID),
658
		zap.String("role", typeutil.ProxyRole),
659 660 661
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTS", dct.BeginTs()),
		zap.Uint64("EndTS", dct.EndTs()),
D
dragondriver 已提交
662 663
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
664 665 666

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DescribeCollection failed to WaitToFinish",
D
dragondriver 已提交
667
			zap.Error(err),
668
			zap.String("traceID", traceID),
669
			zap.String("role", typeutil.ProxyRole),
670 671 672
			zap.Int64("MsgID", dct.ID()),
			zap.Uint64("BeginTS", dct.BeginTs()),
			zap.Uint64("EndTS", dct.EndTs()),
D
dragondriver 已提交
673 674 675
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
676
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
677
			metrics.FailLabel).Inc()
678

679 680
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
681
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
682 683 684 685 686
				Reason:    err.Error(),
			},
		}, nil
	}

687 688
	log.Debug("DescribeCollection done",
		zap.String("traceID", traceID),
689
		zap.String("role", typeutil.ProxyRole),
690 691 692 693 694 695
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTS", dct.BeginTs()),
		zap.Uint64("EndTS", dct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
696
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
697
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
698
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
699 700 701
	return dct.result, nil
}

702 703 704 705 706 707 708 709 710
// GetStatistics get the statistics, such as `num_rows`.
// WARNING: It is an experimental API
func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStatisticsRequest) (*milvuspb.GetStatisticsResponse, error) {
	if !node.checkHealthy() {
		return &milvuspb.GetStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}

711
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetStatistics")
712 713 714 715
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
	method := "GetStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
716
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
717
		metrics.TotalLabel).Inc()
718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745
	g := &getStatisticsTask{
		request:   request,
		Condition: NewTaskCondition(ctx),
		ctx:       ctx,
		tr:        tr,
		dc:        node.dataCoord,
		qc:        node.queryCoord,
		shardMgr:  node.shardMgr,
	}

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Strings("partitions", request.PartitionNames))

	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Strings("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
746
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780
			metrics.AbandonLabel).Inc()

		return &milvuspb.GetStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("msgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Strings("partitions", request.PartitionNames))

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.Int64("MsgID", g.ID()),
			zap.Uint64("BeginTS", g.BeginTs()),
			zap.Uint64("EndTS", g.EndTs()),
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Strings("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
781
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801
			metrics.FailLabel).Inc()

		return &milvuspb.GetStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("msgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
802
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
803
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
804
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
805 806 807
	return g.result, nil
}

808
// GetCollectionStatistics get the collection statistics, such as `num_rows`.
C
Cai Yudong 已提交
809
func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvuspb.GetCollectionStatisticsRequest) (*milvuspb.GetCollectionStatisticsResponse, error) {
810 811 812 813 814
	if !node.checkHealthy() {
		return &milvuspb.GetCollectionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
815 816 817 818

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetCollectionStatistics")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
819 820
	method := "GetCollectionStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
821
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
822
		metrics.TotalLabel).Inc()
823
	g := &getCollectionStatisticsTask{
G
godchen 已提交
824 825 826
		ctx:                            ctx,
		Condition:                      NewTaskCondition(ctx),
		GetCollectionStatisticsRequest: request,
827
		dataCoord:                      node.dataCoord,
828 829
	}

830 831
	log.Debug(
		rpcReceived(method),
832
		zap.String("traceID", traceID),
833
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
834 835
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
836 837

	if err := node.sched.ddQueue.Enqueue(g); err != nil {
838 839
		log.Warn(
			rpcFailedToEnqueue(method),
840 841
			zap.Error(err),
			zap.String("traceID", traceID),
842
			zap.String("role", typeutil.ProxyRole),
843 844 845
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
846
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
847
			metrics.AbandonLabel).Inc()
848

G
godchen 已提交
849
		return &milvuspb.GetCollectionStatisticsResponse{
850
			Status: &commonpb.Status{
851
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
852 853 854 855 856
				Reason:    err.Error(),
			},
		}, nil
	}

857 858
	log.Debug(
		rpcEnqueued(method),
859
		zap.String("traceID", traceID),
860
		zap.String("role", typeutil.ProxyRole),
861
		zap.Int64("msgID", g.ID()),
862 863
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
D
dragondriver 已提交
864 865
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
866 867

	if err := g.WaitToFinish(); err != nil {
868 869
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
870
			zap.Error(err),
871
			zap.String("traceID", traceID),
872
			zap.String("role", typeutil.ProxyRole),
873 874 875
			zap.Int64("MsgID", g.ID()),
			zap.Uint64("BeginTS", g.BeginTs()),
			zap.Uint64("EndTS", g.EndTs()),
D
dragondriver 已提交
876 877 878
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
879
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
880
			metrics.FailLabel).Inc()
881

G
godchen 已提交
882
		return &milvuspb.GetCollectionStatisticsResponse{
883
			Status: &commonpb.Status{
884
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
885 886 887 888 889
				Reason:    err.Error(),
			},
		}, nil
	}

890 891
	log.Debug(
		rpcDone(method),
892
		zap.String("traceID", traceID),
893
		zap.String("role", typeutil.ProxyRole),
894
		zap.Int64("msgID", g.ID()),
895 896 897 898 899
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
900
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
901
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
902
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
903
	return g.result, nil
904 905
}

906
// ShowCollections list all collections in Milvus.
C
Cai Yudong 已提交
907
func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
908 909 910 911 912
	if !node.checkHealthy() {
		return &milvuspb.ShowCollectionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
913 914
	method := "ShowCollections"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
915
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
916

917
	sct := &showCollectionsTask{
G
godchen 已提交
918 919 920
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		ShowCollectionsRequest: request,
921
		queryCoord:             node.queryCoord,
922
		rootCoord:              node.rootCoord,
923 924
	}

925
	log.Debug("ShowCollections received",
926
		zap.String("role", typeutil.ProxyRole),
927 928 929 930 931 932
		zap.String("DbName", request.DbName),
		zap.Uint64("TimeStamp", request.TimeStamp),
		zap.String("ShowType", request.Type.String()),
		zap.Any("CollectionNames", request.CollectionNames),
	)

933
	err := node.sched.ddQueue.Enqueue(sct)
934
	if err != nil {
935 936
		log.Warn("ShowCollections failed to enqueue",
			zap.Error(err),
937
			zap.String("role", typeutil.ProxyRole),
938 939 940 941 942 943
			zap.String("DbName", request.DbName),
			zap.Uint64("TimeStamp", request.TimeStamp),
			zap.String("ShowType", request.Type.String()),
			zap.Any("CollectionNames", request.CollectionNames),
		)

E
Enwei Jiao 已提交
944
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
G
godchen 已提交
945
		return &milvuspb.ShowCollectionsResponse{
946
			Status: &commonpb.Status{
947
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
948 949 950 951 952
				Reason:    err.Error(),
			},
		}, nil
	}

953
	log.Debug("ShowCollections enqueued",
954
		zap.String("role", typeutil.ProxyRole),
955
		zap.Int64("MsgID", sct.ID()),
956
		zap.String("DbName", sct.ShowCollectionsRequest.DbName),
957
		zap.Uint64("TimeStamp", request.TimeStamp),
958 959 960
		zap.String("ShowType", sct.ShowCollectionsRequest.Type.String()),
		zap.Any("CollectionNames", sct.ShowCollectionsRequest.CollectionNames),
	)
D
dragondriver 已提交
961

962 963
	err = sct.WaitToFinish()
	if err != nil {
964 965
		log.Warn("ShowCollections failed to WaitToFinish",
			zap.Error(err),
966
			zap.String("role", typeutil.ProxyRole),
967 968 969 970 971 972 973
			zap.Int64("MsgID", sct.ID()),
			zap.String("DbName", request.DbName),
			zap.Uint64("TimeStamp", request.TimeStamp),
			zap.String("ShowType", request.Type.String()),
			zap.Any("CollectionNames", request.CollectionNames),
		)

E
Enwei Jiao 已提交
974
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
975

G
godchen 已提交
976
		return &milvuspb.ShowCollectionsResponse{
977
			Status: &commonpb.Status{
978
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
979 980 981 982 983
				Reason:    err.Error(),
			},
		}, nil
	}

984
	log.Debug("ShowCollections Done",
985
		zap.String("role", typeutil.ProxyRole),
986 987 988 989
		zap.Int64("MsgID", sct.ID()),
		zap.String("DbName", request.DbName),
		zap.Uint64("TimeStamp", request.TimeStamp),
		zap.String("ShowType", request.Type.String()),
990 991
		zap.Int("len(CollectionNames)", len(request.CollectionNames)),
		zap.Int("num_collections", len(sct.result.CollectionNames)))
992

E
Enwei Jiao 已提交
993 994
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
995 996 997
	return sct.result, nil
}

J
jaime 已提交
998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008
func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-AlterCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
	method := "AlterCollection"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
1009
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
J
jaime 已提交
1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033

	act := &alterCollectionTask{
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		AlterCollectionRequest: request,
		rootCoord:              node.rootCoord,
	}

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

	if err := node.sched.ddQueue.Enqueue(act); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
1034
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
J
jaime 已提交
1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", act.ID()),
		zap.Uint64("BeginTs", act.BeginTs()),
		zap.Uint64("EndTs", act.EndTs()),
		zap.Uint64("timestamp", request.Base.Timestamp),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

	if err := act.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.Int64("MsgID", act.ID()),
			zap.Uint64("BeginTs", act.BeginTs()),
			zap.Uint64("EndTs", act.EndTs()),
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
1064
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
J
jaime 已提交
1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", act.ID()),
		zap.Uint64("BeginTs", act.BeginTs()),
		zap.Uint64("EndTs", act.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
1081 1082
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
J
jaime 已提交
1083 1084 1085
	return act.result, nil
}

1086
// CreatePartition create a partition in specific collection.
C
Cai Yudong 已提交
1087
func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
1088 1089 1090
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1091

1092
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreatePartition")
1093 1094
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
1095 1096
	method := "CreatePartition"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1097
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
1098

1099
	cpt := &createPartitionTask{
S
sunby 已提交
1100
		ctx:                    ctx,
1101 1102
		Condition:              NewTaskCondition(ctx),
		CreatePartitionRequest: request,
1103
		rootCoord:              node.rootCoord,
1104 1105 1106
		result:                 nil,
	}

1107 1108 1109
	log.Debug(
		rpcReceived("CreatePartition"),
		zap.String("traceID", traceID),
1110
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1111 1112 1113
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1114 1115 1116 1117 1118 1119

	if err := node.sched.ddQueue.Enqueue(cpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue("CreatePartition"),
			zap.Error(err),
			zap.String("traceID", traceID),
1120
			zap.String("role", typeutil.ProxyRole),
1121 1122 1123 1124
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

E
Enwei Jiao 已提交
1125
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
1126

1127
		return &commonpb.Status{
1128
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1129 1130 1131
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1132

1133 1134 1135
	log.Debug(
		rpcEnqueued("CreatePartition"),
		zap.String("traceID", traceID),
1136
		zap.String("role", typeutil.ProxyRole),
1137 1138 1139
		zap.Int64("MsgID", cpt.ID()),
		zap.Uint64("BeginTS", cpt.BeginTs()),
		zap.Uint64("EndTS", cpt.EndTs()),
D
dragondriver 已提交
1140 1141 1142
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1143 1144 1145 1146

	if err := cpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish("CreatePartition"),
D
dragondriver 已提交
1147
			zap.Error(err),
1148
			zap.String("traceID", traceID),
1149
			zap.String("role", typeutil.ProxyRole),
1150 1151 1152
			zap.Int64("MsgID", cpt.ID()),
			zap.Uint64("BeginTS", cpt.BeginTs()),
			zap.Uint64("EndTS", cpt.EndTs()),
D
dragondriver 已提交
1153 1154 1155 1156
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

E
Enwei Jiao 已提交
1157
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
1158

1159
		return &commonpb.Status{
1160
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1161 1162 1163
			Reason:    err.Error(),
		}, nil
	}
1164 1165 1166 1167

	log.Debug(
		rpcDone("CreatePartition"),
		zap.String("traceID", traceID),
1168
		zap.String("role", typeutil.ProxyRole),
1169 1170 1171 1172 1173 1174 1175
		zap.Int64("MsgID", cpt.ID()),
		zap.Uint64("BeginTS", cpt.BeginTs()),
		zap.Uint64("EndTS", cpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

E
Enwei Jiao 已提交
1176 1177
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1178 1179 1180
	return cpt.result, nil
}

1181
// DropPartition drop a partition in specific collection.
C
Cai Yudong 已提交
1182
func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
1183 1184 1185
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1186

1187
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropPartition")
1188 1189
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
1190 1191
	method := "DropPartition"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1192
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
1193

1194
	dpt := &dropPartitionTask{
S
sunby 已提交
1195
		ctx:                  ctx,
1196 1197
		Condition:            NewTaskCondition(ctx),
		DropPartitionRequest: request,
1198
		rootCoord:            node.rootCoord,
C
cai.zhang 已提交
1199
		queryCoord:           node.queryCoord,
1200 1201 1202
		result:               nil,
	}

1203 1204 1205
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1206
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1207 1208 1209
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1210 1211 1212 1213 1214 1215

	if err := node.sched.ddQueue.Enqueue(dpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1216
			zap.String("role", typeutil.ProxyRole),
1217 1218 1219 1220
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

E
Enwei Jiao 已提交
1221
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
1222

1223
		return &commonpb.Status{
1224
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1225 1226 1227
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1228

1229 1230 1231
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1232
		zap.String("role", typeutil.ProxyRole),
1233 1234 1235
		zap.Int64("MsgID", dpt.ID()),
		zap.Uint64("BeginTS", dpt.BeginTs()),
		zap.Uint64("EndTS", dpt.EndTs()),
D
dragondriver 已提交
1236 1237 1238
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1239 1240 1241 1242

	if err := dpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1243
			zap.Error(err),
1244
			zap.String("traceID", traceID),
1245
			zap.String("role", typeutil.ProxyRole),
1246 1247 1248
			zap.Int64("MsgID", dpt.ID()),
			zap.Uint64("BeginTS", dpt.BeginTs()),
			zap.Uint64("EndTS", dpt.EndTs()),
D
dragondriver 已提交
1249 1250 1251 1252
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

E
Enwei Jiao 已提交
1253
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
1254

1255
		return &commonpb.Status{
1256
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1257 1258 1259
			Reason:    err.Error(),
		}, nil
	}
1260 1261 1262 1263

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1264
		zap.String("role", typeutil.ProxyRole),
1265 1266 1267 1268 1269 1270 1271
		zap.Int64("MsgID", dpt.ID()),
		zap.Uint64("BeginTS", dpt.BeginTs()),
		zap.Uint64("EndTS", dpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

E
Enwei Jiao 已提交
1272 1273
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1274 1275 1276
	return dpt.result, nil
}

1277
// HasPartition check if partition exist.
C
Cai Yudong 已提交
1278
func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
1279 1280 1281 1282 1283
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
D
dragondriver 已提交
1284

D
dragondriver 已提交
1285
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasPartition")
D
dragondriver 已提交
1286 1287
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
1288 1289 1290
	method := "HasPartition"
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
E
Enwei Jiao 已提交
1291
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1292
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1293

1294
	hpt := &hasPartitionTask{
S
sunby 已提交
1295
		ctx:                 ctx,
1296 1297
		Condition:           NewTaskCondition(ctx),
		HasPartitionRequest: request,
1298
		rootCoord:           node.rootCoord,
1299 1300 1301
		result:              nil,
	}

D
dragondriver 已提交
1302 1303 1304
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1305
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1306 1307 1308
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
D
dragondriver 已提交
1309 1310 1311 1312 1313 1314

	if err := node.sched.ddQueue.Enqueue(hpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1315
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1316 1317 1318 1319
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

E
Enwei Jiao 已提交
1320
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1321
			metrics.AbandonLabel).Inc()
1322

1323 1324
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1325
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1326 1327 1328 1329 1330
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1331

D
dragondriver 已提交
1332 1333 1334
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1335
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1336 1337 1338
		zap.Int64("MsgID", hpt.ID()),
		zap.Uint64("BeginTS", hpt.BeginTs()),
		zap.Uint64("EndTS", hpt.EndTs()),
D
dragondriver 已提交
1339 1340 1341
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
D
dragondriver 已提交
1342 1343 1344 1345

	if err := hpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1346
			zap.Error(err),
D
dragondriver 已提交
1347
			zap.String("traceID", traceID),
1348
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1349 1350 1351
			zap.Int64("MsgID", hpt.ID()),
			zap.Uint64("BeginTS", hpt.BeginTs()),
			zap.Uint64("EndTS", hpt.EndTs()),
D
dragondriver 已提交
1352 1353 1354 1355
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

E
Enwei Jiao 已提交
1356
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1357
			metrics.FailLabel).Inc()
1358

1359 1360
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1361
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1362 1363 1364 1365 1366
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1367 1368 1369 1370

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1371
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1372 1373 1374 1375 1376 1377 1378
		zap.Int64("MsgID", hpt.ID()),
		zap.Uint64("BeginTS", hpt.BeginTs()),
		zap.Uint64("EndTS", hpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

E
Enwei Jiao 已提交
1379
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1380
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1381
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1382 1383 1384
	return hpt.result, nil
}

1385
// LoadPartitions load specific partitions into query nodes.
C
Cai Yudong 已提交
1386
func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitionsRequest) (*commonpb.Status, error) {
1387 1388 1389
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1390

D
dragondriver 已提交
1391
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadPartitions")
1392 1393
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
1394 1395
	method := "LoadPartitions"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1396
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1397
		metrics.TotalLabel).Inc()
1398
	lpt := &loadPartitionsTask{
G
godchen 已提交
1399 1400 1401
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		LoadPartitionsRequest: request,
1402
		queryCoord:            node.queryCoord,
C
cai.zhang 已提交
1403
		indexCoord:            node.indexCoord,
1404 1405
	}

1406 1407 1408
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1409
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1410 1411 1412
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1413 1414 1415 1416 1417 1418

	if err := node.sched.ddQueue.Enqueue(lpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1419
			zap.String("role", typeutil.ProxyRole),
1420 1421 1422 1423
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
1424
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1425
			metrics.AbandonLabel).Inc()
1426

1427
		return &commonpb.Status{
1428
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1429 1430 1431 1432
			Reason:    err.Error(),
		}, nil
	}

1433 1434 1435
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1436
		zap.String("role", typeutil.ProxyRole),
1437 1438 1439
		zap.Int64("MsgID", lpt.ID()),
		zap.Uint64("BeginTS", lpt.BeginTs()),
		zap.Uint64("EndTS", lpt.EndTs()),
D
dragondriver 已提交
1440 1441 1442
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1443 1444 1445 1446

	if err := lpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1447
			zap.Error(err),
1448
			zap.String("traceID", traceID),
1449
			zap.String("role", typeutil.ProxyRole),
1450 1451 1452
			zap.Int64("MsgID", lpt.ID()),
			zap.Uint64("BeginTS", lpt.BeginTs()),
			zap.Uint64("EndTS", lpt.EndTs()),
D
dragondriver 已提交
1453 1454 1455 1456
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
1457
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1458
			metrics.FailLabel).Inc()
1459

1460
		return &commonpb.Status{
1461
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1462 1463 1464 1465
			Reason:    err.Error(),
		}, nil
	}

1466 1467 1468
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1469
		zap.String("role", typeutil.ProxyRole),
1470 1471 1472 1473 1474 1475 1476
		zap.Int64("MsgID", lpt.ID()),
		zap.Uint64("BeginTS", lpt.BeginTs()),
		zap.Uint64("EndTS", lpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
1477
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1478
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1479
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1480
	return lpt.result, nil
1481 1482
}

1483
// ReleasePartitions release specific partitions from query nodes.
C
Cai Yudong 已提交
1484
func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionsRequest) (*commonpb.Status, error) {
1485 1486 1487
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1488 1489 1490 1491 1492

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleasePartitions")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1493
	rpt := &releasePartitionsTask{
G
godchen 已提交
1494 1495 1496
		ctx:                      ctx,
		Condition:                NewTaskCondition(ctx),
		ReleasePartitionsRequest: request,
1497
		queryCoord:               node.queryCoord,
1498 1499
	}

1500
	method := "ReleasePartitions"
1501
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1502
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1503
		metrics.TotalLabel).Inc()
1504 1505 1506
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1507
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1508 1509 1510
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1511 1512 1513 1514 1515 1516

	if err := node.sched.ddQueue.Enqueue(rpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1517
			zap.String("role", typeutil.ProxyRole),
1518 1519 1520 1521
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
1522
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1523
			metrics.AbandonLabel).Inc()
1524

1525
		return &commonpb.Status{
1526
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1527 1528 1529 1530
			Reason:    err.Error(),
		}, nil
	}

1531 1532 1533
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1534
		zap.String("role", typeutil.ProxyRole),
1535 1536 1537
		zap.Int64("msgID", rpt.Base.MsgID),
		zap.Uint64("BeginTS", rpt.BeginTs()),
		zap.Uint64("EndTS", rpt.EndTs()),
D
dragondriver 已提交
1538 1539 1540
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1541 1542 1543 1544

	if err := rpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1545
			zap.Error(err),
1546
			zap.String("traceID", traceID),
1547
			zap.String("role", typeutil.ProxyRole),
1548 1549 1550
			zap.Int64("msgID", rpt.Base.MsgID),
			zap.Uint64("BeginTS", rpt.BeginTs()),
			zap.Uint64("EndTS", rpt.EndTs()),
D
dragondriver 已提交
1551 1552 1553 1554
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
1555
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1556
			metrics.FailLabel).Inc()
1557

1558
		return &commonpb.Status{
1559
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1560 1561 1562 1563
			Reason:    err.Error(),
		}, nil
	}

1564 1565 1566
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1567
		zap.String("role", typeutil.ProxyRole),
1568 1569 1570 1571 1572 1573 1574
		zap.Int64("msgID", rpt.Base.MsgID),
		zap.Uint64("BeginTS", rpt.BeginTs()),
		zap.Uint64("EndTS", rpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))

E
Enwei Jiao 已提交
1575
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1576
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1577
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1578
	return rpt.result, nil
1579 1580
}

1581
// GetPartitionStatistics get the statistics of partition, such as num_rows.
C
Cai Yudong 已提交
1582
func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb.GetPartitionStatisticsRequest) (*milvuspb.GetPartitionStatisticsResponse, error) {
1583 1584 1585 1586 1587
	if !node.checkHealthy() {
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1588 1589 1590 1591

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetPartitionStatistics")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
1592 1593
	method := "GetPartitionStatistics"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1594
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1595
		metrics.TotalLabel).Inc()
1596

1597
	g := &getPartitionStatisticsTask{
1598 1599 1600
		ctx:                           ctx,
		Condition:                     NewTaskCondition(ctx),
		GetPartitionStatisticsRequest: request,
1601
		dataCoord:                     node.dataCoord,
1602 1603
	}

1604 1605 1606
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1607
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1608 1609 1610
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1611 1612 1613 1614 1615 1616

	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1617
			zap.String("role", typeutil.ProxyRole),
1618 1619 1620 1621
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

E
Enwei Jiao 已提交
1622
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1623
			metrics.AbandonLabel).Inc()
1624

1625 1626 1627 1628 1629 1630 1631 1632
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1633 1634 1635
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1636
		zap.String("role", typeutil.ProxyRole),
1637 1638 1639
		zap.Int64("msgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
1640 1641 1642
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1643 1644 1645 1646

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
1647
			zap.Error(err),
1648
			zap.String("traceID", traceID),
1649
			zap.String("role", typeutil.ProxyRole),
1650 1651 1652
			zap.Int64("msgID", g.ID()),
			zap.Uint64("BeginTS", g.BeginTs()),
			zap.Uint64("EndTS", g.EndTs()),
1653 1654 1655 1656
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

E
Enwei Jiao 已提交
1657
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1658
			metrics.FailLabel).Inc()
1659

1660 1661 1662 1663 1664 1665 1666 1667
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1668 1669 1670
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1671
		zap.String("role", typeutil.ProxyRole),
1672 1673 1674 1675 1676 1677 1678
		zap.Int64("msgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

E
Enwei Jiao 已提交
1679
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1680
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1681
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1682
	return g.result, nil
1683 1684
}

1685
// ShowPartitions list all partitions in the specific collection.
C
Cai Yudong 已提交
1686
func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
1687 1688 1689 1690 1691
	if !node.checkHealthy() {
		return &milvuspb.ShowPartitionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1692 1693 1694 1695 1696

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowPartitions")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1697
	spt := &showPartitionsTask{
G
godchen 已提交
1698 1699 1700
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		ShowPartitionsRequest: request,
1701
		rootCoord:             node.rootCoord,
1702
		queryCoord:            node.queryCoord,
G
godchen 已提交
1703
		result:                nil,
1704 1705
	}

1706
	method := "ShowPartitions"
1707 1708
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
E
Enwei Jiao 已提交
1709
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1710
		metrics.TotalLabel).Inc()
1711 1712 1713 1714

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1715
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1716
		zap.Any("request", request))
1717 1718 1719 1720 1721 1722

	if err := node.sched.ddQueue.Enqueue(spt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1723
			zap.String("role", typeutil.ProxyRole),
1724 1725
			zap.Any("request", request))

E
Enwei Jiao 已提交
1726
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1727
			metrics.AbandonLabel).Inc()
1728

G
godchen 已提交
1729
		return &milvuspb.ShowPartitionsResponse{
1730
			Status: &commonpb.Status{
1731
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1732 1733 1734 1735 1736
				Reason:    err.Error(),
			},
		}, nil
	}

1737 1738 1739
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1740
		zap.String("role", typeutil.ProxyRole),
1741 1742 1743
		zap.Int64("msgID", spt.ID()),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
1744 1745
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
1746 1747 1748 1749 1750
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

	if err := spt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1751
			zap.Error(err),
1752
			zap.String("traceID", traceID),
1753
			zap.String("role", typeutil.ProxyRole),
1754 1755 1756 1757 1758 1759
			zap.Int64("msgID", spt.ID()),
			zap.Uint64("BeginTS", spt.BeginTs()),
			zap.Uint64("EndTS", spt.EndTs()),
			zap.String("db", spt.ShowPartitionsRequest.DbName),
			zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
			zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))
D
dragondriver 已提交
1760

E
Enwei Jiao 已提交
1761
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1762
			metrics.FailLabel).Inc()
1763

G
godchen 已提交
1764
		return &milvuspb.ShowPartitionsResponse{
1765
			Status: &commonpb.Status{
1766
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1767 1768 1769 1770
				Reason:    err.Error(),
			},
		}, nil
	}
1771 1772 1773 1774

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1775
		zap.String("role", typeutil.ProxyRole),
1776 1777 1778 1779 1780 1781 1782
		zap.Int64("msgID", spt.ID()),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

E
Enwei Jiao 已提交
1783
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1784
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
1785
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1786 1787 1788
	return spt.result, nil
}

S
SimFG 已提交
1789 1790
func (node *Proxy) getCollectionProgress(ctx context.Context, request *milvuspb.GetLoadingProgressRequest, collectionID int64) (int64, error) {
	resp, err := node.queryCoord.ShowCollections(ctx, &querypb.ShowCollectionsRequest{
S
smellthemoon 已提交
1791
		Base: commonpbutil.UpdateMsgBase(
1792 1793 1794
			request.Base,
			commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
		),
S
SimFG 已提交
1795 1796 1797 1798 1799
		CollectionIDs: []int64{collectionID},
	})
	if err != nil {
		return 0, err
	}
X
xige-16 已提交
1800 1801 1802 1803 1804

	if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
		return 0, errors.New(resp.Status.Reason)
	}

S
SimFG 已提交
1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822
	if len(resp.InMemoryPercentages) == 0 {
		return 0, errors.New("fail to show collections from the querycoord, no data")
	}
	return resp.InMemoryPercentages[0], nil
}

func (node *Proxy) getPartitionProgress(ctx context.Context, request *milvuspb.GetLoadingProgressRequest, collectionID int64) (int64, error) {
	IDs2Names := make(map[int64]string)
	partitionIDs := make([]int64, 0)
	for _, partitionName := range request.PartitionNames {
		partitionID, err := globalMetaCache.GetPartitionID(ctx, request.CollectionName, partitionName)
		if err != nil {
			return 0, err
		}
		IDs2Names[partitionID] = partitionName
		partitionIDs = append(partitionIDs, partitionID)
	}
	resp, err := node.queryCoord.ShowPartitions(ctx, &querypb.ShowPartitionsRequest{
S
smellthemoon 已提交
1823
		Base: commonpbutil.UpdateMsgBase(
1824 1825 1826
			request.Base,
			commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions),
		),
S
SimFG 已提交
1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849
		CollectionID: collectionID,
		PartitionIDs: partitionIDs,
	})
	if err != nil {
		return 0, err
	}
	if len(resp.InMemoryPercentages) != len(partitionIDs) {
		return 0, errors.New("fail to show partitions from the querycoord, invalid data num")
	}
	var progress int64
	for _, p := range resp.InMemoryPercentages {
		progress += p
	}
	progress /= int64(len(partitionIDs))
	return progress, nil
}

func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.GetLoadingProgressRequest) (*milvuspb.GetLoadingProgressResponse, error) {
	if !node.checkHealthy() {
		return &milvuspb.GetLoadingProgressResponse{Status: unhealthyStatus()}, nil
	}
	method := "GetLoadingProgress"
	tr := timerecord.NewTimeRecorder(method)
1850
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetLoadingProgress")
S
SimFG 已提交
1851 1852
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
E
Enwei Jiao 已提交
1853
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
S
SimFG 已提交
1854 1855 1856 1857 1858 1859 1860 1861
	logger.Info(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.Any("request", request))

	getErrResponse := func(err error) *milvuspb.GetLoadingProgressResponse {
		logger.Error("fail to get loading progress", zap.String("collection_name", request.CollectionName),
			zap.Strings("partition_name", request.PartitionNames), zap.Error(err))
E
Enwei Jiao 已提交
1862
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
S
SimFG 已提交
1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876
		return &milvuspb.GetLoadingProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
	}
	if err := validateCollectionName(request.CollectionName); err != nil {
		return getErrResponse(err), nil
	}
	collectionID, err := globalMetaCache.GetCollectionID(ctx, request.CollectionName)
	if err != nil {
		return getErrResponse(err), nil
	}
1877 1878 1879 1880
	msgBase := commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
		commonpbutil.WithTimeStamp(0),
E
Enwei Jiao 已提交
1881
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
1882
	)
S
SimFG 已提交
1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905
	if request.Base == nil {
		request.Base = msgBase
	} else {
		request.Base.MsgID = msgBase.MsgID
		request.Base.Timestamp = msgBase.Timestamp
		request.Base.SourceID = msgBase.SourceID
	}

	var progress int64
	if len(request.GetPartitionNames()) == 0 {
		if progress, err = node.getCollectionProgress(ctx, request, collectionID); err != nil {
			return getErrResponse(err), nil
		}
	} else {
		if progress, err = node.getPartitionProgress(ctx, request, collectionID); err != nil {
			return getErrResponse(err), nil
		}
	}

	logger.Info(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.Any("request", request))
E
Enwei Jiao 已提交
1906 1907
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
S
SimFG 已提交
1908 1909 1910 1911 1912 1913 1914 1915
	return &milvuspb.GetLoadingProgressResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		Progress: progress,
	}, nil
}

1916
// CreateIndex create index for collection.
C
Cai Yudong 已提交
1917
func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
1918 1919 1920
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1921

1922
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateIndex")
D
dragondriver 已提交
1923 1924 1925
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1926
	cit := &createIndexTask{
Z
zhenshan.cao 已提交
1927 1928 1929 1930 1931
		ctx:        ctx,
		Condition:  NewTaskCondition(ctx),
		req:        request,
		rootCoord:  node.rootCoord,
		indexCoord: node.indexCoord,
1932
		queryCoord: node.queryCoord,
1933 1934
	}

D
dragondriver 已提交
1935
	method := "CreateIndex"
1936
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
1937
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1938
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1939 1940 1941
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1942
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1943 1944 1945 1946
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))
D
dragondriver 已提交
1947 1948 1949 1950 1951 1952

	if err := node.sched.ddQueue.Enqueue(cit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1953
			zap.String("role", typeutil.ProxyRole),
Z
zhenshan.cao 已提交
1954 1955 1956 1957
			zap.String("db", request.GetDbName()),
			zap.String("collection", request.GetCollectionName()),
			zap.String("field", request.GetFieldName()),
			zap.Any("extra_params", request.GetExtraParams()))
D
dragondriver 已提交
1958

E
Enwei Jiao 已提交
1959
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1960
			metrics.AbandonLabel).Inc()
1961

1962
		return &commonpb.Status{
1963
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1964 1965 1966 1967
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1968 1969 1970
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1971
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1972 1973 1974
		zap.Int64("MsgID", cit.ID()),
		zap.Uint64("BeginTs", cit.BeginTs()),
		zap.Uint64("EndTs", cit.EndTs()),
D
dragondriver 已提交
1975 1976 1977 1978
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))
D
dragondriver 已提交
1979 1980 1981 1982

	if err := cit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1983
			zap.Error(err),
D
dragondriver 已提交
1984
			zap.String("traceID", traceID),
1985
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1986 1987 1988
			zap.Int64("MsgID", cit.ID()),
			zap.Uint64("BeginTs", cit.BeginTs()),
			zap.Uint64("EndTs", cit.EndTs()),
D
dragondriver 已提交
1989 1990 1991 1992 1993
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.Any("extra_params", request.ExtraParams))

E
Enwei Jiao 已提交
1994
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
1995
			metrics.FailLabel).Inc()
1996

1997
		return &commonpb.Status{
1998
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1999 2000 2001 2002
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2003 2004 2005
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2006
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2007 2008 2009 2010 2011 2012 2013 2014
		zap.Int64("MsgID", cit.ID()),
		zap.Uint64("BeginTs", cit.BeginTs()),
		zap.Uint64("EndTs", cit.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))

E
Enwei Jiao 已提交
2015
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2016
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2017
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2018 2019 2020
	return cit.result, nil
}

2021
// DescribeIndex get the meta information of index, such as index state, index id and etc.
C
Cai Yudong 已提交
2022
func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
2023 2024 2025 2026 2027
	if !node.checkHealthy() {
		return &milvuspb.DescribeIndexResponse{
			Status: unhealthyStatus(),
		}, nil
	}
2028 2029 2030 2031 2032

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeIndex")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

2033
	dit := &describeIndexTask{
S
sunby 已提交
2034
		ctx:                  ctx,
2035 2036
		Condition:            NewTaskCondition(ctx),
		DescribeIndexRequest: request,
2037
		indexCoord:           node.indexCoord,
2038 2039
	}

2040 2041 2042
	method := "DescribeIndex"
	// avoid data race
	indexName := request.IndexName
2043
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2044
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2045
		metrics.TotalLabel).Inc()
2046 2047 2048
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
2049
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2050 2051 2052
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
2053 2054 2055 2056 2057 2058 2059
		zap.String("index name", indexName))

	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
2060
			zap.String("role", typeutil.ProxyRole),
2061 2062 2063 2064 2065
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", indexName))

E
Enwei Jiao 已提交
2066
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2067
			metrics.AbandonLabel).Inc()
2068

2069 2070
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
2071
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2072 2073 2074 2075 2076
				Reason:    err.Error(),
			},
		}, nil
	}

2077 2078 2079
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2080
		zap.String("role", typeutil.ProxyRole),
2081 2082 2083
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
2084 2085 2086
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
2087 2088 2089 2090 2091
		zap.String("index name", indexName))

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2092
			zap.Error(err),
2093
			zap.String("traceID", traceID),
2094
			zap.String("role", typeutil.ProxyRole),
2095 2096 2097
			zap.Int64("MsgID", dit.ID()),
			zap.Uint64("BeginTs", dit.BeginTs()),
			zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
2098 2099 2100
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
2101
			zap.String("index name", indexName))
D
dragondriver 已提交
2102

Z
zhenshan.cao 已提交
2103 2104 2105 2106
		errCode := commonpb.ErrorCode_UnexpectedError
		if dit.result != nil {
			errCode = dit.result.Status.GetErrorCode()
		}
E
Enwei Jiao 已提交
2107
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2108
			metrics.FailLabel).Inc()
2109

2110 2111
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
Z
zhenshan.cao 已提交
2112
				ErrorCode: errCode,
2113 2114 2115 2116 2117
				Reason:    err.Error(),
			},
		}, nil
	}

2118 2119 2120
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2121
		zap.String("role", typeutil.ProxyRole),
2122 2123 2124 2125 2126 2127 2128 2129
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", indexName))

E
Enwei Jiao 已提交
2130
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2131
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2132
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2133 2134 2135
	return dit.result, nil
}

2136
// DropIndex drop the index of collection.
C
Cai Yudong 已提交
2137
func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
2138 2139 2140
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2141 2142 2143 2144 2145

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropIndex")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

2146
	dit := &dropIndexTask{
S
sunby 已提交
2147
		ctx:              ctx,
B
BossZou 已提交
2148 2149
		Condition:        NewTaskCondition(ctx),
		DropIndexRequest: request,
2150
		indexCoord:       node.indexCoord,
2151
		queryCoord:       node.queryCoord,
B
BossZou 已提交
2152
	}
G
godchen 已提交
2153

D
dragondriver 已提交
2154
	method := "DropIndex"
2155
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2156
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2157
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
2158 2159 2160 2161

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
2162
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2163 2164 2165 2166 2167
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

D
dragondriver 已提交
2168 2169 2170 2171 2172
	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
2173
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2174 2175 2176 2177
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))
E
Enwei Jiao 已提交
2178
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2179
			metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2180

B
BossZou 已提交
2181
		return &commonpb.Status{
2182
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
2183 2184 2185
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
2186

D
dragondriver 已提交
2187 2188 2189
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2190
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2191 2192 2193
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
2194 2195 2196 2197
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
D
dragondriver 已提交
2198 2199 2200 2201

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2202
			zap.Error(err),
D
dragondriver 已提交
2203
			zap.String("traceID", traceID),
2204
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2205 2206 2207
			zap.Int64("MsgID", dit.ID()),
			zap.Uint64("BeginTs", dit.BeginTs()),
			zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
2208 2209 2210 2211 2212
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))

E
Enwei Jiao 已提交
2213
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2214
			metrics.FailLabel).Inc()
2215

B
BossZou 已提交
2216
		return &commonpb.Status{
2217
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
2218 2219 2220
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
2221 2222 2223 2224

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2225
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2226 2227 2228 2229 2230 2231 2232 2233
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

E
Enwei Jiao 已提交
2234
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2235
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2236
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
B
BossZou 已提交
2237 2238 2239
	return dit.result, nil
}

2240 2241
// GetIndexBuildProgress gets index build progress with filed_name and index_name.
// IndexRows is the num of indexed rows. And TotalRows is the total number of segment rows.
2242
// Deprecated: use DescribeIndex instead
C
Cai Yudong 已提交
2243
func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb.GetIndexBuildProgressRequest) (*milvuspb.GetIndexBuildProgressResponse, error) {
2244 2245 2246 2247 2248
	if !node.checkHealthy() {
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: unhealthyStatus(),
		}, nil
	}
2249 2250 2251 2252 2253

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetIndexBuildProgress")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

2254
	gibpt := &getIndexBuildProgressTask{
2255 2256 2257
		ctx:                          ctx,
		Condition:                    NewTaskCondition(ctx),
		GetIndexBuildProgressRequest: request,
2258 2259
		indexCoord:                   node.indexCoord,
		rootCoord:                    node.rootCoord,
2260
		dataCoord:                    node.dataCoord,
2261 2262
	}

2263
	method := "GetIndexBuildProgress"
2264
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2265
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2266
		metrics.TotalLabel).Inc()
2267 2268 2269
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
2270
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2271 2272 2273 2274
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
2275 2276 2277 2278 2279 2280

	if err := node.sched.ddQueue.Enqueue(gibpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
2281
			zap.String("role", typeutil.ProxyRole),
2282 2283 2284 2285
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))
E
Enwei Jiao 已提交
2286
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2287
			metrics.AbandonLabel).Inc()
2288

2289 2290 2291 2292 2293 2294 2295 2296
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

2297 2298 2299
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2300
		zap.String("role", typeutil.ProxyRole),
2301 2302 2303
		zap.Int64("MsgID", gibpt.ID()),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
		zap.Uint64("EndTs", gibpt.EndTs()),
2304 2305 2306 2307
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
2308 2309 2310 2311

	if err := gibpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
2312
			zap.Error(err),
2313
			zap.String("traceID", traceID),
2314
			zap.String("role", typeutil.ProxyRole),
2315 2316 2317
			zap.Int64("MsgID", gibpt.ID()),
			zap.Uint64("BeginTs", gibpt.BeginTs()),
			zap.Uint64("EndTs", gibpt.EndTs()),
2318 2319 2320 2321
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))
E
Enwei Jiao 已提交
2322
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2323
			metrics.FailLabel).Inc()
2324 2325 2326 2327 2328 2329 2330 2331

		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
2332 2333 2334 2335

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2336
		zap.String("role", typeutil.ProxyRole),
2337 2338 2339 2340 2341 2342 2343 2344
		zap.Int64("MsgID", gibpt.ID()),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
		zap.Uint64("EndTs", gibpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName),
		zap.Any("result", gibpt.result))
2345

E
Enwei Jiao 已提交
2346
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2347
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2348
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2349
	return gibpt.result, nil
2350 2351
}

2352
// GetIndexState get the build-state of index.
2353
// Deprecated: use DescribeIndex instead
C
Cai Yudong 已提交
2354
func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndexStateRequest) (*milvuspb.GetIndexStateResponse, error) {
2355 2356 2357 2358 2359
	if !node.checkHealthy() {
		return &milvuspb.GetIndexStateResponse{
			Status: unhealthyStatus(),
		}, nil
	}
2360 2361 2362 2363 2364

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

2365
	dipt := &getIndexStateTask{
G
godchen 已提交
2366 2367 2368
		ctx:                  ctx,
		Condition:            NewTaskCondition(ctx),
		GetIndexStateRequest: request,
2369 2370
		indexCoord:           node.indexCoord,
		rootCoord:            node.rootCoord,
2371 2372
	}

2373
	method := "GetIndexState"
2374
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2375
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2376
		metrics.TotalLabel).Inc()
2377 2378 2379
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
2380
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2381 2382 2383 2384
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
2385 2386 2387 2388 2389 2390

	if err := node.sched.ddQueue.Enqueue(dipt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
2391
			zap.String("role", typeutil.ProxyRole),
2392 2393 2394 2395 2396
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))

E
Enwei Jiao 已提交
2397
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2398
			metrics.AbandonLabel).Inc()
2399

G
godchen 已提交
2400
		return &milvuspb.GetIndexStateResponse{
2401
			Status: &commonpb.Status{
2402
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2403 2404 2405 2406 2407
				Reason:    err.Error(),
			},
		}, nil
	}

2408 2409 2410
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2411
		zap.String("role", typeutil.ProxyRole),
2412 2413 2414
		zap.Int64("MsgID", dipt.ID()),
		zap.Uint64("BeginTs", dipt.BeginTs()),
		zap.Uint64("EndTs", dipt.EndTs()),
D
dragondriver 已提交
2415 2416 2417 2418
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
2419 2420 2421 2422

	if err := dipt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2423
			zap.Error(err),
2424
			zap.String("traceID", traceID),
2425
			zap.String("role", typeutil.ProxyRole),
2426 2427 2428
			zap.Int64("MsgID", dipt.ID()),
			zap.Uint64("BeginTs", dipt.BeginTs()),
			zap.Uint64("EndTs", dipt.EndTs()),
D
dragondriver 已提交
2429 2430 2431 2432
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))
E
Enwei Jiao 已提交
2433
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2434
			metrics.FailLabel).Inc()
2435

G
godchen 已提交
2436
		return &milvuspb.GetIndexStateResponse{
2437
			Status: &commonpb.Status{
2438
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2439 2440 2441 2442 2443
				Reason:    err.Error(),
			},
		}, nil
	}

2444 2445 2446
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2447
		zap.String("role", typeutil.ProxyRole),
2448 2449 2450 2451 2452 2453 2454 2455
		zap.Int64("MsgID", dipt.ID()),
		zap.Uint64("BeginTs", dipt.BeginTs()),
		zap.Uint64("EndTs", dipt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

E
Enwei Jiao 已提交
2456
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2457
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2458
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2459 2460 2461
	return dipt.result, nil
}

2462
// Insert insert records into collection.
C
Cai Yudong 已提交
2463
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
X
Xiangyu Wang 已提交
2464 2465 2466 2467 2468 2469
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
	log.Info("Start processing insert request in Proxy", zap.String("traceID", traceID))
	defer log.Info("Finish processing insert request in Proxy", zap.String("traceID", traceID))

2470 2471 2472 2473 2474
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}
2475 2476
	method := "Insert"
	tr := timerecord.NewTimeRecorder(method)
2477
	receiveSize := proto.Size(request)
2478
	rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(receiveSize))
E
Enwei Jiao 已提交
2479
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Add(float64(receiveSize))
D
dragondriver 已提交
2480

E
Enwei Jiao 已提交
2481
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
2482
	it := &insertTask{
2483 2484
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
X
xige-16 已提交
2485
		// req:       request,
2486 2487 2488 2489
		BaseInsertTask: BaseInsertTask{
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
2490
			InsertRequest: internalpb.InsertRequest{
2491 2492 2493
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Insert),
					commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
2494
					commonpbutil.WithSourceID(paramtable.GetNodeID()),
2495
				),
2496 2497
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
X
xige-16 已提交
2498 2499 2500
				FieldsData:     request.FieldsData,
				NumRows:        uint64(request.NumRows),
				Version:        internalpb.InsertDataVersion_ColumnBased,
2501
				// RowData: transfer column based request to this
2502 2503
			},
		},
2504
		idAllocator:   node.rowIDAllocator,
2505 2506 2507
		segIDAssigner: node.segAssigner,
		chMgr:         node.chMgr,
		chTicker:      node.chTicker,
2508
	}
2509 2510

	if len(it.PartitionName) <= 0 {
2511
		it.PartitionName = Params.CommonCfg.DefaultPartitionName
2512 2513
	}

X
Xiangyu Wang 已提交
2514
	constructFailedResponse := func(err error) *milvuspb.MutationResult {
X
xige-16 已提交
2515
		numRows := request.NumRows
2516 2517 2518 2519
		errIndex := make([]uint32, numRows)
		for i := uint32(0); i < numRows; i++ {
			errIndex[i] = i
		}
2520

X
Xiangyu Wang 已提交
2521 2522 2523 2524 2525 2526 2527
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			ErrIndex: errIndex,
		}
2528 2529
	}

X
Xiangyu Wang 已提交
2530
	log.Debug("Enqueue insert request in Proxy",
2531
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2532 2533 2534 2535 2536
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.Int("len(FieldsData)", len(request.FieldsData)),
		zap.Int("len(HashKeys)", len(request.HashKeys)),
2537 2538
		zap.Uint32("NumRows", request.NumRows),
		zap.String("traceID", traceID))
D
dragondriver 已提交
2539

X
Xiangyu Wang 已提交
2540 2541
	if err := node.sched.dmQueue.Enqueue(it); err != nil {
		log.Debug("Failed to enqueue insert task: " + err.Error())
E
Enwei Jiao 已提交
2542
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2543
			metrics.AbandonLabel).Inc()
X
Xiangyu Wang 已提交
2544
		return constructFailedResponse(err), nil
2545
	}
D
dragondriver 已提交
2546

X
Xiangyu Wang 已提交
2547
	log.Debug("Detail of insert request in Proxy",
2548
		zap.String("role", typeutil.ProxyRole),
X
Xiangyu Wang 已提交
2549
		zap.Int64("msgID", it.Base.MsgID),
D
dragondriver 已提交
2550 2551 2552 2553 2554
		zap.Uint64("BeginTS", it.BeginTs()),
		zap.Uint64("EndTS", it.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
X
Xiangyu Wang 已提交
2555 2556 2557 2558 2559
		zap.Uint32("NumRows", request.NumRows),
		zap.String("traceID", traceID))

	if err := it.WaitToFinish(); err != nil {
		log.Debug("Failed to execute insert task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
E
Enwei Jiao 已提交
2560
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2561
			metrics.FailLabel).Inc()
X
Xiangyu Wang 已提交
2562 2563 2564 2565 2566
		return constructFailedResponse(err), nil
	}

	if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
		setErrorIndex := func() {
X
xige-16 已提交
2567
			numRows := request.NumRows
X
Xiangyu Wang 已提交
2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578
			errIndex := make([]uint32, numRows)
			for i := uint32(0); i < numRows; i++ {
				errIndex[i] = i
			}
			it.result.ErrIndex = errIndex
		}

		setErrorIndex()
	}

	// InsertCnt always equals to the number of entities in the request
X
xige-16 已提交
2579
	it.result.InsertCnt = int64(request.NumRows)
D
dragondriver 已提交
2580

E
Enwei Jiao 已提交
2581
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2582
		metrics.SuccessLabel).Inc()
2583
	successCnt := it.result.InsertCnt - int64(len(it.result.ErrIndex))
E
Enwei Jiao 已提交
2584 2585 2586
	metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(successCnt))
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
2587 2588 2589
	return it.result, nil
}

2590
// Delete delete records from collection, then these records cannot be searched.
G
groot 已提交
2591
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {
2592 2593 2594
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
2595 2596
	log.Info("Start processing delete request in Proxy", zap.String("traceID", traceID))
	defer log.Info("Finish processing delete request in Proxy", zap.String("traceID", traceID))
2597

2598
	receiveSize := proto.Size(request)
2599
	rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(receiveSize))
E
Enwei Jiao 已提交
2600
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Add(float64(receiveSize))
2601

G
groot 已提交
2602 2603 2604 2605 2606 2607
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}

2608 2609 2610
	method := "Delete"
	tr := timerecord.NewTimeRecorder(method)

E
Enwei Jiao 已提交
2611
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2612
		metrics.TotalLabel).Inc()
2613
	dt := &deleteTask{
X
xige-16 已提交
2614 2615 2616
		ctx:        ctx,
		Condition:  NewTaskCondition(ctx),
		deleteExpr: request.Expr,
G
godchen 已提交
2617
		BaseDeleteTask: BaseDeleteTask{
G
godchen 已提交
2618 2619 2620
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
2621
			DeleteRequest: internalpb.DeleteRequest{
2622 2623 2624 2625
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Delete),
					commonpbutil.WithMsgID(0),
				),
X
xige-16 已提交
2626
				DbName:         request.DbName,
G
godchen 已提交
2627 2628 2629
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
				// RowData: transfer column based request to this
C
Cai Yudong 已提交
2630 2631 2632 2633
			},
		},
		chMgr:    node.chMgr,
		chTicker: node.chTicker,
G
groot 已提交
2634 2635
	}

2636
	log.Debug("Enqueue delete request in Proxy",
2637
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2638 2639 2640 2641
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.String("expr", request.Expr))
2642 2643 2644 2645

	// MsgID will be set by Enqueue()
	if err := node.sched.dmQueue.Enqueue(dt); err != nil {
		log.Error("Failed to enqueue delete task: "+err.Error(), zap.String("traceID", traceID))
E
Enwei Jiao 已提交
2646
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2647
			metrics.AbandonLabel).Inc()
2648

G
groot 已提交
2649 2650 2651 2652 2653 2654 2655 2656
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

2657
	log.Debug("Detail of delete request in Proxy",
2658
		zap.String("role", typeutil.ProxyRole),
G
groot 已提交
2659 2660 2661 2662 2663
		zap.Int64("msgID", dt.Base.MsgID),
		zap.Uint64("timestamp", dt.Base.Timestamp),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
2664 2665
		zap.String("expr", request.Expr),
		zap.String("traceID", traceID))
G
groot 已提交
2666

2667 2668
	if err := dt.WaitToFinish(); err != nil {
		log.Error("Failed to execute delete task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
E
Enwei Jiao 已提交
2669
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2670
			metrics.FailLabel).Inc()
G
groot 已提交
2671 2672 2673 2674 2675 2676 2677 2678
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

E
Enwei Jiao 已提交
2679
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2680
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2681 2682
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
	metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
G
groot 已提交
2683 2684 2685
	return dt.result, nil
}

2686
// Search search the most similar records of requests.
C
Cai Yudong 已提交
2687
func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
2688
	receiveSize := proto.Size(request)
E
Enwei Jiao 已提交
2689
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel).Add(float64(receiveSize))
2690 2691 2692

	rateCol.Add(internalpb.RateType_DQLSearch.String(), float64(request.GetNq()))

2693 2694 2695 2696 2697
	if !node.checkHealthy() {
		return &milvuspb.SearchResults{
			Status: unhealthyStatus(),
		}, nil
	}
2698 2699
	method := "Search"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2700
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2701
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
2702

C
cai.zhang 已提交
2703 2704
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Search")
	defer sp.Finish()
D
dragondriver 已提交
2705

2706
	qt := &searchTask{
S
sunby 已提交
2707
		ctx:       ctx,
2708
		Condition: NewTaskCondition(ctx),
G
godchen 已提交
2709
		SearchRequest: &internalpb.SearchRequest{
2710 2711
			Base: commonpbutil.NewMsgBase(
				commonpbutil.WithMsgType(commonpb.MsgType_Search),
E
Enwei Jiao 已提交
2712
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
2713
			),
E
Enwei Jiao 已提交
2714
			ReqID: paramtable.GetNodeID(),
2715
		},
2716 2717 2718 2719
		request:  request,
		qc:       node.queryCoord,
		tr:       timerecord.NewTimeRecorder("search"),
		shardMgr: node.shardMgr,
2720 2721
	}

2722 2723 2724
	travelTs := request.TravelTimestamp
	guaranteeTs := request.GuaranteeTimestamp

Z
Zach 已提交
2725
	log.Ctx(ctx).Info(
2726
		rpcReceived(method),
2727
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2728 2729 2730 2731 2732
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2733 2734 2735 2736
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2737

2738
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
Z
Zach 已提交
2739
		log.Ctx(ctx).Warn(
2740
			rpcFailedToEnqueue(method),
D
dragondriver 已提交
2741
			zap.Error(err),
2742
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2743 2744 2745 2746 2747 2748
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames),
			zap.Any("dsl", request.Dsl),
			zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
			zap.Any("OutputFields", request.OutputFields),
2749 2750 2751
			zap.Any("search_params", request.SearchParams),
			zap.Uint64("travel_timestamp", travelTs),
			zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2752

E
Enwei Jiao 已提交
2753
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2754
			metrics.AbandonLabel).Inc()
2755

2756 2757
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2758
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2759 2760 2761 2762
				Reason:    err.Error(),
			},
		}, nil
	}
Z
Zach 已提交
2763
	tr.CtxRecord(ctx, "search request enqueue")
2764

Z
Zach 已提交
2765
	log.Ctx(ctx).Debug(
2766
		rpcEnqueued(method),
2767
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2768
		zap.Int64("msgID", qt.ID()),
D
dragondriver 已提交
2769 2770 2771 2772 2773
		zap.Uint64("timestamp", qt.Base.Timestamp),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
2774
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2775 2776 2777 2778
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2779

2780
	if err := qt.WaitToFinish(); err != nil {
Z
Zach 已提交
2781
		log.Ctx(ctx).Warn(
2782
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2783
			zap.Error(err),
2784
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2785
			zap.Int64("msgID", qt.ID()),
D
dragondriver 已提交
2786 2787 2788 2789
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames),
			zap.Any("dsl", request.Dsl),
2790
			zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2791 2792 2793 2794
			zap.Any("OutputFields", request.OutputFields),
			zap.Any("search_params", request.SearchParams),
			zap.Uint64("travel_timestamp", travelTs),
			zap.Uint64("guarantee_timestamp", guaranteeTs))
2795

E
Enwei Jiao 已提交
2796
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2797
			metrics.FailLabel).Inc()
2798

2799 2800
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2801
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2802 2803 2804 2805 2806
				Reason:    err.Error(),
			},
		}, nil
	}

Z
Zach 已提交
2807
	span := tr.CtxRecord(ctx, "wait search result")
E
Enwei Jiao 已提交
2808
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2809
		metrics.SearchLabel).Observe(float64(span.Milliseconds()))
2810
	tr.CtxRecord(ctx, "wait search result")
Z
Zach 已提交
2811
	log.Ctx(ctx).Debug(
2812
		rpcDone(method),
2813
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2814 2815 2816 2817 2818 2819
		zap.Int64("msgID", qt.ID()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2820 2821 2822 2823
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2824

E
Enwei Jiao 已提交
2825
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2826
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
2827
	metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(qt.result.GetResults().GetNumQueries()))
C
cai.zhang 已提交
2828
	searchDur := tr.ElapseSpan().Milliseconds()
E
Enwei Jiao 已提交
2829
	metrics.ProxySQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2830
		metrics.SearchLabel).Observe(float64(searchDur))
E
Enwei Jiao 已提交
2831
	metrics.ProxyCollectionSQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
2832
		metrics.SearchLabel, request.CollectionName).Observe(float64(searchDur))
2833 2834
	if qt.result != nil {
		sentSize := proto.Size(qt.result)
E
Enwei Jiao 已提交
2835
		metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
2836
		rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
2837
	}
2838 2839 2840
	return qt.result, nil
}

2841
// Flush notify data nodes to persist the data of collection.
2842 2843 2844 2845 2846 2847 2848
func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*milvuspb.FlushResponse, error) {
	resp := &milvuspb.FlushResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    "",
		},
	}
2849
	if !node.checkHealthy() {
2850 2851
		resp.Status.Reason = "proxy is not healthy"
		return resp, nil
2852
	}
D
dragondriver 已提交
2853 2854 2855 2856 2857

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Flush")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

2858
	ft := &flushTask{
T
ThreadDao 已提交
2859 2860 2861
		ctx:          ctx,
		Condition:    NewTaskCondition(ctx),
		FlushRequest: request,
2862
		dataCoord:    node.dataCoord,
2863 2864
	}

D
dragondriver 已提交
2865
	method := "Flush"
2866
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
2867
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2868 2869 2870 2871

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
2872
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2873 2874
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))
D
dragondriver 已提交
2875 2876 2877 2878 2879 2880

	if err := node.sched.ddQueue.Enqueue(ft); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
2881
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2882 2883 2884
			zap.String("db", request.DbName),
			zap.Any("collections", request.CollectionNames))

E
Enwei Jiao 已提交
2885
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
2886

2887 2888
		resp.Status.Reason = err.Error()
		return resp, nil
2889 2890
	}

D
dragondriver 已提交
2891 2892 2893
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2894
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2895 2896 2897
		zap.Int64("MsgID", ft.ID()),
		zap.Uint64("BeginTs", ft.BeginTs()),
		zap.Uint64("EndTs", ft.EndTs()),
D
dragondriver 已提交
2898 2899
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))
D
dragondriver 已提交
2900 2901 2902 2903

	if err := ft.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2904
			zap.Error(err),
D
dragondriver 已提交
2905
			zap.String("traceID", traceID),
2906
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2907 2908 2909
			zap.Int64("MsgID", ft.ID()),
			zap.Uint64("BeginTs", ft.BeginTs()),
			zap.Uint64("EndTs", ft.EndTs()),
D
dragondriver 已提交
2910 2911 2912
			zap.String("db", request.DbName),
			zap.Any("collections", request.CollectionNames))

E
Enwei Jiao 已提交
2913
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2914

D
dragondriver 已提交
2915
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
2916 2917
		resp.Status.Reason = err.Error()
		return resp, nil
2918 2919
	}

D
dragondriver 已提交
2920 2921 2922
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2923
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2924 2925 2926 2927 2928 2929
		zap.Int64("MsgID", ft.ID()),
		zap.Uint64("BeginTs", ft.BeginTs()),
		zap.Uint64("EndTs", ft.EndTs()),
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))

E
Enwei Jiao 已提交
2930 2931
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2932
	return ft.result, nil
2933 2934
}

2935
// Query get the records by primary keys.
C
Cai Yudong 已提交
2936
func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*milvuspb.QueryResults, error) {
2937
	receiveSize := proto.Size(request)
E
Enwei Jiao 已提交
2938
	metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.QueryLabel).Add(float64(receiveSize))
2939 2940 2941

	rateCol.Add(internalpb.RateType_DQLQuery.String(), 1)

2942 2943 2944 2945 2946
	if !node.checkHealthy() {
		return &milvuspb.QueryResults{
			Status: unhealthyStatus(),
		}, nil
	}
2947

D
dragondriver 已提交
2948 2949
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Query")
	defer sp.Finish()
2950
	tr := timerecord.NewTimeRecorder("Query")
D
dragondriver 已提交
2951

2952
	qt := &queryTask{
2953 2954 2955
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
		RetrieveRequest: &internalpb.RetrieveRequest{
2956 2957
			Base: commonpbutil.NewMsgBase(
				commonpbutil.WithMsgType(commonpb.MsgType_Retrieve),
E
Enwei Jiao 已提交
2958
				commonpbutil.WithSourceID(paramtable.GetNodeID()),
2959
			),
E
Enwei Jiao 已提交
2960
			ReqID: paramtable.GetNodeID(),
2961
		},
2962 2963
		request:          request,
		qc:               node.queryCoord,
2964
		queryShardPolicy: mergeRoundRobinPolicy,
2965
		shardMgr:         node.shardMgr,
2966 2967
	}

D
dragondriver 已提交
2968 2969
	method := "Query"

E
Enwei Jiao 已提交
2970
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2971 2972
		metrics.TotalLabel).Inc()

Z
Zach 已提交
2973
	log.Ctx(ctx).Info(
D
dragondriver 已提交
2974
		rpcReceived(method),
2975
		zap.String("role", typeutil.ProxyRole),
2976 2977
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
2978 2979 2980 2981 2982
		zap.Strings("partitions", request.PartitionNames),
		zap.String("expr", request.Expr),
		zap.Strings("OutputFields", request.OutputFields),
		zap.Uint64("travel_timestamp", request.TravelTimestamp),
		zap.Uint64("guarantee_timestamp", request.GuaranteeTimestamp))
G
godchen 已提交
2983

D
dragondriver 已提交
2984
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
Z
Zach 已提交
2985
		log.Ctx(ctx).Warn(
D
dragondriver 已提交
2986 2987 2988
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("role", typeutil.ProxyRole),
2989 2990 2991
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))
D
dragondriver 已提交
2992

E
Enwei Jiao 已提交
2993
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
2994
			metrics.AbandonLabel).Inc()
2995

2996 2997 2998 2999 3000 3001
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
3002
	}
Z
Zach 已提交
3003
	tr.CtxRecord(ctx, "query request enqueue")
3004

Z
Zach 已提交
3005
	log.Ctx(ctx).Debug(
D
dragondriver 已提交
3006
		rpcEnqueued(method),
3007
		zap.String("role", typeutil.ProxyRole),
3008
		zap.Int64("msgID", qt.ID()),
3009 3010
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
3011
		zap.Strings("partitions", request.PartitionNames))
D
dragondriver 已提交
3012 3013

	if err := qt.WaitToFinish(); err != nil {
Z
Zach 已提交
3014
		log.Ctx(ctx).Warn(
D
dragondriver 已提交
3015 3016
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
3017
			zap.String("role", typeutil.ProxyRole),
3018
			zap.Int64("msgID", qt.ID()),
3019 3020 3021
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))
3022

E
Enwei Jiao 已提交
3023
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3024
			metrics.FailLabel).Inc()
3025

3026 3027 3028 3029 3030 3031 3032
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
Z
Zach 已提交
3033
	span := tr.CtxRecord(ctx, "wait query result")
E
Enwei Jiao 已提交
3034
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
3035
		metrics.QueryLabel).Observe(float64(span.Milliseconds()))
3036

Z
Zach 已提交
3037
	log.Ctx(ctx).Debug(
D
dragondriver 已提交
3038 3039
		rpcDone(method),
		zap.String("role", typeutil.ProxyRole),
3040
		zap.Int64("msgID", qt.ID()),
3041 3042 3043
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
D
dragondriver 已提交
3044

E
Enwei Jiao 已提交
3045
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3046 3047
		metrics.SuccessLabel).Inc()

E
Enwei Jiao 已提交
3048
	metrics.ProxySQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
3049
		metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
E
Enwei Jiao 已提交
3050
	metrics.ProxyCollectionSQLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10),
3051
		metrics.QueryLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
3052 3053

	ret := &milvuspb.QueryResults{
3054 3055
		Status:     qt.result.Status,
		FieldsData: qt.result.FieldsData,
3056 3057
	}
	sentSize := proto.Size(qt.result)
3058
	rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
E
Enwei Jiao 已提交
3059
	metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
3060
	return ret, nil
3061
}
3062

3063
// CreateAlias create alias for collection, then you can search the collection with alias.
Y
Yusup 已提交
3064 3065 3066 3067
func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
3068 3069 3070 3071 3072

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateAlias")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

Y
Yusup 已提交
3073 3074 3075 3076 3077 3078 3079
	cat := &CreateAliasTask{
		ctx:                ctx,
		Condition:          NewTaskCondition(ctx),
		CreateAliasRequest: request,
		rootCoord:          node.rootCoord,
	}

D
dragondriver 已提交
3080
	method := "CreateAlias"
3081
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3082
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

	if err := node.sched.ddQueue.Enqueue(cat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.String("db", request.DbName),
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
3102
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
3103

Y
Yusup 已提交
3104 3105 3106 3107 3108 3109
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
3110 3111 3112
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
3113
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3114 3115 3116 3117
		zap.Int64("MsgID", cat.ID()),
		zap.Uint64("BeginTs", cat.BeginTs()),
		zap.Uint64("EndTs", cat.EndTs()),
		zap.String("db", request.DbName),
Y
Yusup 已提交
3118 3119
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))
D
dragondriver 已提交
3120 3121 3122 3123

	if err := cat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
3124
			zap.Error(err),
D
dragondriver 已提交
3125
			zap.String("traceID", traceID),
3126
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3127 3128 3129 3130
			zap.Int64("MsgID", cat.ID()),
			zap.Uint64("BeginTs", cat.BeginTs()),
			zap.Uint64("EndTs", cat.EndTs()),
			zap.String("db", request.DbName),
Y
Yusup 已提交
3131 3132
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))
E
Enwei Jiao 已提交
3133
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
Y
Yusup 已提交
3134 3135 3136 3137 3138 3139 3140

		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", cat.ID()),
		zap.Uint64("BeginTs", cat.BeginTs()),
		zap.Uint64("EndTs", cat.EndTs()),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
3152 3153
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
3154 3155 3156
	return cat.result, nil
}

3157
// DropAlias alter the alias of collection.
Y
Yusup 已提交
3158 3159 3160 3161
func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
3162 3163 3164 3165 3166

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropAlias")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

Y
Yusup 已提交
3167 3168 3169 3170 3171 3172 3173
	dat := &DropAliasTask{
		ctx:              ctx,
		Condition:        NewTaskCondition(ctx),
		DropAliasRequest: request,
		rootCoord:        node.rootCoord,
	}

D
dragondriver 已提交
3174
	method := "DropAlias"
3175
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3176
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias))

	if err := node.sched.ddQueue.Enqueue(dat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.String("db", request.DbName),
			zap.String("alias", request.Alias))
E
Enwei Jiao 已提交
3193
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
3194

Y
Yusup 已提交
3195 3196 3197 3198 3199 3200
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
3201 3202 3203
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
3204
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3205 3206 3207 3208
		zap.Int64("MsgID", dat.ID()),
		zap.Uint64("BeginTs", dat.BeginTs()),
		zap.Uint64("EndTs", dat.EndTs()),
		zap.String("db", request.DbName),
Y
Yusup 已提交
3209
		zap.String("alias", request.Alias))
D
dragondriver 已提交
3210 3211 3212 3213

	if err := dat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
3214
			zap.Error(err),
D
dragondriver 已提交
3215
			zap.String("traceID", traceID),
3216
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3217 3218 3219 3220
			zap.Int64("MsgID", dat.ID()),
			zap.Uint64("BeginTs", dat.BeginTs()),
			zap.Uint64("EndTs", dat.EndTs()),
			zap.String("db", request.DbName),
Y
Yusup 已提交
3221 3222
			zap.String("alias", request.Alias))

E
Enwei Jiao 已提交
3223
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3224

Y
Yusup 已提交
3225 3226 3227 3228 3229 3230
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
3231 3232 3233 3234 3235 3236 3237 3238 3239 3240
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", dat.ID()),
		zap.Uint64("BeginTs", dat.BeginTs()),
		zap.Uint64("EndTs", dat.EndTs()),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias))

E
Enwei Jiao 已提交
3241 3242
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
3243 3244 3245
	return dat.result, nil
}

3246
// AlterAlias alter alias of collection.
Y
Yusup 已提交
3247 3248 3249 3250
func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
3251 3252 3253 3254 3255

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-AlterAlias")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

Y
Yusup 已提交
3256 3257 3258 3259 3260 3261 3262
	aat := &AlterAliasTask{
		ctx:               ctx,
		Condition:         NewTaskCondition(ctx),
		AlterAliasRequest: request,
		rootCoord:         node.rootCoord,
	}

D
dragondriver 已提交
3263
	method := "AlterAlias"
3264
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3265
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
3266 3267 3268 3269 3270 3271 3272 3273 3274 3275 3276 3277 3278 3279 3280 3281 3282 3283

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

	if err := node.sched.ddQueue.Enqueue(aat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.String("db", request.DbName),
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))
E
Enwei Jiao 已提交
3284
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
3285

Y
Yusup 已提交
3286 3287 3288 3289 3290 3291
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
3292 3293 3294
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
3295
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3296 3297 3298 3299
		zap.Int64("MsgID", aat.ID()),
		zap.Uint64("BeginTs", aat.BeginTs()),
		zap.Uint64("EndTs", aat.EndTs()),
		zap.String("db", request.DbName),
Y
Yusup 已提交
3300 3301
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))
D
dragondriver 已提交
3302 3303 3304 3305

	if err := aat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
3306
			zap.Error(err),
D
dragondriver 已提交
3307
			zap.String("traceID", traceID),
3308
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3309 3310 3311 3312
			zap.Int64("MsgID", aat.ID()),
			zap.Uint64("BeginTs", aat.BeginTs()),
			zap.Uint64("EndTs", aat.EndTs()),
			zap.String("db", request.DbName),
Y
Yusup 已提交
3313 3314 3315
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
3316
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3317

Y
Yusup 已提交
3318 3319 3320 3321 3322 3323
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
3324 3325 3326 3327 3328 3329 3330 3331 3332 3333 3334
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", aat.ID()),
		zap.Uint64("BeginTs", aat.BeginTs()),
		zap.Uint64("EndTs", aat.EndTs()),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

E
Enwei Jiao 已提交
3335 3336
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
3337 3338 3339
	return aat.result, nil
}

3340
// CalcDistance calculates the distances between vectors.
3341
func (node *Proxy) CalcDistance(ctx context.Context, request *milvuspb.CalcDistanceRequest) (*milvuspb.CalcDistanceResults, error) {
3342 3343 3344 3345 3346
	if !node.checkHealthy() {
		return &milvuspb.CalcDistanceResults{
			Status: unhealthyStatus(),
		}, nil
	}
3347

3348 3349 3350 3351
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CalcDistance")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

3352 3353
	query := func(ids *milvuspb.VectorIDs) (*milvuspb.QueryResults, error) {
		outputFields := []string{ids.FieldName}
3354

3355 3356 3357 3358 3359
		queryRequest := &milvuspb.QueryRequest{
			DbName:         "",
			CollectionName: ids.CollectionName,
			PartitionNames: ids.PartitionNames,
			OutputFields:   outputFields,
3360 3361
		}

3362
		qt := &queryTask{
3363 3364 3365
			ctx:       ctx,
			Condition: NewTaskCondition(ctx),
			RetrieveRequest: &internalpb.RetrieveRequest{
3366 3367
				Base: commonpbutil.NewMsgBase(
					commonpbutil.WithMsgType(commonpb.MsgType_Retrieve),
E
Enwei Jiao 已提交
3368
					commonpbutil.WithSourceID(paramtable.GetNodeID()),
3369
				),
E
Enwei Jiao 已提交
3370
				ReqID: paramtable.GetNodeID(),
3371
			},
3372 3373 3374 3375
			request: queryRequest,
			qc:      node.queryCoord,
			ids:     ids.IdArray,

3376
			queryShardPolicy: mergeRoundRobinPolicy,
3377
			shardMgr:         node.shardMgr,
3378 3379
		}

G
groot 已提交
3380 3381 3382 3383 3384 3385
		items := []zapcore.Field{
			zap.String("collection", queryRequest.CollectionName),
			zap.Any("partitions", queryRequest.PartitionNames),
			zap.Any("OutputFields", queryRequest.OutputFields),
		}

3386
		err := node.sched.dqQueue.Enqueue(qt)
3387
		if err != nil {
G
groot 已提交
3388
			log.Error("CalcDistance queryTask failed to enqueue", append(items, zap.Error(err))...)
3389

3390 3391 3392 3393 3394
			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
3395
			}, err
3396
		}
3397

G
groot 已提交
3398
		log.Debug("CalcDistance queryTask enqueued", items...)
3399 3400 3401

		err = qt.WaitToFinish()
		if err != nil {
G
groot 已提交
3402
			log.Error("CalcDistance queryTask failed to WaitToFinish", append(items, zap.Error(err))...)
3403 3404 3405 3406 3407 3408

			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
3409
			}, err
3410
		}
3411

G
groot 已提交
3412
		log.Debug("CalcDistance queryTask Done", items...)
3413 3414

		return &milvuspb.QueryResults{
3415 3416
			Status:     qt.result.Status,
			FieldsData: qt.result.FieldsData,
3417 3418 3419
		}, nil
	}

G
groot 已提交
3420 3421 3422 3423
	// calcDistanceTask is not a standard task, no need to enqueue
	task := &calcDistanceTask{
		traceID:   traceID,
		queryFunc: query,
3424 3425
	}

G
groot 已提交
3426
	return task.Execute(ctx, request)
3427 3428
}

3429
// GetDdChannel returns the used channel for dd operations.
C
Cai Yudong 已提交
3430
func (node *Proxy) GetDdChannel(ctx context.Context, request *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) {
3431 3432
	panic("implement me")
}
X
XuanYang-cn 已提交
3433

3434
// GetPersistentSegmentInfo get the information of sealed segment.
C
Cai Yudong 已提交
3435
func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.GetPersistentSegmentInfoRequest) (*milvuspb.GetPersistentSegmentInfoResponse, error) {
D
dragondriver 已提交
3436
	log.Debug("GetPersistentSegmentInfo",
3437
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3438 3439 3440
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
3441
	resp := &milvuspb.GetPersistentSegmentInfoResponse{
X
XuanYang-cn 已提交
3442
		Status: &commonpb.Status{
3443
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
3444 3445
		},
	}
3446 3447 3448 3449
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3450 3451
	method := "GetPersistentSegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3452
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3453
		metrics.TotalLabel).Inc()
3454 3455 3456

	// list segments
	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
X
XuanYang-cn 已提交
3457
	if err != nil {
E
Enwei Jiao 已提交
3458
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3459 3460 3461 3462 3463 3464 3465 3466 3467 3468 3469
		resp.Status.Reason = fmt.Errorf("getCollectionID failed, err:%w", err).Error()
		return resp, nil
	}

	getSegmentsByStatesResponse, err := node.dataCoord.GetSegmentsByStates(ctx, &datapb.GetSegmentsByStatesRequest{
		CollectionID: collectionID,
		// -1 means list all partition segemnts
		PartitionID: -1,
		States:      []commonpb.SegmentState{commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed, commonpb.SegmentState_Sealed},
	})
	if err != nil {
E
Enwei Jiao 已提交
3470
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3471
		resp.Status.Reason = fmt.Errorf("getSegmentsOfCollection, err:%w", err).Error()
X
XuanYang-cn 已提交
3472 3473
		return resp, nil
	}
3474 3475

	// get Segment info
3476
	infoResp, err := node.dataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
3477 3478 3479 3480
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
			commonpbutil.WithMsgID(0),
			commonpbutil.WithTimeStamp(0),
E
Enwei Jiao 已提交
3481
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3482
		),
3483
		SegmentIDs: getSegmentsByStatesResponse.Segments,
X
XuanYang-cn 已提交
3484 3485
	})
	if err != nil {
E
Enwei Jiao 已提交
3486
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3487 3488
			metrics.FailLabel).Inc()
		log.Warn("GetPersistentSegmentInfo fail", zap.Error(err))
3489
		resp.Status.Reason = fmt.Errorf("dataCoord:GetSegmentInfo, err:%w", err).Error()
X
XuanYang-cn 已提交
3490 3491
		return resp, nil
	}
3492
	log.Debug("GetPersistentSegmentInfo ", zap.Int("len(infos)", len(infoResp.Infos)), zap.Any("status", infoResp.Status))
3493
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
E
Enwei Jiao 已提交
3494
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3495
			metrics.FailLabel).Inc()
X
XuanYang-cn 已提交
3496 3497 3498 3499 3500 3501
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	persistentInfos := make([]*milvuspb.PersistentSegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		persistentInfos[i] = &milvuspb.PersistentSegmentInfo{
S
sunby 已提交
3502
			SegmentID:    info.ID,
X
XuanYang-cn 已提交
3503 3504
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
S
sunby 已提交
3505
			NumRows:      info.NumOfRows,
X
XuanYang-cn 已提交
3506 3507 3508
			State:        info.State,
		}
	}
E
Enwei Jiao 已提交
3509
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3510
		metrics.SuccessLabel).Inc()
E
Enwei Jiao 已提交
3511
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3512
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
X
XuanYang-cn 已提交
3513 3514 3515 3516
	resp.Infos = persistentInfos
	return resp, nil
}

J
jingkl 已提交
3517
// GetQuerySegmentInfo gets segment information from QueryCoord.
C
Cai Yudong 已提交
3518
func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQuerySegmentInfoRequest) (*milvuspb.GetQuerySegmentInfoResponse, error) {
D
dragondriver 已提交
3519
	log.Debug("GetQuerySegmentInfo",
3520
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3521 3522 3523
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
3524
	resp := &milvuspb.GetQuerySegmentInfoResponse{
Z
zhenshan.cao 已提交
3525
		Status: &commonpb.Status{
3526
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
Z
zhenshan.cao 已提交
3527 3528
		},
	}
3529 3530 3531 3532
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3533

3534 3535
	method := "GetQuerySegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3536
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3537 3538
		metrics.TotalLabel).Inc()

3539 3540
	collID, err := globalMetaCache.GetCollectionID(ctx, req.CollectionName)
	if err != nil {
E
Enwei Jiao 已提交
3541
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3542 3543 3544
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3545
	infoResp, err := node.queryCoord.GetSegmentInfo(ctx, &querypb.GetSegmentInfoRequest{
3546 3547 3548 3549
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
			commonpbutil.WithMsgID(0),
			commonpbutil.WithTimeStamp(0),
E
Enwei Jiao 已提交
3550
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3551
		),
3552
		CollectionID: collID,
Z
zhenshan.cao 已提交
3553 3554
	})
	if err != nil {
E
Enwei Jiao 已提交
3555
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3556
		log.Error("Failed to get segment info from QueryCoord", zap.Error(err))
Z
zhenshan.cao 已提交
3557 3558 3559
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3560
	log.Debug("GetQuerySegmentInfo ", zap.Any("infos", infoResp.Infos), zap.Any("status", infoResp.Status))
3561
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
E
Enwei Jiao 已提交
3562
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3563
		log.Error("Failed to get segment info from QueryCoord", zap.String("errMsg", infoResp.Status.Reason))
Z
zhenshan.cao 已提交
3564 3565 3566 3567 3568 3569 3570 3571 3572 3573 3574 3575 3576
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	queryInfos := make([]*milvuspb.QuerySegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		queryInfos[i] = &milvuspb.QuerySegmentInfo{
			SegmentID:    info.SegmentID,
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
			NumRows:      info.NumRows,
			MemSize:      info.MemSize,
			IndexName:    info.IndexName,
			IndexID:      info.IndexID,
X
xige-16 已提交
3577
			State:        info.SegmentState,
3578
			NodeIds:      info.NodeIds,
Z
zhenshan.cao 已提交
3579 3580
		}
	}
3581

E
Enwei Jiao 已提交
3582 3583
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3584
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
Z
zhenshan.cao 已提交
3585 3586 3587 3588
	resp.Infos = queryInfos
	return resp, nil
}

J
jingkl 已提交
3589
// Dummy handles dummy request
C
Cai Yudong 已提交
3590
func (node *Proxy) Dummy(ctx context.Context, req *milvuspb.DummyRequest) (*milvuspb.DummyResponse, error) {
3591 3592 3593 3594 3595 3596 3597 3598 3599 3600 3601
	failedResponse := &milvuspb.DummyResponse{
		Response: `{"status": "fail"}`,
	}

	// TODO(wxyu): change name RequestType to Request
	drt, err := parseDummyRequestType(req.RequestType)
	if err != nil {
		log.Debug("Failed to parse dummy request type")
		return failedResponse, nil
	}

3602 3603
	if drt.RequestType == "query" {
		drr, err := parseDummyQueryRequest(req.RequestType)
3604
		if err != nil {
3605
			log.Debug("Failed to parse dummy query request")
3606 3607 3608
			return failedResponse, nil
		}

3609
		request := &milvuspb.QueryRequest{
3610 3611 3612
			DbName:         drr.DbName,
			CollectionName: drr.CollectionName,
			PartitionNames: drr.PartitionNames,
3613
			OutputFields:   drr.OutputFields,
X
Xiangyu Wang 已提交
3614 3615
		}

3616
		_, err = node.Query(ctx, request)
3617
		if err != nil {
3618
			log.Debug("Failed to execute dummy query")
3619 3620
			return failedResponse, err
		}
X
Xiangyu Wang 已提交
3621 3622 3623 3624 3625 3626

		return &milvuspb.DummyResponse{
			Response: `{"status": "success"}`,
		}, nil
	}

3627 3628
	log.Debug("cannot find specify dummy request type")
	return failedResponse, nil
X
Xiangyu Wang 已提交
3629 3630
}

J
jingkl 已提交
3631
// RegisterLink registers a link
C
Cai Yudong 已提交
3632
func (node *Proxy) RegisterLink(ctx context.Context, req *milvuspb.RegisterLinkRequest) (*milvuspb.RegisterLinkResponse, error) {
3633
	code := node.stateCode.Load().(commonpb.StateCode)
D
dragondriver 已提交
3634
	log.Debug("RegisterLink",
3635
		zap.String("role", typeutil.ProxyRole),
C
Cai Yudong 已提交
3636
		zap.Any("state code of proxy", code))
D
dragondriver 已提交
3637

3638
	if code != commonpb.StateCode_Healthy {
3639 3640 3641
		return &milvuspb.RegisterLinkResponse{
			Address: nil,
			Status: &commonpb.Status{
3642
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3643
				Reason:    "proxy not healthy",
3644 3645 3646
			},
		}, nil
	}
E
Enwei Jiao 已提交
3647
	//metrics.ProxyLinkedSDKs.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Inc()
3648 3649 3650
	return &milvuspb.RegisterLinkResponse{
		Address: nil,
		Status: &commonpb.Status{
3651
			ErrorCode: commonpb.ErrorCode_Success,
3652
			Reason:    os.Getenv(metricsinfo.DeployModeEnvKey),
3653 3654 3655
		},
	}, nil
}
3656

3657
// GetMetrics gets the metrics of proxy
3658 3659 3660
// TODO(dragondriver): cache the Metrics and set a retention to the cache
func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	log.Debug("Proxy.GetMetrics",
E
Enwei Jiao 已提交
3661
		zap.Int64("node_id", paramtable.GetNodeID()),
3662 3663 3664 3665
		zap.String("req", req.Request))

	if !node.checkHealthy() {
		log.Warn("Proxy.GetMetrics failed",
E
Enwei Jiao 已提交
3666
			zap.Int64("node_id", paramtable.GetNodeID()),
3667
			zap.String("req", req.Request),
E
Enwei Jiao 已提交
3668
			zap.Error(errProxyIsUnhealthy(paramtable.GetNodeID())))
3669 3670 3671 3672

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
E
Enwei Jiao 已提交
3673
				Reason:    msgProxyIsUnhealthy(paramtable.GetNodeID()),
3674 3675 3676 3677 3678 3679 3680 3681
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetMetrics failed to parse metric type",
E
Enwei Jiao 已提交
3682
			zap.Int64("node_id", paramtable.GetNodeID()),
3683 3684 3685 3686 3687 3688 3689 3690 3691 3692 3693 3694 3695 3696 3697
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

	log.Debug("Proxy.GetMetrics",
		zap.String("metric_type", metricType))

3698 3699 3700 3701
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
		commonpbutil.WithTimeStamp(0),
E
Enwei Jiao 已提交
3702
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
3703
	)
3704
	if metricType == metricsinfo.SystemInfoMetrics {
3705 3706 3707 3708 3709 3710 3711
		ret, err := node.metricsCacheManager.GetSystemInfoMetrics()
		if err == nil && ret != nil {
			return ret, nil
		}
		log.Debug("failed to get system info metrics from cache, recompute instead",
			zap.Error(err))

3712
		metrics, err := getSystemInfoMetrics(ctx, req, node)
3713 3714

		log.Debug("Proxy.GetMetrics",
E
Enwei Jiao 已提交
3715
			zap.Int64("node_id", paramtable.GetNodeID()),
3716 3717 3718 3719 3720
			zap.String("req", req.Request),
			zap.String("metric_type", metricType),
			zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
			zap.Error(err))

3721 3722
		node.metricsCacheManager.UpdateSystemInfoMetrics(metrics)

G
godchen 已提交
3723
		return metrics, nil
3724 3725 3726
	}

	log.Debug("Proxy.GetMetrics failed, request metric type is not implemented yet",
E
Enwei Jiao 已提交
3727
		zap.Int64("node_id", paramtable.GetNodeID()),
3728 3729 3730 3731 3732 3733 3734 3735 3736 3737 3738 3739
		zap.String("req", req.Request),
		zap.String("metric_type", metricType))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}

3740 3741 3742 3743 3744
// GetProxyMetrics gets the metrics of proxy, it's an internal interface which is different from GetMetrics interface,
// because it only obtains the metrics of Proxy, not including the topological metrics of Query cluster and Data cluster.
func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	if !node.checkHealthy() {
		log.Warn("Proxy.GetProxyMetrics failed",
E
Enwei Jiao 已提交
3745
			zap.Int64("node_id", paramtable.GetNodeID()),
3746
			zap.String("req", req.Request),
E
Enwei Jiao 已提交
3747
			zap.Error(errProxyIsUnhealthy(paramtable.GetNodeID())))
3748 3749 3750 3751

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
E
Enwei Jiao 已提交
3752
				Reason:    msgProxyIsUnhealthy(paramtable.GetNodeID()),
3753 3754 3755 3756 3757 3758 3759
			},
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetProxyMetrics failed to parse metric type",
E
Enwei Jiao 已提交
3760
			zap.Int64("node_id", paramtable.GetNodeID()),
3761 3762 3763 3764 3765 3766 3767 3768 3769 3770 3771
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

3772 3773 3774 3775
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo),
		commonpbutil.WithMsgID(0),
		commonpbutil.WithTimeStamp(0),
E
Enwei Jiao 已提交
3776
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
3777
	)
3778 3779 3780 3781 3782

	if metricType == metricsinfo.SystemInfoMetrics {
		proxyMetrics, err := getProxyMetrics(ctx, req, node)
		if err != nil {
			log.Warn("Proxy.GetProxyMetrics failed to getProxyMetrics",
E
Enwei Jiao 已提交
3783
				zap.Int64("node_id", paramtable.GetNodeID()),
3784 3785 3786 3787 3788 3789 3790 3791 3792 3793 3794 3795
				zap.String("req", req.Request),
				zap.Error(err))

			return &milvuspb.GetMetricsResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

		log.Debug("Proxy.GetProxyMetrics",
E
Enwei Jiao 已提交
3796
			zap.Int64("node_id", paramtable.GetNodeID()),
3797
			zap.String("req", req.Request),
3798
			zap.String("metric_type", metricType))
3799 3800 3801 3802 3803

		return proxyMetrics, nil
	}

	log.Debug("Proxy.GetProxyMetrics failed, request metric type is not implemented yet",
E
Enwei Jiao 已提交
3804
		zap.Int64("node_id", paramtable.GetNodeID()),
3805 3806 3807 3808 3809 3810 3811 3812 3813 3814 3815
		zap.String("req", req.Request),
		zap.String("metric_type", metricType))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
	}, nil
}

B
bigsheeper 已提交
3816 3817 3818
// LoadBalance would do a load balancing operation between query nodes
func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceRequest) (*commonpb.Status, error) {
	log.Debug("Proxy.LoadBalance",
E
Enwei Jiao 已提交
3819
		zap.Int64("proxy_id", paramtable.GetNodeID()),
B
bigsheeper 已提交
3820 3821 3822 3823 3824 3825 3826 3827 3828
		zap.Any("req", req))

	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	}
3829 3830 3831 3832 3833 3834 3835

	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
	if err != nil {
		log.Error("failed to get collection id", zap.String("collection name", req.GetCollectionName()), zap.Error(err))
		status.Reason = err.Error()
		return status, nil
	}
B
bigsheeper 已提交
3836
	infoResp, err := node.queryCoord.LoadBalance(ctx, &querypb.LoadBalanceRequest{
3837 3838 3839 3840
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_LoadBalanceSegments),
			commonpbutil.WithMsgID(0),
			commonpbutil.WithTimeStamp(0),
E
Enwei Jiao 已提交
3841
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
3842
		),
B
bigsheeper 已提交
3843 3844
		SourceNodeIDs:    []int64{req.SrcNodeID},
		DstNodeIDs:       req.DstNodeIDs,
X
xige-16 已提交
3845
		BalanceReason:    querypb.TriggerCondition_GrpcRequest,
B
bigsheeper 已提交
3846
		SealedSegmentIDs: req.SealedSegmentIDs,
3847
		CollectionID:     collectionID,
B
bigsheeper 已提交
3848 3849 3850 3851 3852 3853 3854 3855 3856 3857 3858 3859 3860 3861 3862 3863 3864
	})
	if err != nil {
		log.Error("Failed to LoadBalance from Query Coordinator",
			zap.Any("req", req), zap.Error(err))
		status.Reason = err.Error()
		return status, nil
	}
	if infoResp.ErrorCode != commonpb.ErrorCode_Success {
		log.Error("Failed to LoadBalance from Query Coordinator", zap.String("errMsg", infoResp.Reason))
		status.Reason = infoResp.Reason
		return status, nil
	}
	log.Debug("LoadBalance Done", zap.Any("req", req), zap.Any("status", infoResp))
	status.ErrorCode = commonpb.ErrorCode_Success
	return status, nil
}

3865 3866 3867 3868 3869 3870 3871 3872 3873
// GetReplicas gets replica info
func (node *Proxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
	log.Info("received get replicas request")
	resp := &milvuspb.GetReplicasResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

S
smellthemoon 已提交
3874 3875
	req.Base = commonpbutil.NewMsgBase(
		commonpbutil.WithMsgType(commonpb.MsgType_GetReplicas),
E
Enwei Jiao 已提交
3876
		commonpbutil.WithSourceID(paramtable.GetNodeID()),
S
smellthemoon 已提交
3877
	)
3878 3879 3880 3881 3882 3883 3884 3885 3886 3887 3888 3889

	resp, err := node.queryCoord.GetReplicas(ctx, req)
	if err != nil {
		log.Error("Failed to get replicas from Query Coordinator", zap.Error(err))
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}
	log.Info("received get replicas response", zap.Any("resp", resp), zap.Error(err))
	return resp, nil
}

3890
// GetCompactionState gets the compaction state of multiple segments
3891 3892 3893 3894 3895 3896 3897 3898 3899 3900 3901 3902 3903
func (node *Proxy) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
	log.Info("received GetCompactionState request", zap.Int64("compactionID", req.GetCompactionID()))
	resp := &milvuspb.GetCompactionStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionState(ctx, req)
	log.Info("received GetCompactionState response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

3904
// ManualCompaction invokes compaction on specified collection
3905 3906 3907 3908 3909 3910 3911 3912 3913 3914 3915 3916 3917
func (node *Proxy) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
	log.Info("received ManualCompaction request", zap.Int64("collectionID", req.GetCollectionID()))
	resp := &milvuspb.ManualCompactionResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.ManualCompaction(ctx, req)
	log.Info("received ManualCompaction response", zap.Int64("collectionID", req.GetCollectionID()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

3918
// GetCompactionStateWithPlans returns the compactions states with the given plan ID
3919 3920 3921 3922 3923 3924 3925 3926 3927 3928 3929 3930 3931
func (node *Proxy) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
	log.Info("received GetCompactionStateWithPlans request", zap.Int64("compactionID", req.GetCompactionID()))
	resp := &milvuspb.GetCompactionPlansResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionStateWithPlans(ctx, req)
	log.Info("received GetCompactionStateWithPlans response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

B
Bingyi Sun 已提交
3932 3933 3934
// GetFlushState gets the flush state of multiple segments
func (node *Proxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
	log.Info("received get flush state request", zap.Any("request", req))
3935
	var err error
B
Bingyi Sun 已提交
3936 3937 3938 3939 3940 3941 3942
	resp := &milvuspb.GetFlushStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		log.Info("unable to get flush state because of closed server")
		return resp, nil
	}

3943
	resp, err = node.dataCoord.GetFlushState(ctx, req)
X
Xiaofan 已提交
3944 3945 3946 3947
	if err != nil {
		log.Info("failed to get flush state response", zap.Error(err))
		return nil, err
	}
B
Bingyi Sun 已提交
3948 3949 3950 3951
	log.Info("received get flush state response", zap.Any("response", resp))
	return resp, err
}

C
Cai Yudong 已提交
3952 3953
// checkHealthy checks proxy state is Healthy
func (node *Proxy) checkHealthy() bool {
3954 3955
	code := node.stateCode.Load().(commonpb.StateCode)
	return code == commonpb.StateCode_Healthy
3956 3957
}

3958 3959 3960
func (node *Proxy) checkHealthyAndReturnCode() (commonpb.StateCode, bool) {
	code := node.stateCode.Load().(commonpb.StateCode)
	return code, code == commonpb.StateCode_Healthy
3961 3962
}

3963
// unhealthyStatus returns the proxy not healthy status
3964 3965 3966
func unhealthyStatus() *commonpb.Status {
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3967
		Reason:    "proxy not healthy",
3968 3969
	}
}
G
groot 已提交
3970 3971 3972

// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
3973 3974
	log.Info("received import request",
		zap.String("collection name", req.GetCollectionName()),
G
groot 已提交
3975 3976
		zap.String("partition name", req.GetPartitionName()),
		zap.Strings("files", req.GetFiles()))
3977 3978 3979 3980 3981 3982
	resp := &milvuspb.ImportResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
	}
G
groot 已提交
3983 3984 3985 3986
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3987

3988 3989 3990 3991 3992 3993 3994 3995
	err := importutil.ValidateOptions(req.GetOptions())
	if err != nil {
		log.Error("failed to execute import request", zap.Error(err))
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = "request options is not illegal    \n" + err.Error() + "    \nIllegal option format    \n" + importutil.OptionFormat
		return resp, nil
	}

3996 3997
	method := "Import"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
3998
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
3999 4000
		metrics.TotalLabel).Inc()

4001
	// Call rootCoord to finish import.
4002 4003
	respFromRC, err := node.rootCoord.Import(ctx, req)
	if err != nil {
E
Enwei Jiao 已提交
4004
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
G
groot 已提交
4005
		log.Error("failed to execute bulk insert request", zap.Error(err))
4006 4007 4008 4009
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}
4010

E
Enwei Jiao 已提交
4011 4012
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
4013
	return respFromRC, nil
G
groot 已提交
4014 4015
}

4016
// GetImportState checks import task state from RootCoord.
G
groot 已提交
4017 4018 4019 4020 4021 4022 4023
func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
	log.Info("received get import state request", zap.Int64("taskID", req.GetTask()))
	resp := &milvuspb.GetImportStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
4024 4025
	method := "GetImportState"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
4026
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
4027
		metrics.TotalLabel).Inc()
G
groot 已提交
4028 4029

	resp, err := node.rootCoord.GetImportState(ctx, req)
4030
	if err != nil {
E
Enwei Jiao 已提交
4031
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
4032 4033 4034 4035 4036 4037 4038
		log.Error("failed to execute get import state", zap.Error(err))
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, nil
	}

	log.Info("successfully received get import state response", zap.Int64("taskID", req.GetTask()), zap.Any("resp", resp), zap.Error(err))
E
Enwei Jiao 已提交
4039 4040
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
4041
	return resp, nil
G
groot 已提交
4042 4043 4044 4045 4046 4047 4048 4049 4050 4051
}

// ListImportTasks get id array of all import tasks from rootcoord
func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
	log.Info("received list import tasks request")
	resp := &milvuspb.ListImportTasksResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
4052 4053
	method := "ListImportTasks"
	tr := timerecord.NewTimeRecorder(method)
E
Enwei Jiao 已提交
4054
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
4055
		metrics.TotalLabel).Inc()
G
groot 已提交
4056
	resp, err := node.rootCoord.ListImportTasks(ctx, req)
4057
	if err != nil {
E
Enwei Jiao 已提交
4058
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
4059 4060 4061
		log.Error("failed to execute list import tasks", zap.Error(err))
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
X
XuanYang-cn 已提交
4062 4063 4064
		return resp, nil
	}

4065
	log.Info("successfully received list import tasks response", zap.String("collection", req.CollectionName), zap.Any("tasks", resp.Tasks))
E
Enwei Jiao 已提交
4066 4067
	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
X
XuanYang-cn 已提交
4068 4069 4070
	return resp, err
}

4071 4072 4073 4074 4075 4076
// InvalidateCredentialCache invalidate the credential cache of specified username.
func (node *Proxy) InvalidateCredentialCache(ctx context.Context, request *proxypb.InvalidateCredCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Debug("received request to invalidate credential cache",
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))
4077
	if !node.checkHealthy() {
4078
		return unhealthyStatus(), nil
4079
	}
4080 4081 4082 4083 4084 4085 4086 4087 4088 4089 4090 4091 4092 4093 4094 4095 4096 4097 4098 4099 4100

	username := request.Username
	if globalMetaCache != nil {
		globalMetaCache.RemoveCredential(username) // no need to return error, though credential may be not cached
	}
	logutil.Logger(ctx).Debug("complete to invalidate credential cache",
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

// UpdateCredentialCache update the credential cache of specified username.
func (node *Proxy) UpdateCredentialCache(ctx context.Context, request *proxypb.UpdateCredCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Debug("received request to update credential cache",
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))
4101
	if !node.checkHealthy() {
4102
		return unhealthyStatus(), nil
4103
	}
4104 4105

	credInfo := &internalpb.CredentialInfo{
4106 4107
		Username:       request.Username,
		Sha256Password: request.Password,
4108 4109 4110 4111 4112 4113 4114 4115 4116 4117 4118 4119 4120 4121 4122
	}
	if globalMetaCache != nil {
		globalMetaCache.UpdateCredential(credInfo) // no need to return error, though credential may be not cached
	}
	logutil.Logger(ctx).Debug("complete to update credential cache",
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

func (node *Proxy) CreateCredential(ctx context.Context, req *milvuspb.CreateCredentialRequest) (*commonpb.Status, error) {
4123 4124
	log.Debug("CreateCredential", zap.String("role", typeutil.ProxyRole), zap.String("username", req.Username))
	if !node.checkHealthy() {
4125
		return unhealthyStatus(), nil
4126
	}
4127 4128 4129 4130 4131 4132 4133 4134 4135 4136 4137 4138 4139 4140 4141 4142 4143 4144 4145 4146 4147 4148 4149 4150 4151 4152 4153 4154 4155 4156 4157
	// validate params
	username := req.Username
	if err := ValidateUsername(username); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	rawPassword, err := crypto.Base64Decode(req.Password)
	if err != nil {
		log.Error("decode password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "decode password fail key:" + req.Username,
		}, nil
	}
	if err = ValidatePassword(rawPassword); err != nil {
		log.Error("illegal password", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	encryptedPassword, err := crypto.PasswordEncrypt(rawPassword)
	if err != nil {
		log.Error("encrypt password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "encrypt password fail key:" + req.Username,
		}, nil
	}
4158

4159 4160 4161
	credInfo := &internalpb.CredentialInfo{
		Username:          req.Username,
		EncryptedPassword: encryptedPassword,
4162
		Sha256Password:    crypto.SHA256(rawPassword, req.Username),
4163 4164 4165 4166 4167 4168 4169 4170 4171 4172 4173 4174
	}
	result, err := node.rootCoord.CreateCredential(ctx, credInfo)
	if err != nil { // for error like conntext timeout etc.
		log.Error("create credential fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

C
codeman 已提交
4175
func (node *Proxy) UpdateCredential(ctx context.Context, req *milvuspb.UpdateCredentialRequest) (*commonpb.Status, error) {
4176 4177
	log.Debug("UpdateCredential", zap.String("role", typeutil.ProxyRole), zap.String("username", req.Username))
	if !node.checkHealthy() {
4178
		return unhealthyStatus(), nil
4179
	}
C
codeman 已提交
4180 4181 4182 4183 4184 4185 4186 4187 4188
	rawOldPassword, err := crypto.Base64Decode(req.OldPassword)
	if err != nil {
		log.Error("decode old password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode old password fail when updating:" + req.Username,
		}, nil
	}
	rawNewPassword, err := crypto.Base64Decode(req.NewPassword)
4189 4190 4191 4192 4193 4194 4195
	if err != nil {
		log.Error("decode password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
4196 4197
	// valid new password
	if err = ValidatePassword(rawNewPassword); err != nil {
4198 4199 4200 4201 4202 4203
		log.Error("illegal password", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
4204 4205

	if !passwordVerify(ctx, req.Username, rawOldPassword, globalMetaCache) {
C
codeman 已提交
4206 4207 4208 4209 4210 4211 4212
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "old password is not correct:" + req.Username,
		}, nil
	}
	// update meta data
	encryptedPassword, err := crypto.PasswordEncrypt(rawNewPassword)
4213 4214 4215 4216 4217 4218 4219
	if err != nil {
		log.Error("encrypt password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "encrypt password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
4220
	updateCredReq := &internalpb.CredentialInfo{
4221
		Username:          req.Username,
4222
		Sha256Password:    crypto.SHA256(rawNewPassword, req.Username),
4223 4224
		EncryptedPassword: encryptedPassword,
	}
C
codeman 已提交
4225
	result, err := node.rootCoord.UpdateCredential(ctx, updateCredReq)
4226 4227 4228 4229 4230 4231 4232 4233 4234 4235 4236
	if err != nil { // for error like conntext timeout etc.
		log.Error("update credential fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
4237 4238
	log.Debug("DeleteCredential", zap.String("role", typeutil.ProxyRole), zap.String("username", req.Username))
	if !node.checkHealthy() {
4239
		return unhealthyStatus(), nil
4240 4241
	}

4242 4243 4244 4245 4246 4247
	if req.Username == util.UserRoot {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_DeleteCredentialFailure,
			Reason:    "user root cannot be deleted",
		}, nil
	}
4248 4249 4250 4251 4252 4253 4254 4255 4256 4257 4258 4259
	result, err := node.rootCoord.DeleteCredential(ctx, req)
	if err != nil { // for error like conntext timeout etc.
		log.Error("delete credential fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
4260 4261
	log.Debug("ListCredUsers", zap.String("role", typeutil.ProxyRole))
	if !node.checkHealthy() {
4262
		return &milvuspb.ListCredUsersResponse{Status: unhealthyStatus()}, nil
4263
	}
4264
	rootCoordReq := &milvuspb.ListCredUsersRequest{
4265 4266 4267
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_ListCredUsernames),
		),
4268 4269
	}
	resp, err := node.rootCoord.ListCredUsers(ctx, rootCoordReq)
4270 4271 4272 4273 4274 4275 4276 4277 4278 4279 4280 4281
	if err != nil {
		return &milvuspb.ListCredUsersResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
	return &milvuspb.ListCredUsersResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
4282
		Usernames: resp.Usernames,
4283 4284
	}, nil
}
4285

4286 4287 4288
func (node *Proxy) CreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest) (*commonpb.Status, error) {
	logger.Debug("CreateRole", zap.Any("req", req))
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4289
		return errorutil.UnhealthyStatus(code), nil
4290 4291 4292 4293 4294 4295 4296 4297 4298 4299
	}

	var roleName string
	if req.Entity != nil {
		roleName = req.Entity.Name
	}
	if err := ValidateRoleName(roleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4300
		}, nil
4301 4302 4303 4304 4305 4306 4307 4308
	}

	result, err := node.rootCoord.CreateRole(ctx, req)
	if err != nil {
		logger.Error("fail to create role", zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4309
		}, nil
4310 4311
	}
	return result, nil
4312 4313
}

4314 4315 4316
func (node *Proxy) DropRole(ctx context.Context, req *milvuspb.DropRoleRequest) (*commonpb.Status, error) {
	logger.Debug("DropRole", zap.Any("req", req))
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4317
		return errorutil.UnhealthyStatus(code), nil
4318 4319 4320 4321 4322
	}
	if err := ValidateRoleName(req.RoleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4323
		}, nil
4324
	}
4325 4326 4327 4328 4329
	if IsDefaultRole(req.RoleName) {
		errMsg := fmt.Sprintf("the role[%s] is a default role, which can't be droped", req.RoleName)
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    errMsg,
4330
		}, nil
4331
	}
4332 4333 4334 4335 4336 4337
	result, err := node.rootCoord.DropRole(ctx, req)
	if err != nil {
		logger.Error("fail to drop role", zap.String("role_name", req.RoleName), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4338
		}, nil
4339 4340
	}
	return result, nil
4341 4342
}

4343 4344 4345
func (node *Proxy) OperateUserRole(ctx context.Context, req *milvuspb.OperateUserRoleRequest) (*commonpb.Status, error) {
	logger.Debug("OperateUserRole", zap.Any("req", req))
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4346
		return errorutil.UnhealthyStatus(code), nil
4347 4348 4349 4350 4351
	}
	if err := ValidateUsername(req.Username); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4352
		}, nil
4353 4354 4355 4356 4357
	}
	if err := ValidateRoleName(req.RoleName); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4358
		}, nil
4359 4360 4361 4362 4363 4364 4365 4366
	}

	result, err := node.rootCoord.OperateUserRole(ctx, req)
	if err != nil {
		logger.Error("fail to operate user role", zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4367
		}, nil
4368 4369
	}
	return result, nil
4370 4371
}

4372 4373 4374
func (node *Proxy) SelectRole(ctx context.Context, req *milvuspb.SelectRoleRequest) (*milvuspb.SelectRoleResponse, error) {
	logger.Debug("SelectRole", zap.Any("req", req))
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4375
		return &milvuspb.SelectRoleResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4376 4377 4378 4379 4380 4381 4382 4383 4384
	}

	if req.Role != nil {
		if err := ValidateRoleName(req.Role.Name); err != nil {
			return &milvuspb.SelectRoleResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_IllegalArgument,
					Reason:    err.Error(),
				},
4385
			}, nil
4386 4387 4388 4389 4390 4391 4392 4393 4394 4395 4396
		}
	}

	result, err := node.rootCoord.SelectRole(ctx, req)
	if err != nil {
		logger.Error("fail to select role", zap.Error(err))
		return &milvuspb.SelectRoleResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4397
		}, nil
4398 4399
	}
	return result, nil
4400 4401
}

4402 4403 4404
func (node *Proxy) SelectUser(ctx context.Context, req *milvuspb.SelectUserRequest) (*milvuspb.SelectUserResponse, error) {
	logger.Debug("SelectUser", zap.Any("req", req))
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4405
		return &milvuspb.SelectUserResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4406 4407 4408 4409 4410 4411 4412 4413 4414
	}

	if req.User != nil {
		if err := ValidateUsername(req.User.Name); err != nil {
			return &milvuspb.SelectUserResponse{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_IllegalArgument,
					Reason:    err.Error(),
				},
4415
			}, nil
4416 4417 4418 4419 4420 4421 4422 4423 4424 4425 4426
		}
	}

	result, err := node.rootCoord.SelectUser(ctx, req)
	if err != nil {
		logger.Error("fail to select user", zap.Error(err))
		return &milvuspb.SelectUserResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4427
		}, nil
4428 4429
	}
	return result, nil
4430 4431
}

4432 4433 4434 4435 4436 4437 4438 4439 4440 4441 4442 4443 4444 4445 4446 4447 4448 4449 4450 4451 4452 4453 4454 4455 4456 4457 4458 4459 4460 4461
func (node *Proxy) validPrivilegeParams(req *milvuspb.OperatePrivilegeRequest) error {
	if req.Entity == nil {
		return fmt.Errorf("the entity in the request is nil")
	}
	if req.Entity.Grantor == nil {
		return fmt.Errorf("the grantor entity in the grant entity is nil")
	}
	if req.Entity.Grantor.Privilege == nil {
		return fmt.Errorf("the privilege entity in the grantor entity is nil")
	}
	if err := ValidatePrivilege(req.Entity.Grantor.Privilege.Name); err != nil {
		return err
	}
	if req.Entity.Object == nil {
		return fmt.Errorf("the resource entity in the grant entity is nil")
	}
	if err := ValidateObjectType(req.Entity.Object.Name); err != nil {
		return err
	}
	if err := ValidateObjectName(req.Entity.ObjectName); err != nil {
		return err
	}
	if req.Entity.Role == nil {
		return fmt.Errorf("the object entity in the grant entity is nil")
	}
	if err := ValidateRoleName(req.Entity.Role.Name); err != nil {
		return err
	}

	return nil
4462 4463
}

4464 4465 4466
func (node *Proxy) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) {
	logger.Debug("OperatePrivilege", zap.Any("req", req))
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4467
		return errorutil.UnhealthyStatus(code), nil
4468 4469 4470 4471 4472
	}
	if err := node.validPrivilegeParams(req); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
4473
		}, nil
4474 4475 4476 4477 4478 4479
	}
	curUser, err := GetCurUserFromContext(ctx)
	if err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4480
		}, nil
4481 4482 4483 4484 4485 4486 4487 4488
	}
	req.Entity.Grantor.User = &milvuspb.UserEntity{Name: curUser}
	result, err := node.rootCoord.OperatePrivilege(ctx, req)
	if err != nil {
		logger.Error("fail to operate privilege", zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
4489
		}, nil
4490 4491
	}
	return result, nil
4492 4493
}

4494 4495 4496 4497 4498 4499 4500 4501 4502 4503 4504 4505 4506 4507 4508 4509 4510 4511 4512 4513 4514 4515 4516 4517 4518 4519 4520 4521 4522
// validGrantParams checks a SelectGrant request: the grant entity and its role
// are mandatory, while the object type and object name are validated only
// when an object is supplied. It returns the first problem found, or nil.
func (node *Proxy) validGrantParams(req *milvuspb.SelectGrantRequest) error {
	entity := req.Entity
	if entity == nil {
		return fmt.Errorf("the grant entity in the request is nil")
	}

	if object := entity.Object; object != nil {
		if err := ValidateObjectType(object.Name); err != nil {
			return err
		}
		if err := ValidateObjectName(entity.ObjectName); err != nil {
			return err
		}
	}

	role := entity.Role
	if role == nil {
		return fmt.Errorf("the role entity in the grant entity is nil")
	}

	return ValidateRoleName(role.Name)
}

func (node *Proxy) SelectGrant(ctx context.Context, req *milvuspb.SelectGrantRequest) (*milvuspb.SelectGrantResponse, error) {
	logger.Debug("SelectGrant", zap.Any("req", req))
	if code, ok := node.checkHealthyAndReturnCode(); !ok {
4523
		return &milvuspb.SelectGrantResponse{Status: errorutil.UnhealthyStatus(code)}, nil
4524 4525 4526 4527 4528 4529 4530 4531
	}

	if err := node.validGrantParams(req); err != nil {
		return &milvuspb.SelectGrantResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_IllegalArgument,
				Reason:    err.Error(),
			},
4532
		}, nil
4533 4534 4535 4536 4537 4538 4539 4540 4541 4542
	}

	result, err := node.rootCoord.SelectGrant(ctx, req)
	if err != nil {
		logger.Error("fail to select grant", zap.Error(err))
		return &milvuspb.SelectGrantResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
4543
		}, nil
4544 4545 4546 4547 4548 4549 4550 4551 4552 4553 4554 4555 4556 4557 4558 4559 4560 4561 4562 4563 4564 4565 4566 4567 4568 4569 4570 4571
	}
	return result, nil
}

// RefreshPolicyInfoCache applies the cache operation described by the request
// (op type and op key) to the policy info held in the global meta cache, when
// that cache exists.
//
// Unlike most handlers in this file, failures are returned both as a status
// and as a non-nil error: an unhealthy proxy yields errorutil.UnhealthyError(),
// and a failed refresh yields the refresh error itself.
func (node *Proxy) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.RefreshPolicyInfoCacheRequest) (*commonpb.Status, error) {
	logger.Debug("RefreshPrivilegeInfoCache", zap.Any("req", req))
	code, healthy := node.checkHealthyAndReturnCode()
	if !healthy {
		return errorutil.UnhealthyStatus(code), errorutil.UnhealthyError()
	}

	if globalMetaCache != nil {
		cacheOp := typeutil.CacheOp{
			OpType: typeutil.CacheOpType(req.OpType),
			OpKey:  req.OpKey,
		}
		if err := globalMetaCache.RefreshPolicyInfo(cacheOp); err != nil {
			log.Error("fail to refresh policy info", zap.Error(err))
			failure := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_RefreshPolicyInfoCacheFailure,
				Reason:    err.Error(),
			}
			return failure, err
		}
	}
	logger.Debug("RefreshPrivilegeInfoCache success")

	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}
4573 4574 4575 4576 4577 4578 4579 4580 4581 4582 4583 4584 4585 4586 4587 4588 4589 4590 4591 4592

// SetRates applies the rates carried by the request to the proxy's global
// rate limiter. Failures are reported through the returned status; the error
// return value is always nil.
func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	// TODO: set multiple rate limiter rates
	if err := node.multiRateLimiter.globalRateLimiter.setRates(request.GetRates()); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}
4593 4594 4595 4596

func (node *Proxy) CheckHealth(ctx context.Context, request *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
	if !node.checkHealthy() {
		reason := errorutil.UnHealthReason("proxy", node.session.ServerID, "proxy is unhealthy")
4597 4598 4599 4600
		return &milvuspb.CheckHealthResponse{
			Status:    unhealthyStatus(),
			IsHealthy: false,
			Reasons:   []string{reason}}, nil
4601 4602 4603 4604 4605 4606 4607 4608 4609 4610 4611
	}

	group, ctx := errgroup.WithContext(ctx)
	errReasons := make([]string, 0)

	mu := &sync.Mutex{}
	fn := func(role string, resp *milvuspb.CheckHealthResponse, err error) error {
		mu.Lock()
		defer mu.Unlock()

		if err != nil {
4612
			log.Warn("check health fail", zap.String("role", role), zap.Error(err))
4613 4614 4615 4616 4617
			errReasons = append(errReasons, fmt.Sprintf("check health fail for %s", role))
			return err
		}

		if !resp.IsHealthy {
4618
			log.Warn("check health fail", zap.String("role", role))
4619 4620 4621 4622 4623 4624 4625 4626 4627 4628 4629 4630 4631 4632 4633 4634 4635 4636 4637 4638 4639 4640 4641 4642 4643 4644 4645 4646 4647 4648 4649 4650 4651
			errReasons = append(errReasons, resp.Reasons...)
		}
		return nil
	}

	group.Go(func() error {
		resp, err := node.rootCoord.CheckHealth(ctx, request)
		return fn("rootcoord", resp, err)
	})

	group.Go(func() error {
		resp, err := node.queryCoord.CheckHealth(ctx, request)
		return fn("querycoord", resp, err)
	})

	group.Go(func() error {
		resp, err := node.dataCoord.CheckHealth(ctx, request)
		return fn("datacoord", resp, err)
	})

	group.Go(func() error {
		resp, err := node.indexCoord.CheckHealth(ctx, request)
		return fn("indexcoord", resp, err)
	})

	err := group.Wait()
	if err != nil || len(errReasons) != 0 {
		return &milvuspb.CheckHealthResponse{
			IsHealthy: false,
			Reasons:   errReasons,
		}, nil
	}

4652 4653 4654 4655 4656 4657 4658
	return &milvuspb.CheckHealthResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		IsHealthy: true,
	}, nil
4659
}