impl.go 145.0 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16

C
Cai Yudong 已提交
17
package proxy
18 19 20

import (
	"context"
21
	"errors"
22
	"fmt"
C
cai.zhang 已提交
23
	"os"
24 25
	"strconv"

26 27
	"github.com/milvus-io/milvus/internal/util"

28
	"go.uber.org/zap"
S
sunby 已提交
29

30
	"github.com/milvus-io/milvus/internal/common"
X
Xiangyu Wang 已提交
31
	"github.com/milvus-io/milvus/internal/log"
32
	"github.com/milvus-io/milvus/internal/metrics"
J
jaime 已提交
33
	"github.com/milvus-io/milvus/internal/mq/msgstream"
34

X
Xiangyu Wang 已提交
35 36 37 38 39 40
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
41
	"github.com/milvus-io/milvus/internal/proto/schemapb"
42
	"github.com/milvus-io/milvus/internal/util/crypto"
43
	"github.com/milvus-io/milvus/internal/util/distance"
44 45 46
	"github.com/milvus-io/milvus/internal/util/funcutil"
	"github.com/milvus-io/milvus/internal/util/logutil"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
47
	"github.com/milvus-io/milvus/internal/util/timerecord"
48
	"github.com/milvus-io/milvus/internal/util/trace"
X
Xiangyu Wang 已提交
49
	"github.com/milvus-io/milvus/internal/util/typeutil"
50 51
)

52 53
const moduleName = "Proxy"

54
// UpdateStateCode updates the state code of Proxy.
C
Cai Yudong 已提交
55
func (node *Proxy) UpdateStateCode(code internalpb.StateCode) {
56
	node.stateCode.Store(code)
Z
zhenshan.cao 已提交
57 58
}

59
// GetComponentStates get state of Proxy.
C
Cai Yudong 已提交
60
func (node *Proxy) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
G
godchen 已提交
61 62 63 64 65 66 67 68 69 70 71 72
	stats := &internalpb.ComponentStates{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
	code, ok := node.stateCode.Load().(internalpb.StateCode)
	if !ok {
		errMsg := "unexpected error in type assertion"
		stats.Status = &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    errMsg,
		}
G
godchen 已提交
73
		return stats, nil
G
godchen 已提交
74
	}
75 76 77 78
	nodeID := common.NotRegisteredID
	if node.session != nil && node.session.Registered() {
		nodeID = node.session.ServerID
	}
G
godchen 已提交
79
	info := &internalpb.ComponentInfo{
80 81
		// NodeID:    Params.ProxyID, // will race with Proxy.Register()
		NodeID:    nodeID,
C
Cai Yudong 已提交
82
		Role:      typeutil.ProxyRole,
G
godchen 已提交
83 84 85 86 87 88
		StateCode: code,
	}
	stats.State = info
	return stats, nil
}

C
cxytz01 已提交
89
// GetStatisticsChannel gets statistics channel of Proxy.
C
Cai Yudong 已提交
90
func (node *Proxy) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
91 92 93 94 95 96 97 98 99
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: "",
	}, nil
}

100
// InvalidateCollectionMetaCache invalidate the meta cache of specific collection.
C
Cai Yudong 已提交
101
func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
102
	ctx = logutil.WithModule(ctx, moduleName)
103
	logutil.Logger(ctx).Info("received request to invalidate collection meta cache",
104
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
105
		zap.String("db", request.DbName),
106 107
		zap.String("collectionName", request.CollectionName),
		zap.Int64("collectionID", request.CollectionID))
D
dragondriver 已提交
108

109
	collectionName := request.CollectionName
110
	collectionID := request.CollectionID
N
neza2017 已提交
111
	if globalMetaCache != nil {
112 113 114 115 116 117
		if collectionName != "" {
			globalMetaCache.RemoveCollection(ctx, collectionName) // no need to return error, though collection may be not cached
		}
		if request.CollectionID != UniqueID(0) {
			globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
		}
N
neza2017 已提交
118
	}
119
	logutil.Logger(ctx).Info("complete to invalidate collection meta cache",
120
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
121
		zap.String("db", request.DbName),
122 123
		zap.String("collection", collectionName),
		zap.Int64("collectionID", collectionID))
D
dragondriver 已提交
124

125
	return &commonpb.Status{
126
		ErrorCode: commonpb.ErrorCode_Success,
127 128
		Reason:    "",
	}, nil
129 130
}

131
// ReleaseDQLMessageStream release the query message stream of specific collection.
C
Cai Yudong 已提交
132
func (node *Proxy) ReleaseDQLMessageStream(ctx context.Context, request *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
133 134
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Debug("received request to release DQL message strem",
135
		zap.Any("role", typeutil.ProxyRole),
136 137 138
		zap.Any("db", request.DbID),
		zap.Any("collection", request.CollectionID))

139 140 141 142
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

143 144
	_ = node.chMgr.removeDQLStream(request.CollectionID)

145
	logutil.Logger(ctx).Debug("complete to release DQL message stream",
146
		zap.Any("role", typeutil.ProxyRole),
147 148 149 150 151 152 153 154 155
		zap.Any("db", request.DbID),
		zap.Any("collection", request.CollectionID))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

156
// CreateCollection create a collection by the schema.
157
// TODO(dragondriver): add more detailed ut for ConsistencyLevel, should we support multiple consistency level in Proxy?
C
Cai Yudong 已提交
158
func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
159 160 161
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
162 163 164 165

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
166 167 168
	method := "CreateCollection"
	tr := timerecord.NewTimeRecorder(method)

X
Xiaofan 已提交
169
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
170

171
	cct := &createCollectionTask{
S
sunby 已提交
172
		ctx:                     ctx,
173 174
		Condition:               NewTaskCondition(ctx),
		CreateCollectionRequest: request,
175
		rootCoord:               node.rootCoord,
176 177
	}

178 179 180
	// avoid data race
	lenOfSchema := len(request.Schema)

181 182
	log.Debug(
		rpcReceived(method),
183
		zap.String("traceID", traceID),
184
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
185 186
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
187
		zap.Int("len(schema)", lenOfSchema),
188 189
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
190

191 192 193
	if err := node.sched.ddQueue.Enqueue(cct); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
194 195
			zap.Error(err),
			zap.String("traceID", traceID),
196
			zap.String("role", typeutil.ProxyRole),
197 198 199
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Int("len(schema)", lenOfSchema),
200 201
			zap.Int32("shards_num", request.ShardsNum),
			zap.String("consistency_level", request.ConsistencyLevel.String()))
202

X
Xiaofan 已提交
203
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
204
		return &commonpb.Status{
205
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
206 207 208 209
			Reason:    err.Error(),
		}, nil
	}

210 211
	log.Debug(
		rpcEnqueued(method),
212
		zap.String("traceID", traceID),
213
		zap.String("role", typeutil.ProxyRole),
214 215 216
		zap.Int64("MsgID", cct.ID()),
		zap.Uint64("BeginTs", cct.BeginTs()),
		zap.Uint64("EndTs", cct.EndTs()),
D
dragondriver 已提交
217 218
		zap.Uint64("timestamp", request.Base.Timestamp),
		zap.String("db", request.DbName),
219 220
		zap.String("collection", request.CollectionName),
		zap.Int("len(schema)", lenOfSchema),
221 222
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
223

224 225 226
	if err := cct.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
227
			zap.Error(err),
228
			zap.String("traceID", traceID),
229
			zap.String("role", typeutil.ProxyRole),
230 231 232
			zap.Int64("MsgID", cct.ID()),
			zap.Uint64("BeginTs", cct.BeginTs()),
			zap.Uint64("EndTs", cct.EndTs()),
D
dragondriver 已提交
233 234
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
235
			zap.Int("len(schema)", lenOfSchema),
236 237
			zap.Int32("shards_num", request.ShardsNum),
			zap.String("consistency_level", request.ConsistencyLevel.String()))
D
dragondriver 已提交
238

X
Xiaofan 已提交
239
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
240
		return &commonpb.Status{
241
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
242 243 244 245
			Reason:    err.Error(),
		}, nil
	}

246 247
	log.Debug(
		rpcDone(method),
248
		zap.String("traceID", traceID),
249
		zap.String("role", typeutil.ProxyRole),
250 251 252 253 254 255
		zap.Int64("MsgID", cct.ID()),
		zap.Uint64("BeginTs", cct.BeginTs()),
		zap.Uint64("EndTs", cct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Int("len(schema)", lenOfSchema),
256 257
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
258

X
Xiaofan 已提交
259 260
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
261 262 263
	return cct.result, nil
}

264
// DropCollection drop a collection.
C
Cai Yudong 已提交
265
func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
266 267 268
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
269 270 271 272

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
273 274
	method := "DropCollection"
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
275
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
276

277
	dct := &dropCollectionTask{
S
sunby 已提交
278
		ctx:                   ctx,
279 280
		Condition:             NewTaskCondition(ctx),
		DropCollectionRequest: request,
281
		rootCoord:             node.rootCoord,
282
		chMgr:                 node.chMgr,
S
sunby 已提交
283
		chTicker:              node.chTicker,
284 285
	}

286 287
	log.Debug("DropCollection received",
		zap.String("traceID", traceID),
288
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
289 290
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
291 292 293 294 295

	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DropCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
296
			zap.String("role", typeutil.ProxyRole),
297 298 299
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
300
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
301
		return &commonpb.Status{
302
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
303 304 305 306
			Reason:    err.Error(),
		}, nil
	}

307 308
	log.Debug("DropCollection enqueued",
		zap.String("traceID", traceID),
309
		zap.String("role", typeutil.ProxyRole),
310 311 312
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTs", dct.BeginTs()),
		zap.Uint64("EndTs", dct.EndTs()),
D
dragondriver 已提交
313 314
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
315 316 317

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DropCollection failed to WaitToFinish",
D
dragondriver 已提交
318
			zap.Error(err),
319
			zap.String("traceID", traceID),
320
			zap.String("role", typeutil.ProxyRole),
321 322 323
			zap.Int64("MsgID", dct.ID()),
			zap.Uint64("BeginTs", dct.BeginTs()),
			zap.Uint64("EndTs", dct.EndTs()),
D
dragondriver 已提交
324 325 326
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
327
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
328
		return &commonpb.Status{
329
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
330 331 332 333
			Reason:    err.Error(),
		}, nil
	}

334 335
	log.Debug("DropCollection done",
		zap.String("traceID", traceID),
336
		zap.String("role", typeutil.ProxyRole),
337 338 339 340 341 342
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTs", dct.BeginTs()),
		zap.Uint64("EndTs", dct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
343 344
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
345 346 347
	return dct.result, nil
}

348
// HasCollection check if the specific collection exists in Milvus.
C
Cai Yudong 已提交
349
func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
350 351 352 353 354
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
355 356 357 358

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
359 360
	method := "HasCollection"
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
361
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
362
		metrics.TotalLabel).Inc()
363 364 365

	log.Debug("HasCollection received",
		zap.String("traceID", traceID),
366
		zap.String("role", typeutil.ProxyRole),
367 368 369
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

370
	hct := &hasCollectionTask{
S
sunby 已提交
371
		ctx:                  ctx,
372 373
		Condition:            NewTaskCondition(ctx),
		HasCollectionRequest: request,
374
		rootCoord:            node.rootCoord,
375 376
	}

377 378 379 380
	if err := node.sched.ddQueue.Enqueue(hct); err != nil {
		log.Warn("HasCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
381
			zap.String("role", typeutil.ProxyRole),
382 383 384
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
385
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
386
			metrics.AbandonLabel).Inc()
387 388
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
389
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
390 391 392 393 394
				Reason:    err.Error(),
			},
		}, nil
	}

395 396
	log.Debug("HasCollection enqueued",
		zap.String("traceID", traceID),
397
		zap.String("role", typeutil.ProxyRole),
398 399 400
		zap.Int64("MsgID", hct.ID()),
		zap.Uint64("BeginTS", hct.BeginTs()),
		zap.Uint64("EndTS", hct.EndTs()),
D
dragondriver 已提交
401 402
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
403 404 405

	if err := hct.WaitToFinish(); err != nil {
		log.Warn("HasCollection failed to WaitToFinish",
D
dragondriver 已提交
406
			zap.Error(err),
407
			zap.String("traceID", traceID),
408
			zap.String("role", typeutil.ProxyRole),
409 410 411
			zap.Int64("MsgID", hct.ID()),
			zap.Uint64("BeginTS", hct.BeginTs()),
			zap.Uint64("EndTS", hct.EndTs()),
D
dragondriver 已提交
412 413 414
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
415
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
416
			metrics.FailLabel).Inc()
417 418
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
419
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
420 421 422 423 424
				Reason:    err.Error(),
			},
		}, nil
	}

425 426
	log.Debug("HasCollection done",
		zap.String("traceID", traceID),
427
		zap.String("role", typeutil.ProxyRole),
428 429 430 431 432 433
		zap.Int64("MsgID", hct.ID()),
		zap.Uint64("BeginTS", hct.BeginTs()),
		zap.Uint64("EndTS", hct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
434
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
435
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
436
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
437 438 439
	return hct.result, nil
}

440
// LoadCollection load a collection into query nodes.
C
Cai Yudong 已提交
441
func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
442 443 444
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
445 446 447 448

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
449 450
	method := "LoadCollection"
	tr := timerecord.NewTimeRecorder(method)
451

452
	lct := &loadCollectionTask{
S
sunby 已提交
453
		ctx:                   ctx,
454 455
		Condition:             NewTaskCondition(ctx),
		LoadCollectionRequest: request,
456
		queryCoord:            node.queryCoord,
457 458
	}

459 460
	log.Debug("LoadCollection received",
		zap.String("traceID", traceID),
461
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
462 463
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
464 465 466 467 468

	if err := node.sched.ddQueue.Enqueue(lct); err != nil {
		log.Warn("LoadCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
469
			zap.String("role", typeutil.ProxyRole),
470 471 472
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
473
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
474
			metrics.AbandonLabel).Inc()
475
		return &commonpb.Status{
476
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
477 478 479
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
480

481 482
	log.Debug("LoadCollection enqueued",
		zap.String("traceID", traceID),
483
		zap.String("role", typeutil.ProxyRole),
484 485 486
		zap.Int64("MsgID", lct.ID()),
		zap.Uint64("BeginTS", lct.BeginTs()),
		zap.Uint64("EndTS", lct.EndTs()),
D
dragondriver 已提交
487 488
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
489 490 491

	if err := lct.WaitToFinish(); err != nil {
		log.Warn("LoadCollection failed to WaitToFinish",
D
dragondriver 已提交
492
			zap.Error(err),
493
			zap.String("traceID", traceID),
494
			zap.String("role", typeutil.ProxyRole),
495 496 497
			zap.Int64("MsgID", lct.ID()),
			zap.Uint64("BeginTS", lct.BeginTs()),
			zap.Uint64("EndTS", lct.EndTs()),
D
dragondriver 已提交
498 499 500
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
501
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
502
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
503
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
504
			metrics.FailLabel).Inc()
505
		return &commonpb.Status{
506
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
507 508 509 510
			Reason:    err.Error(),
		}, nil
	}

511 512
	log.Debug("LoadCollection done",
		zap.String("traceID", traceID),
513
		zap.String("role", typeutil.ProxyRole),
514 515 516 517 518 519
		zap.Int64("MsgID", lct.ID()),
		zap.Uint64("BeginTS", lct.BeginTs()),
		zap.Uint64("EndTS", lct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
520
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
521
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
522
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
523
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
524
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
525
	return lct.result, nil
526 527
}

528
// ReleaseCollection remove the loaded collection from query nodes.
C
Cai Yudong 已提交
529
func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
530 531 532
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
533

534
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleaseCollection")
535 536
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
537 538
	method := "ReleaseCollection"
	tr := timerecord.NewTimeRecorder(method)
539

540
	rct := &releaseCollectionTask{
S
sunby 已提交
541
		ctx:                      ctx,
542 543
		Condition:                NewTaskCondition(ctx),
		ReleaseCollectionRequest: request,
544
		queryCoord:               node.queryCoord,
545
		chMgr:                    node.chMgr,
546 547
	}

548 549
	log.Debug(
		rpcReceived(method),
550
		zap.String("traceID", traceID),
551
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
552 553
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
554 555

	if err := node.sched.ddQueue.Enqueue(rct); err != nil {
556 557
		log.Warn(
			rpcFailedToEnqueue(method),
558 559
			zap.Error(err),
			zap.String("traceID", traceID),
560
			zap.String("role", typeutil.ProxyRole),
561 562 563
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
564
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
565
			metrics.AbandonLabel).Inc()
566
		return &commonpb.Status{
567
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
568 569 570 571
			Reason:    err.Error(),
		}, nil
	}

572 573
	log.Debug(
		rpcEnqueued(method),
574
		zap.String("traceID", traceID),
575
		zap.String("role", typeutil.ProxyRole),
576 577 578
		zap.Int64("MsgID", rct.ID()),
		zap.Uint64("BeginTS", rct.BeginTs()),
		zap.Uint64("EndTS", rct.EndTs()),
D
dragondriver 已提交
579 580
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
581 582

	if err := rct.WaitToFinish(); err != nil {
583 584
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
585
			zap.Error(err),
586
			zap.String("traceID", traceID),
587
			zap.String("role", typeutil.ProxyRole),
588 589 590
			zap.Int64("MsgID", rct.ID()),
			zap.Uint64("BeginTS", rct.BeginTs()),
			zap.Uint64("EndTS", rct.EndTs()),
D
dragondriver 已提交
591 592 593
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
594
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
595
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
596
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
597
			metrics.FailLabel).Inc()
598
		return &commonpb.Status{
599
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
600 601 602 603
			Reason:    err.Error(),
		}, nil
	}

604 605
	log.Debug(
		rpcDone(method),
606
		zap.String("traceID", traceID),
607
		zap.String("role", typeutil.ProxyRole),
608 609 610 611 612 613
		zap.Int64("MsgID", rct.ID()),
		zap.Uint64("BeginTS", rct.BeginTs()),
		zap.Uint64("EndTS", rct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
614
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
615
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
616
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
617
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
618
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
619
	return rct.result, nil
620 621
}

622
// DescribeCollection get the meta information of specific collection, such as schema, created timestamp and etc.
C
Cai Yudong 已提交
623
func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
624 625 626 627 628
	if !node.checkHealthy() {
		return &milvuspb.DescribeCollectionResponse{
			Status: unhealthyStatus(),
		}, nil
	}
629

630
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeCollection")
631 632
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
633 634
	method := "DescribeCollection"
	tr := timerecord.NewTimeRecorder(method)
635

636
	dct := &describeCollectionTask{
S
sunby 已提交
637
		ctx:                       ctx,
638 639
		Condition:                 NewTaskCondition(ctx),
		DescribeCollectionRequest: request,
640
		rootCoord:                 node.rootCoord,
641 642
	}

643 644
	log.Debug("DescribeCollection received",
		zap.String("traceID", traceID),
645
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
646 647
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
648 649 650 651 652

	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DescribeCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
653
			zap.String("role", typeutil.ProxyRole),
654 655 656
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
657
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
658
			metrics.AbandonLabel).Inc()
659 660
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
661
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
662 663 664 665 666
				Reason:    err.Error(),
			},
		}, nil
	}

667 668
	log.Debug("DescribeCollection enqueued",
		zap.String("traceID", traceID),
669
		zap.String("role", typeutil.ProxyRole),
670 671 672
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTS", dct.BeginTs()),
		zap.Uint64("EndTS", dct.EndTs()),
D
dragondriver 已提交
673 674
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
675 676 677

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DescribeCollection failed to WaitToFinish",
D
dragondriver 已提交
678
			zap.Error(err),
679
			zap.String("traceID", traceID),
680
			zap.String("role", typeutil.ProxyRole),
681 682 683
			zap.Int64("MsgID", dct.ID()),
			zap.Uint64("BeginTS", dct.BeginTs()),
			zap.Uint64("EndTS", dct.EndTs()),
D
dragondriver 已提交
684 685 686
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
687
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
688
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
689
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
690
			metrics.FailLabel).Inc()
691

692 693
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
694
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
695 696 697 698 699
				Reason:    err.Error(),
			},
		}, nil
	}

700 701
	log.Debug("DescribeCollection done",
		zap.String("traceID", traceID),
702
		zap.String("role", typeutil.ProxyRole),
703 704 705 706 707 708
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTS", dct.BeginTs()),
		zap.Uint64("EndTS", dct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
709
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
710
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
711
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
712
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
713
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
714 715 716
	return dct.result, nil
}

717
// GetCollectionStatistics get the collection statistics, such as `num_rows`.
C
Cai Yudong 已提交
718
func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvuspb.GetCollectionStatisticsRequest) (*milvuspb.GetCollectionStatisticsResponse, error) {
719 720 721 722 723
	if !node.checkHealthy() {
		return &milvuspb.GetCollectionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
724 725 726 727

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetCollectionStatistics")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
728 729
	method := "GetCollectionStatistics"
	tr := timerecord.NewTimeRecorder(method)
730

731
	g := &getCollectionStatisticsTask{
G
godchen 已提交
732 733 734
		ctx:                            ctx,
		Condition:                      NewTaskCondition(ctx),
		GetCollectionStatisticsRequest: request,
735
		dataCoord:                      node.dataCoord,
736 737
	}

738 739
	log.Debug("GetCollectionStatistics received",
		zap.String("traceID", traceID),
740
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
741 742
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
743 744 745 746 747

	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn("GetCollectionStatistics failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
748
			zap.String("role", typeutil.ProxyRole),
749 750 751
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
752
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
753
			metrics.AbandonLabel).Inc()
754

G
godchen 已提交
755
		return &milvuspb.GetCollectionStatisticsResponse{
756
			Status: &commonpb.Status{
757
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
758 759 760 761 762
				Reason:    err.Error(),
			},
		}, nil
	}

763 764
	log.Debug("GetCollectionStatistics enqueued",
		zap.String("traceID", traceID),
765
		zap.String("role", typeutil.ProxyRole),
766 767 768
		zap.Int64("MsgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
D
dragondriver 已提交
769 770
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
771 772 773

	if err := g.WaitToFinish(); err != nil {
		log.Warn("GetCollectionStatistics failed to WaitToFinish",
D
dragondriver 已提交
774
			zap.Error(err),
775
			zap.String("traceID", traceID),
776
			zap.String("role", typeutil.ProxyRole),
777 778 779
			zap.Int64("MsgID", g.ID()),
			zap.Uint64("BeginTS", g.BeginTs()),
			zap.Uint64("EndTS", g.EndTs()),
D
dragondriver 已提交
780 781 782
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
783
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
784
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
785
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
786
			metrics.FailLabel).Inc()
787

G
godchen 已提交
788
		return &milvuspb.GetCollectionStatisticsResponse{
789
			Status: &commonpb.Status{
790
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
791 792 793 794 795
				Reason:    err.Error(),
			},
		}, nil
	}

796 797
	log.Debug("GetCollectionStatistics done",
		zap.String("traceID", traceID),
798
		zap.String("role", typeutil.ProxyRole),
799 800 801 802 803 804
		zap.Int64("MsgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
805
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
806
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
807
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
808
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
809
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
810
	return g.result, nil
811 812
}

813
// ShowCollections list all collections in Milvus.
C
Cai Yudong 已提交
814
func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
815 816 817 818 819
	if !node.checkHealthy() {
		return &milvuspb.ShowCollectionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
820 821
	method := "ShowCollections"
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
822
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
823

824
	sct := &showCollectionsTask{
G
godchen 已提交
825 826 827
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		ShowCollectionsRequest: request,
828
		queryCoord:             node.queryCoord,
829
		rootCoord:              node.rootCoord,
830 831
	}

832
	log.Debug("ShowCollections received",
833
		zap.String("role", typeutil.ProxyRole),
834 835 836 837 838 839
		zap.String("DbName", request.DbName),
		zap.Uint64("TimeStamp", request.TimeStamp),
		zap.String("ShowType", request.Type.String()),
		zap.Any("CollectionNames", request.CollectionNames),
	)

840
	err := node.sched.ddQueue.Enqueue(sct)
841
	if err != nil {
842 843
		log.Warn("ShowCollections failed to enqueue",
			zap.Error(err),
844
			zap.String("role", typeutil.ProxyRole),
845 846 847 848 849 850
			zap.String("DbName", request.DbName),
			zap.Uint64("TimeStamp", request.TimeStamp),
			zap.String("ShowType", request.Type.String()),
			zap.Any("CollectionNames", request.CollectionNames),
		)

X
Xiaofan 已提交
851
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
G
godchen 已提交
852
		return &milvuspb.ShowCollectionsResponse{
853
			Status: &commonpb.Status{
854
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
855 856 857 858 859
				Reason:    err.Error(),
			},
		}, nil
	}

860
	log.Debug("ShowCollections enqueued",
861
		zap.String("role", typeutil.ProxyRole),
862
		zap.Int64("MsgID", sct.ID()),
863
		zap.String("DbName", sct.ShowCollectionsRequest.DbName),
864
		zap.Uint64("TimeStamp", request.TimeStamp),
865 866 867
		zap.String("ShowType", sct.ShowCollectionsRequest.Type.String()),
		zap.Any("CollectionNames", sct.ShowCollectionsRequest.CollectionNames),
	)
D
dragondriver 已提交
868

869 870
	err = sct.WaitToFinish()
	if err != nil {
871 872
		log.Warn("ShowCollections failed to WaitToFinish",
			zap.Error(err),
873
			zap.String("role", typeutil.ProxyRole),
874 875 876 877 878 879 880
			zap.Int64("MsgID", sct.ID()),
			zap.String("DbName", request.DbName),
			zap.Uint64("TimeStamp", request.TimeStamp),
			zap.String("ShowType", request.Type.String()),
			zap.Any("CollectionNames", request.CollectionNames),
		)

X
Xiaofan 已提交
881
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
882

G
godchen 已提交
883
		return &milvuspb.ShowCollectionsResponse{
884
			Status: &commonpb.Status{
885
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
886 887 888 889 890
				Reason:    err.Error(),
			},
		}, nil
	}

891
	log.Debug("ShowCollections Done",
892
		zap.String("role", typeutil.ProxyRole),
893 894 895 896
		zap.Int64("MsgID", sct.ID()),
		zap.String("DbName", request.DbName),
		zap.Uint64("TimeStamp", request.TimeStamp),
		zap.String("ShowType", request.Type.String()),
897 898
		zap.Int("len(CollectionNames)", len(request.CollectionNames)),
		zap.Int("num_collections", len(sct.result.CollectionNames)))
899

X
Xiaofan 已提交
900 901
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
902 903 904
	return sct.result, nil
}

905
// CreatePartition create a partition in specific collection.
C
Cai Yudong 已提交
906
func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
907 908 909
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
910

911
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreatePartition")
912 913
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
914 915
	method := "CreatePartition"
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
916
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
917

918
	cpt := &createPartitionTask{
S
sunby 已提交
919
		ctx:                    ctx,
920 921
		Condition:              NewTaskCondition(ctx),
		CreatePartitionRequest: request,
922
		rootCoord:              node.rootCoord,
923 924 925
		result:                 nil,
	}

926 927 928
	log.Debug(
		rpcReceived("CreatePartition"),
		zap.String("traceID", traceID),
929
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
930 931 932
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
933 934 935 936 937 938

	if err := node.sched.ddQueue.Enqueue(cpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue("CreatePartition"),
			zap.Error(err),
			zap.String("traceID", traceID),
939
			zap.String("role", typeutil.ProxyRole),
940 941 942 943
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
944
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
945

946
		return &commonpb.Status{
947
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
948 949 950
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
951

952 953 954
	log.Debug(
		rpcEnqueued("CreatePartition"),
		zap.String("traceID", traceID),
955
		zap.String("role", typeutil.ProxyRole),
956 957 958
		zap.Int64("MsgID", cpt.ID()),
		zap.Uint64("BeginTS", cpt.BeginTs()),
		zap.Uint64("EndTS", cpt.EndTs()),
D
dragondriver 已提交
959 960 961
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
962 963 964 965

	if err := cpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish("CreatePartition"),
D
dragondriver 已提交
966
			zap.Error(err),
967
			zap.String("traceID", traceID),
968
			zap.String("role", typeutil.ProxyRole),
969 970 971
			zap.Int64("MsgID", cpt.ID()),
			zap.Uint64("BeginTS", cpt.BeginTs()),
			zap.Uint64("EndTS", cpt.EndTs()),
D
dragondriver 已提交
972 973 974 975
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
976
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
977

978
		return &commonpb.Status{
979
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
980 981 982
			Reason:    err.Error(),
		}, nil
	}
983 984 985 986

	log.Debug(
		rpcDone("CreatePartition"),
		zap.String("traceID", traceID),
987
		zap.String("role", typeutil.ProxyRole),
988 989 990 991 992 993 994
		zap.Int64("MsgID", cpt.ID()),
		zap.Uint64("BeginTS", cpt.BeginTs()),
		zap.Uint64("EndTS", cpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
995 996
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
997 998 999
	return cpt.result, nil
}

1000
// DropPartition drop a partition in specific collection.
C
Cai Yudong 已提交
1001
func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
1002 1003 1004
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1005

1006
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropPartition")
1007 1008
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
1009 1010
	method := "DropPartition"
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
1011
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
1012

1013
	dpt := &dropPartitionTask{
S
sunby 已提交
1014
		ctx:                  ctx,
1015 1016
		Condition:            NewTaskCondition(ctx),
		DropPartitionRequest: request,
1017
		rootCoord:            node.rootCoord,
1018 1019 1020
		result:               nil,
	}

1021 1022 1023
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1024
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1025 1026 1027
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1028 1029 1030 1031 1032 1033

	if err := node.sched.ddQueue.Enqueue(dpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1034
			zap.String("role", typeutil.ProxyRole),
1035 1036 1037 1038
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1039
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
1040

1041
		return &commonpb.Status{
1042
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1043 1044 1045
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1046

1047 1048 1049
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1050
		zap.String("role", typeutil.ProxyRole),
1051 1052 1053
		zap.Int64("MsgID", dpt.ID()),
		zap.Uint64("BeginTS", dpt.BeginTs()),
		zap.Uint64("EndTS", dpt.EndTs()),
D
dragondriver 已提交
1054 1055 1056
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1057 1058 1059 1060

	if err := dpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1061
			zap.Error(err),
1062
			zap.String("traceID", traceID),
1063
			zap.String("role", typeutil.ProxyRole),
1064 1065 1066
			zap.Int64("MsgID", dpt.ID()),
			zap.Uint64("BeginTS", dpt.BeginTs()),
			zap.Uint64("EndTS", dpt.EndTs()),
D
dragondriver 已提交
1067 1068 1069 1070
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1071
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
1072

1073
		return &commonpb.Status{
1074
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1075 1076 1077
			Reason:    err.Error(),
		}, nil
	}
1078 1079 1080 1081

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1082
		zap.String("role", typeutil.ProxyRole),
1083 1084 1085 1086 1087 1088 1089
		zap.Int64("MsgID", dpt.ID()),
		zap.Uint64("BeginTS", dpt.BeginTs()),
		zap.Uint64("EndTS", dpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1090 1091
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1092 1093 1094
	return dpt.result, nil
}

1095
// HasPartition check if partition exist.
C
Cai Yudong 已提交
1096
func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
1097 1098 1099 1100 1101
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
D
dragondriver 已提交
1102

D
dragondriver 已提交
1103
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasPartition")
D
dragondriver 已提交
1104 1105
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
1106 1107 1108
	method := "HasPartition"
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
X
Xiaofan 已提交
1109
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1110
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1111

1112
	hpt := &hasPartitionTask{
S
sunby 已提交
1113
		ctx:                 ctx,
1114 1115
		Condition:           NewTaskCondition(ctx),
		HasPartitionRequest: request,
1116
		rootCoord:           node.rootCoord,
1117 1118 1119
		result:              nil,
	}

D
dragondriver 已提交
1120 1121 1122
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1123
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1124 1125 1126
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
D
dragondriver 已提交
1127 1128 1129 1130 1131 1132

	if err := node.sched.ddQueue.Enqueue(hpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1133
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1134 1135 1136 1137
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1138
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1139
			metrics.AbandonLabel).Inc()
1140

1141 1142
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1143
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1144 1145 1146 1147 1148
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1149

D
dragondriver 已提交
1150 1151 1152
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1153
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1154 1155 1156
		zap.Int64("MsgID", hpt.ID()),
		zap.Uint64("BeginTS", hpt.BeginTs()),
		zap.Uint64("EndTS", hpt.EndTs()),
D
dragondriver 已提交
1157 1158 1159
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
D
dragondriver 已提交
1160 1161 1162 1163

	if err := hpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1164
			zap.Error(err),
D
dragondriver 已提交
1165
			zap.String("traceID", traceID),
1166
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1167 1168 1169
			zap.Int64("MsgID", hpt.ID()),
			zap.Uint64("BeginTS", hpt.BeginTs()),
			zap.Uint64("EndTS", hpt.EndTs()),
D
dragondriver 已提交
1170 1171 1172 1173
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1174
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1175
			metrics.FailLabel).Inc()
1176

1177 1178
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1179
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1180 1181 1182 1183 1184
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1185 1186 1187 1188

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1189
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1190 1191 1192 1193 1194 1195 1196
		zap.Int64("MsgID", hpt.ID()),
		zap.Uint64("BeginTS", hpt.BeginTs()),
		zap.Uint64("EndTS", hpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1197
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1198
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
1199
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1200 1201 1202
	return hpt.result, nil
}

1203
// LoadPartitions load specific partitions into query nodes.
C
Cai Yudong 已提交
1204
func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitionsRequest) (*commonpb.Status, error) {
1205 1206 1207
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1208

D
dragondriver 已提交
1209
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadPartitions")
1210 1211
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
1212 1213
	method := "LoadPartitions"
	tr := timerecord.NewTimeRecorder(method)
1214

1215
	lpt := &loadPartitionsTask{
G
godchen 已提交
1216 1217 1218
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		LoadPartitionsRequest: request,
1219
		queryCoord:            node.queryCoord,
1220 1221
	}

1222 1223 1224
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1225
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1226 1227 1228
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1229 1230 1231 1232 1233 1234

	if err := node.sched.ddQueue.Enqueue(lpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1235
			zap.String("role", typeutil.ProxyRole),
1236 1237 1238 1239
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

X
Xiaofan 已提交
1240
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1241
			metrics.AbandonLabel).Inc()
1242

1243
		return &commonpb.Status{
1244
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1245 1246 1247 1248
			Reason:    err.Error(),
		}, nil
	}

1249 1250 1251
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1252
		zap.String("role", typeutil.ProxyRole),
1253 1254 1255
		zap.Int64("MsgID", lpt.ID()),
		zap.Uint64("BeginTS", lpt.BeginTs()),
		zap.Uint64("EndTS", lpt.EndTs()),
D
dragondriver 已提交
1256 1257 1258
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1259 1260 1261 1262

	if err := lpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1263
			zap.Error(err),
1264
			zap.String("traceID", traceID),
1265
			zap.String("role", typeutil.ProxyRole),
1266 1267 1268
			zap.Int64("MsgID", lpt.ID()),
			zap.Uint64("BeginTS", lpt.BeginTs()),
			zap.Uint64("EndTS", lpt.EndTs()),
D
dragondriver 已提交
1269 1270 1271 1272
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

X
Xiaofan 已提交
1273
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1274
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1275
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1276
			metrics.FailLabel).Inc()
1277

1278
		return &commonpb.Status{
1279
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1280 1281 1282 1283
			Reason:    err.Error(),
		}, nil
	}

1284 1285 1286
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1287
		zap.String("role", typeutil.ProxyRole),
1288 1289 1290 1291 1292 1293 1294
		zap.Int64("MsgID", lpt.ID()),
		zap.Uint64("BeginTS", lpt.BeginTs()),
		zap.Uint64("EndTS", lpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))

X
Xiaofan 已提交
1295
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1296
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1297
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1298
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
1299
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1300
	return lpt.result, nil
1301 1302
}

1303
// ReleasePartitions release specific partitions from query nodes.
C
Cai Yudong 已提交
1304
func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionsRequest) (*commonpb.Status, error) {
1305 1306 1307
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1308 1309 1310 1311 1312

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleasePartitions")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1313
	rpt := &releasePartitionsTask{
G
godchen 已提交
1314 1315 1316
		ctx:                      ctx,
		Condition:                NewTaskCondition(ctx),
		ReleasePartitionsRequest: request,
1317
		queryCoord:               node.queryCoord,
1318 1319
	}

1320
	method := "ReleasePartitions"
1321
	tr := timerecord.NewTimeRecorder(method)
1322 1323 1324 1325

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1326
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1327 1328 1329
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1330 1331 1332 1333 1334 1335

	if err := node.sched.ddQueue.Enqueue(rpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1336
			zap.String("role", typeutil.ProxyRole),
1337 1338 1339 1340
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

X
Xiaofan 已提交
1341
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1342
			metrics.AbandonLabel).Inc()
1343

1344
		return &commonpb.Status{
1345
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1346 1347 1348 1349
			Reason:    err.Error(),
		}, nil
	}

1350 1351 1352
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1353
		zap.String("role", typeutil.ProxyRole),
1354 1355 1356
		zap.Int64("msgID", rpt.Base.MsgID),
		zap.Uint64("BeginTS", rpt.BeginTs()),
		zap.Uint64("EndTS", rpt.EndTs()),
D
dragondriver 已提交
1357 1358 1359
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1360 1361 1362 1363

	if err := rpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1364
			zap.Error(err),
1365
			zap.String("traceID", traceID),
1366
			zap.String("role", typeutil.ProxyRole),
1367 1368 1369
			zap.Int64("msgID", rpt.Base.MsgID),
			zap.Uint64("BeginTS", rpt.BeginTs()),
			zap.Uint64("EndTS", rpt.EndTs()),
D
dragondriver 已提交
1370 1371 1372 1373
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

X
Xiaofan 已提交
1374
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1375
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1376
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1377
			metrics.FailLabel).Inc()
1378

1379
		return &commonpb.Status{
1380
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1381 1382 1383 1384
			Reason:    err.Error(),
		}, nil
	}

1385 1386 1387
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1388
		zap.String("role", typeutil.ProxyRole),
1389 1390 1391 1392 1393 1394 1395
		zap.Int64("msgID", rpt.Base.MsgID),
		zap.Uint64("BeginTS", rpt.BeginTs()),
		zap.Uint64("EndTS", rpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))

X
Xiaofan 已提交
1396
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1397
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1398
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1399
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
1400
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1401
	return rpt.result, nil
1402 1403
}

1404
// GetPartitionStatistics get the statistics of partition, such as num_rows.
C
Cai Yudong 已提交
1405
func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb.GetPartitionStatisticsRequest) (*milvuspb.GetPartitionStatisticsResponse, error) {
1406 1407 1408 1409 1410
	if !node.checkHealthy() {
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1411 1412 1413 1414 1415

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetPartitionStatistics")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1416
	g := &getPartitionStatisticsTask{
1417 1418 1419
		ctx:                           ctx,
		Condition:                     NewTaskCondition(ctx),
		GetPartitionStatisticsRequest: request,
1420
		dataCoord:                     node.dataCoord,
1421 1422
	}

1423
	method := "GetPartitionStatistics"
1424
	tr := timerecord.NewTimeRecorder(method)
1425 1426 1427 1428

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1429
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1430 1431 1432
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1433 1434 1435 1436 1437 1438

	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1439
			zap.String("role", typeutil.ProxyRole),
1440 1441 1442 1443
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1444
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1445
			metrics.AbandonLabel).Inc()
1446

1447 1448 1449 1450 1451 1452 1453 1454
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1455 1456 1457
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1458
		zap.String("role", typeutil.ProxyRole),
1459 1460 1461
		zap.Int64("msgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
1462 1463 1464
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1465 1466 1467 1468

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
1469
			zap.Error(err),
1470
			zap.String("traceID", traceID),
1471
			zap.String("role", typeutil.ProxyRole),
1472 1473 1474
			zap.Int64("msgID", g.ID()),
			zap.Uint64("BeginTS", g.BeginTs()),
			zap.Uint64("EndTS", g.EndTs()),
1475 1476 1477 1478
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1479
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1480
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1481
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1482
			metrics.FailLabel).Inc()
1483

1484 1485 1486 1487 1488 1489 1490 1491
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1492 1493 1494
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1495
		zap.String("role", typeutil.ProxyRole),
1496 1497 1498 1499 1500 1501 1502
		zap.Int64("msgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

X
Xiaofan 已提交
1503
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1504
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1505
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1506
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
1507
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1508
	return g.result, nil
1509 1510
}

1511
// ShowPartitions list all partitions in the specific collection.
C
Cai Yudong 已提交
1512
func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
1513 1514 1515 1516 1517
	if !node.checkHealthy() {
		return &milvuspb.ShowPartitionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1518 1519 1520 1521 1522

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowPartitions")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1523
	spt := &showPartitionsTask{
G
godchen 已提交
1524 1525 1526
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		ShowPartitionsRequest: request,
1527
		rootCoord:             node.rootCoord,
1528
		queryCoord:            node.queryCoord,
G
godchen 已提交
1529
		result:                nil,
1530 1531
	}

1532
	method := "ShowPartitions"
1533 1534
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
X
Xiaofan 已提交
1535
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1536
		metrics.TotalLabel).Inc()
1537 1538 1539 1540

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1541
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1542
		zap.Any("request", request))
1543 1544 1545 1546 1547 1548

	if err := node.sched.ddQueue.Enqueue(spt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1549
			zap.String("role", typeutil.ProxyRole),
1550 1551
			zap.Any("request", request))

X
Xiaofan 已提交
1552
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1553
			metrics.AbandonLabel).Inc()
1554

G
godchen 已提交
1555
		return &milvuspb.ShowPartitionsResponse{
1556
			Status: &commonpb.Status{
1557
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1558 1559 1560 1561 1562
				Reason:    err.Error(),
			},
		}, nil
	}

1563 1564 1565
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1566
		zap.String("role", typeutil.ProxyRole),
1567 1568 1569
		zap.Int64("msgID", spt.ID()),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
1570 1571
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
1572 1573 1574 1575 1576
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

	if err := spt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1577
			zap.Error(err),
1578
			zap.String("traceID", traceID),
1579
			zap.String("role", typeutil.ProxyRole),
1580 1581 1582 1583 1584 1585
			zap.Int64("msgID", spt.ID()),
			zap.Uint64("BeginTS", spt.BeginTs()),
			zap.Uint64("EndTS", spt.EndTs()),
			zap.String("db", spt.ShowPartitionsRequest.DbName),
			zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
			zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))
D
dragondriver 已提交
1586

X
Xiaofan 已提交
1587
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1588
			metrics.FailLabel).Inc()
1589

G
godchen 已提交
1590
		return &milvuspb.ShowPartitionsResponse{
1591
			Status: &commonpb.Status{
1592
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1593 1594 1595 1596
				Reason:    err.Error(),
			},
		}, nil
	}
1597 1598 1599 1600

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1601
		zap.String("role", typeutil.ProxyRole),
1602 1603 1604 1605 1606 1607 1608
		zap.Int64("msgID", spt.ID()),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

X
Xiaofan 已提交
1609
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1610
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
1611
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1612 1613 1614
	return spt.result, nil
}

1615
// CreateIndex create index for collection.
C
Cai Yudong 已提交
1616
func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
1617 1618 1619
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1620 1621 1622 1623 1624

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowPartitions")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1625
	cit := &createIndexTask{
S
sunby 已提交
1626
		ctx:                ctx,
1627 1628
		Condition:          NewTaskCondition(ctx),
		CreateIndexRequest: request,
1629
		rootCoord:          node.rootCoord,
1630 1631
	}

D
dragondriver 已提交
1632
	method := "CreateIndex"
1633
	tr := timerecord.NewTimeRecorder(method)
D
dragondriver 已提交
1634 1635 1636 1637

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1638
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1639 1640 1641 1642
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))
D
dragondriver 已提交
1643 1644 1645 1646 1647 1648

	if err := node.sched.ddQueue.Enqueue(cit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1649
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1650 1651 1652 1653 1654
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.Any("extra_params", request.ExtraParams))

X
Xiaofan 已提交
1655
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1656
			metrics.AbandonLabel).Inc()
1657

1658
		return &commonpb.Status{
1659
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1660 1661 1662 1663
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1664 1665 1666
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1667
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1668 1669 1670
		zap.Int64("MsgID", cit.ID()),
		zap.Uint64("BeginTs", cit.BeginTs()),
		zap.Uint64("EndTs", cit.EndTs()),
D
dragondriver 已提交
1671 1672 1673 1674
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))
D
dragondriver 已提交
1675 1676 1677 1678

	if err := cit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1679
			zap.Error(err),
D
dragondriver 已提交
1680
			zap.String("traceID", traceID),
1681
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1682 1683 1684
			zap.Int64("MsgID", cit.ID()),
			zap.Uint64("BeginTs", cit.BeginTs()),
			zap.Uint64("EndTs", cit.EndTs()),
D
dragondriver 已提交
1685 1686 1687 1688 1689
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.Any("extra_params", request.ExtraParams))

X
Xiaofan 已提交
1690
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1691
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1692
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1693
			metrics.FailLabel).Inc()
1694

1695
		return &commonpb.Status{
1696
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1697 1698 1699 1700
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1701 1702 1703
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1704
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1705 1706 1707 1708 1709 1710 1711 1712
		zap.Int64("MsgID", cit.ID()),
		zap.Uint64("BeginTs", cit.BeginTs()),
		zap.Uint64("EndTs", cit.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))

X
Xiaofan 已提交
1713
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1714
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1715
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1716
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
1717
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1718 1719 1720
	return cit.result, nil
}

1721
// DescribeIndex get the meta information of index, such as index state, index id and etc.
C
Cai Yudong 已提交
1722
func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
1723 1724 1725 1726 1727
	if !node.checkHealthy() {
		return &milvuspb.DescribeIndexResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1728 1729 1730 1731 1732

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeIndex")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1733
	dit := &describeIndexTask{
S
sunby 已提交
1734
		ctx:                  ctx,
1735 1736
		Condition:            NewTaskCondition(ctx),
		DescribeIndexRequest: request,
1737
		rootCoord:            node.rootCoord,
1738 1739
	}

1740 1741 1742
	method := "DescribeIndex"
	// avoid data race
	indexName := request.IndexName
1743
	tr := timerecord.NewTimeRecorder(method)
1744 1745 1746 1747

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1748
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1749 1750 1751
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
1752 1753 1754 1755 1756 1757 1758
		zap.String("index name", indexName))

	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1759
			zap.String("role", typeutil.ProxyRole),
1760 1761 1762 1763 1764
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", indexName))

X
Xiaofan 已提交
1765
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1766
			metrics.AbandonLabel).Inc()
1767

1768 1769
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
1770
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1771 1772 1773 1774 1775
				Reason:    err.Error(),
			},
		}, nil
	}

1776 1777 1778
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1779
		zap.String("role", typeutil.ProxyRole),
1780 1781 1782
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
1783 1784 1785
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
1786 1787 1788 1789 1790
		zap.String("index name", indexName))

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1791
			zap.Error(err),
1792
			zap.String("traceID", traceID),
1793
			zap.String("role", typeutil.ProxyRole),
1794 1795 1796
			zap.Int64("MsgID", dit.ID()),
			zap.Uint64("BeginTs", dit.BeginTs()),
			zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
1797 1798 1799
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
1800
			zap.String("index name", indexName))
D
dragondriver 已提交
1801

Z
zhenshan.cao 已提交
1802 1803 1804 1805
		errCode := commonpb.ErrorCode_UnexpectedError
		if dit.result != nil {
			errCode = dit.result.Status.GetErrorCode()
		}
X
Xiaofan 已提交
1806
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1807
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1808
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1809
			metrics.FailLabel).Inc()
1810

1811 1812
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
Z
zhenshan.cao 已提交
1813
				ErrorCode: errCode,
1814 1815 1816 1817 1818
				Reason:    err.Error(),
			},
		}, nil
	}

1819 1820 1821
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1822
		zap.String("role", typeutil.ProxyRole),
1823 1824 1825 1826 1827 1828 1829 1830
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", indexName))

X
Xiaofan 已提交
1831
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1832
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1833
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1834
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
1835
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1836 1837 1838
	return dit.result, nil
}

1839
// DropIndex drop the index of collection.
C
Cai Yudong 已提交
1840
func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
1841 1842 1843
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1844 1845 1846 1847 1848

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropIndex")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1849
	dit := &dropIndexTask{
S
sunby 已提交
1850
		ctx:              ctx,
B
BossZou 已提交
1851 1852
		Condition:        NewTaskCondition(ctx),
		DropIndexRequest: request,
1853
		rootCoord:        node.rootCoord,
B
BossZou 已提交
1854
	}
G
godchen 已提交
1855

D
dragondriver 已提交
1856
	method := "DropIndex"
1857
	tr := timerecord.NewTimeRecorder(method)
D
dragondriver 已提交
1858 1859 1860 1861

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1862
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1863 1864 1865 1866 1867
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

D
dragondriver 已提交
1868 1869 1870 1871 1872
	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1873
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1874 1875 1876 1877
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))
X
Xiaofan 已提交
1878
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1879
			metrics.AbandonLabel).Inc()
D
dragondriver 已提交
1880

B
BossZou 已提交
1881
		return &commonpb.Status{
1882
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1883 1884 1885
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1886

D
dragondriver 已提交
1887 1888 1889
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1890
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1891 1892 1893
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
1894 1895 1896 1897
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
D
dragondriver 已提交
1898 1899 1900 1901

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1902
			zap.Error(err),
D
dragondriver 已提交
1903
			zap.String("traceID", traceID),
1904
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1905 1906 1907
			zap.Int64("MsgID", dit.ID()),
			zap.Uint64("BeginTs", dit.BeginTs()),
			zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
1908 1909 1910 1911 1912
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))

X
Xiaofan 已提交
1913
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1914
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1915
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1916
			metrics.FailLabel).Inc()
1917

B
BossZou 已提交
1918
		return &commonpb.Status{
1919
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1920 1921 1922
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1923 1924 1925 1926

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1927
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1928 1929 1930 1931 1932 1933 1934 1935
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

X
Xiaofan 已提交
1936
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1937
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
1938
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1939
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
1940
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
B
BossZou 已提交
1941 1942 1943
	return dit.result, nil
}

1944 1945
// GetIndexBuildProgress gets index build progress with filed_name and index_name.
// IndexRows is the num of indexed rows. And TotalRows is the total number of segment rows.
C
Cai Yudong 已提交
1946
func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb.GetIndexBuildProgressRequest) (*milvuspb.GetIndexBuildProgressResponse, error) {
1947 1948 1949 1950 1951
	if !node.checkHealthy() {
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1952 1953 1954 1955 1956

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetIndexBuildProgress")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1957
	gibpt := &getIndexBuildProgressTask{
1958 1959 1960
		ctx:                          ctx,
		Condition:                    NewTaskCondition(ctx),
		GetIndexBuildProgressRequest: request,
1961 1962
		indexCoord:                   node.indexCoord,
		rootCoord:                    node.rootCoord,
1963
		dataCoord:                    node.dataCoord,
1964 1965
	}

1966
	method := "GetIndexBuildProgress"
1967
	tr := timerecord.NewTimeRecorder(method)
1968 1969 1970 1971

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1972
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1973 1974 1975 1976
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1977 1978 1979 1980 1981 1982

	if err := node.sched.ddQueue.Enqueue(gibpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1983
			zap.String("role", typeutil.ProxyRole),
1984 1985 1986 1987
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))
X
Xiaofan 已提交
1988
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
1989
			metrics.AbandonLabel).Inc()
1990

1991 1992 1993 1994 1995 1996 1997 1998
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1999 2000 2001
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2002
		zap.String("role", typeutil.ProxyRole),
2003 2004 2005
		zap.Int64("MsgID", gibpt.ID()),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
		zap.Uint64("EndTs", gibpt.EndTs()),
2006 2007 2008 2009
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
2010 2011 2012 2013

	if err := gibpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
2014
			zap.Error(err),
2015
			zap.String("traceID", traceID),
2016
			zap.String("role", typeutil.ProxyRole),
2017 2018 2019
			zap.Int64("MsgID", gibpt.ID()),
			zap.Uint64("BeginTs", gibpt.BeginTs()),
			zap.Uint64("EndTs", gibpt.EndTs()),
2020 2021 2022 2023
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))
X
Xiaofan 已提交
2024
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2025
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
2026
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2027
			metrics.FailLabel).Inc()
2028 2029 2030 2031 2032 2033 2034 2035

		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
2036 2037 2038 2039

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2040
		zap.String("role", typeutil.ProxyRole),
2041 2042 2043 2044 2045 2046 2047 2048
		zap.Int64("MsgID", gibpt.ID()),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
		zap.Uint64("EndTs", gibpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName),
		zap.Any("result", gibpt.result))
2049

X
Xiaofan 已提交
2050
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2051
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
2052
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2053
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
2054
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2055
	return gibpt.result, nil
2056 2057
}

2058
// GetIndexState get the build-state of index.
C
Cai Yudong 已提交
2059
func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndexStateRequest) (*milvuspb.GetIndexStateResponse, error) {
2060 2061 2062 2063 2064
	if !node.checkHealthy() {
		return &milvuspb.GetIndexStateResponse{
			Status: unhealthyStatus(),
		}, nil
	}
2065 2066 2067 2068 2069

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

2070
	dipt := &getIndexStateTask{
G
godchen 已提交
2071 2072 2073
		ctx:                  ctx,
		Condition:            NewTaskCondition(ctx),
		GetIndexStateRequest: request,
2074 2075
		indexCoord:           node.indexCoord,
		rootCoord:            node.rootCoord,
2076 2077
	}

2078
	method := "GetIndexState"
2079
	tr := timerecord.NewTimeRecorder(method)
2080 2081 2082 2083

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
2084
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2085 2086 2087 2088
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
2089 2090 2091 2092 2093 2094

	if err := node.sched.ddQueue.Enqueue(dipt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
2095
			zap.String("role", typeutil.ProxyRole),
2096 2097 2098 2099 2100
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))

X
Xiaofan 已提交
2101
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2102
			metrics.AbandonLabel).Inc()
2103

G
godchen 已提交
2104
		return &milvuspb.GetIndexStateResponse{
2105
			Status: &commonpb.Status{
2106
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2107 2108 2109 2110 2111
				Reason:    err.Error(),
			},
		}, nil
	}

2112 2113 2114
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2115
		zap.String("role", typeutil.ProxyRole),
2116 2117 2118
		zap.Int64("MsgID", dipt.ID()),
		zap.Uint64("BeginTs", dipt.BeginTs()),
		zap.Uint64("EndTs", dipt.EndTs()),
D
dragondriver 已提交
2119 2120 2121 2122
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
2123 2124 2125 2126

	if err := dipt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2127
			zap.Error(err),
2128
			zap.String("traceID", traceID),
2129
			zap.String("role", typeutil.ProxyRole),
2130 2131 2132
			zap.Int64("MsgID", dipt.ID()),
			zap.Uint64("BeginTs", dipt.BeginTs()),
			zap.Uint64("EndTs", dipt.EndTs()),
D
dragondriver 已提交
2133 2134 2135 2136 2137
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))

X
Xiaofan 已提交
2138
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2139
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
2140
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2141
			metrics.FailLabel).Inc()
2142

G
godchen 已提交
2143
		return &milvuspb.GetIndexStateResponse{
2144
			Status: &commonpb.Status{
2145
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2146 2147 2148 2149 2150
				Reason:    err.Error(),
			},
		}, nil
	}

2151 2152 2153
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2154
		zap.String("role", typeutil.ProxyRole),
2155 2156 2157 2158 2159 2160 2161 2162
		zap.Int64("MsgID", dipt.ID()),
		zap.Uint64("BeginTs", dipt.BeginTs()),
		zap.Uint64("EndTs", dipt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

X
Xiaofan 已提交
2163
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2164
		metrics.TotalLabel).Inc()
X
Xiaofan 已提交
2165
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2166
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
2167
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2168 2169 2170
	return dipt.result, nil
}

2171
// Insert insert records into collection.
C
Cai Yudong 已提交
2172
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
X
Xiangyu Wang 已提交
2173 2174 2175 2176 2177 2178
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
	log.Info("Start processing insert request in Proxy", zap.String("traceID", traceID))
	defer log.Info("Finish processing insert request in Proxy", zap.String("traceID", traceID))

2179 2180 2181 2182 2183
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}
2184 2185
	method := "Insert"
	tr := timerecord.NewTimeRecorder(method)
D
dragondriver 已提交
2186

2187 2188 2189 2190 2191
	defer func() {
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
			metrics.TotalLabel).Inc()
	}()

2192
	it := &insertTask{
2193 2194
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
X
xige-16 已提交
2195
		// req:       request,
2196 2197 2198 2199
		BaseInsertTask: BaseInsertTask{
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
2200
			InsertRequest: internalpb.InsertRequest{
2201
				Base: &commonpb.MsgBase{
X
xige-16 已提交
2202 2203
					MsgType:  commonpb.MsgType_Insert,
					MsgID:    0,
X
Xiaofan 已提交
2204
					SourceID: Params.ProxyCfg.GetNodeID(),
2205 2206 2207
				},
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
X
xige-16 已提交
2208 2209 2210
				FieldsData:     request.FieldsData,
				NumRows:        uint64(request.NumRows),
				Version:        internalpb.InsertDataVersion_ColumnBased,
2211
				// RowData: transfer column based request to this
2212 2213
			},
		},
2214
		rowIDAllocator: node.idAllocator,
2215
		segIDAssigner:  node.segAssigner,
2216
		chMgr:          node.chMgr,
2217
		chTicker:       node.chTicker,
2218
	}
2219 2220

	if len(it.PartitionName) <= 0 {
2221
		it.PartitionName = Params.CommonCfg.DefaultPartitionName
2222 2223
	}

X
Xiangyu Wang 已提交
2224
	constructFailedResponse := func(err error) *milvuspb.MutationResult {
X
xige-16 已提交
2225
		numRows := request.NumRows
2226 2227 2228 2229
		errIndex := make([]uint32, numRows)
		for i := uint32(0); i < numRows; i++ {
			errIndex[i] = i
		}
2230

X
Xiangyu Wang 已提交
2231 2232 2233 2234 2235 2236 2237
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			ErrIndex: errIndex,
		}
2238 2239
	}

X
Xiangyu Wang 已提交
2240
	log.Debug("Enqueue insert request in Proxy",
2241
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2242 2243 2244 2245 2246
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.Int("len(FieldsData)", len(request.FieldsData)),
		zap.Int("len(HashKeys)", len(request.HashKeys)),
2247 2248
		zap.Uint32("NumRows", request.NumRows),
		zap.String("traceID", traceID))
D
dragondriver 已提交
2249

X
Xiangyu Wang 已提交
2250 2251
	if err := node.sched.dmQueue.Enqueue(it); err != nil {
		log.Debug("Failed to enqueue insert task: " + err.Error())
2252 2253
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
			metrics.AbandonLabel).Inc()
X
Xiangyu Wang 已提交
2254
		return constructFailedResponse(err), nil
2255
	}
D
dragondriver 已提交
2256

X
Xiangyu Wang 已提交
2257
	log.Debug("Detail of insert request in Proxy",
2258
		zap.String("role", typeutil.ProxyRole),
X
Xiangyu Wang 已提交
2259
		zap.Int64("msgID", it.Base.MsgID),
D
dragondriver 已提交
2260 2261 2262 2263 2264
		zap.Uint64("BeginTS", it.BeginTs()),
		zap.Uint64("EndTS", it.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
X
Xiangyu Wang 已提交
2265 2266 2267 2268 2269
		zap.Uint32("NumRows", request.NumRows),
		zap.String("traceID", traceID))

	if err := it.WaitToFinish(); err != nil {
		log.Debug("Failed to execute insert task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
2270
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2271
			metrics.FailLabel).Inc()
X
Xiangyu Wang 已提交
2272 2273 2274 2275 2276
		return constructFailedResponse(err), nil
	}

	if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
		setErrorIndex := func() {
X
xige-16 已提交
2277
			numRows := request.NumRows
X
Xiangyu Wang 已提交
2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288
			errIndex := make([]uint32, numRows)
			for i := uint32(0); i < numRows; i++ {
				errIndex[i] = i
			}
			it.result.ErrIndex = errIndex
		}

		setErrorIndex()
	}

	// InsertCnt always equals to the number of entities in the request
X
xige-16 已提交
2289
	it.result.InsertCnt = int64(request.NumRows)
D
dragondriver 已提交
2290

2291
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2292
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
2293
	metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(it.result.InsertCnt))
2294
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
2295 2296 2297
	return it.result, nil
}

2298
// Delete delete records from collection, then these records cannot be searched.
G
groot 已提交
2299
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {
2300 2301 2302
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
2303 2304
	log.Info("Start processing delete request in Proxy", zap.String("traceID", traceID))
	defer log.Info("Finish processing delete request in Proxy", zap.String("traceID", traceID))
2305

G
groot 已提交
2306 2307 2308 2309 2310 2311
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}

2312 2313 2314
	method := "Delete"
	tr := timerecord.NewTimeRecorder(method)

2315 2316
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()
2317
	dt := &deleteTask{
X
xige-16 已提交
2318 2319 2320
		ctx:        ctx,
		Condition:  NewTaskCondition(ctx),
		deleteExpr: request.Expr,
G
godchen 已提交
2321
		BaseDeleteTask: BaseDeleteTask{
G
godchen 已提交
2322 2323 2324
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
2325 2326 2327 2328 2329
			DeleteRequest: internalpb.DeleteRequest{
				Base: &commonpb.MsgBase{
					MsgType: commonpb.MsgType_Delete,
					MsgID:   0,
				},
X
xige-16 已提交
2330
				DbName:         request.DbName,
G
godchen 已提交
2331 2332 2333
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
				// RowData: transfer column based request to this
C
Cai Yudong 已提交
2334 2335 2336 2337
			},
		},
		chMgr:    node.chMgr,
		chTicker: node.chTicker,
G
groot 已提交
2338 2339
	}

2340
	log.Debug("Enqueue delete request in Proxy",
2341
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2342 2343 2344 2345
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.String("expr", request.Expr))
2346 2347 2348 2349

	// MsgID will be set by Enqueue()
	if err := node.sched.dmQueue.Enqueue(dt); err != nil {
		log.Error("Failed to enqueue delete task: "+err.Error(), zap.String("traceID", traceID))
X
Xiaofan 已提交
2350
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2351
			metrics.FailLabel).Inc()
2352

G
groot 已提交
2353 2354 2355 2356 2357 2358 2359 2360
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

2361
	log.Debug("Detail of delete request in Proxy",
2362
		zap.String("role", typeutil.ProxyRole),
G
groot 已提交
2363 2364 2365 2366 2367
		zap.Int64("msgID", dt.Base.MsgID),
		zap.Uint64("timestamp", dt.Base.Timestamp),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
2368 2369
		zap.String("expr", request.Expr),
		zap.String("traceID", traceID))
G
groot 已提交
2370

2371 2372
	if err := dt.WaitToFinish(); err != nil {
		log.Error("Failed to execute delete task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
X
Xiaofan 已提交
2373
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2374
			metrics.TotalLabel).Inc()
X
Xiaofan 已提交
2375
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2376
			metrics.FailLabel).Inc()
G
groot 已提交
2377 2378 2379 2380 2381 2382 2383 2384
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

X
Xiaofan 已提交
2385
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
2386
		metrics.SuccessLabel).Inc()
2387
	metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
G
groot 已提交
2388 2389 2390
	return dt.result, nil
}

2391
// Search search the most similar records of requests.
C
Cai Yudong 已提交
2392
func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
2393 2394 2395 2396 2397
	if !node.checkHealthy() {
		return &milvuspb.SearchResults{
			Status: unhealthyStatus(),
		}, nil
	}
2398 2399
	method := "Search"
	tr := timerecord.NewTimeRecorder(method)
2400 2401
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
2402

C
cai.zhang 已提交
2403 2404
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Search")
	defer sp.Finish()
D
dragondriver 已提交
2405 2406
	traceID, _, _ := trace.InfoFromSpan(sp)

2407
	qt := &searchTask{
S
sunby 已提交
2408
		ctx:       ctx,
2409
		Condition: NewTaskCondition(ctx),
G
godchen 已提交
2410
		SearchRequest: &internalpb.SearchRequest{
2411
			Base: &commonpb.MsgBase{
2412
				MsgType:  commonpb.MsgType_Search,
X
Xiaofan 已提交
2413
				SourceID: Params.ProxyCfg.GetNodeID(),
2414
			},
2415
			ReqID: Params.ProxyCfg.GetNodeID(),
2416
		},
2417 2418 2419 2420
		request:            request,
		qc:                 node.queryCoord,
		tr:                 timerecord.NewTimeRecorder("search"),
		getQueryNodePolicy: defaultGetQueryNodePolicy,
2421 2422
	}

2423 2424 2425 2426 2427
	travelTs := request.TravelTimestamp
	guaranteeTs := request.GuaranteeTimestamp

	log.Debug(
		rpcReceived(method),
D
dragondriver 已提交
2428
		zap.String("traceID", traceID),
2429
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2430 2431 2432 2433 2434
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2435 2436 2437 2438
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2439

2440 2441 2442
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
D
dragondriver 已提交
2443 2444
			zap.Error(err),
			zap.String("traceID", traceID),
2445
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2446 2447 2448 2449 2450 2451
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames),
			zap.Any("dsl", request.Dsl),
			zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
			zap.Any("OutputFields", request.OutputFields),
2452 2453 2454
			zap.Any("search_params", request.SearchParams),
			zap.Uint64("travel_timestamp", travelTs),
			zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2455

2456 2457
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
			metrics.AbandonLabel).Inc()
2458

2459 2460
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2461
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2462 2463 2464 2465
				Reason:    err.Error(),
			},
		}, nil
	}
2466
	tr.Record("search request enqueue")
2467

2468 2469
	log.Debug(
		rpcEnqueued(method),
D
dragondriver 已提交
2470
		zap.String("traceID", traceID),
2471
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2472
		zap.Int64("msgID", qt.ID()),
D
dragondriver 已提交
2473 2474 2475 2476 2477
		zap.Uint64("timestamp", qt.Base.Timestamp),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
2478
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2479 2480 2481 2482
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2483

2484 2485 2486
	if err := qt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2487
			zap.Error(err),
D
dragondriver 已提交
2488
			zap.String("traceID", traceID),
2489
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2490
			zap.Int64("msgID", qt.ID()),
D
dragondriver 已提交
2491 2492 2493 2494
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames),
			zap.Any("dsl", request.Dsl),
2495
			zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2496 2497 2498 2499
			zap.Any("OutputFields", request.OutputFields),
			zap.Any("search_params", request.SearchParams),
			zap.Uint64("travel_timestamp", travelTs),
			zap.Uint64("guarantee_timestamp", guaranteeTs))
2500

2501 2502
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
			metrics.FailLabel).Inc()
2503

2504 2505
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2506
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2507 2508 2509 2510 2511
				Reason:    err.Error(),
			},
		}, nil
	}

2512 2513 2514
	span := tr.Record("wait search result")
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
		metrics.SearchLabel).Observe(float64(span.Milliseconds()))
2515 2516
	log.Debug(
		rpcDone(method),
D
dragondriver 已提交
2517
		zap.String("traceID", traceID),
2518
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2519 2520 2521 2522 2523 2524
		zap.Int64("msgID", qt.ID()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2525 2526 2527 2528
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2529

2530 2531 2532
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()
	metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(qt.result.GetResults().GetNumQueries()))
C
cai.zhang 已提交
2533
	searchDur := tr.ElapseSpan().Milliseconds()
X
Xiaofan 已提交
2534
	metrics.ProxySearchLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
2535
		metrics.SearchLabel).Observe(float64(searchDur))
2536 2537 2538
	return qt.result, nil
}

2539
// Flush notify data nodes to persist the data of collection.
2540 2541 2542 2543 2544 2545 2546
func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*milvuspb.FlushResponse, error) {
	resp := &milvuspb.FlushResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    "",
		},
	}
2547
	if !node.checkHealthy() {
2548 2549
		resp.Status.Reason = "proxy is not healthy"
		return resp, nil
2550
	}
D
dragondriver 已提交
2551 2552 2553 2554 2555

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Flush")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

2556
	ft := &flushTask{
T
ThreadDao 已提交
2557 2558 2559
		ctx:          ctx,
		Condition:    NewTaskCondition(ctx),
		FlushRequest: request,
2560
		dataCoord:    node.dataCoord,
2561 2562
	}

D
dragondriver 已提交
2563
	method := "Flush"
2564
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
2565
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2566 2567 2568 2569

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
2570
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2571 2572
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))
D
dragondriver 已提交
2573 2574 2575 2576 2577 2578

	if err := node.sched.ddQueue.Enqueue(ft); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
2579
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2580 2581 2582
			zap.String("db", request.DbName),
			zap.Any("collections", request.CollectionNames))

X
Xiaofan 已提交
2583
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2584

2585 2586
		resp.Status.Reason = err.Error()
		return resp, nil
2587 2588
	}

D
dragondriver 已提交
2589 2590 2591
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2592
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2593 2594 2595
		zap.Int64("MsgID", ft.ID()),
		zap.Uint64("BeginTs", ft.BeginTs()),
		zap.Uint64("EndTs", ft.EndTs()),
D
dragondriver 已提交
2596 2597
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))
D
dragondriver 已提交
2598 2599 2600 2601

	if err := ft.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2602
			zap.Error(err),
D
dragondriver 已提交
2603
			zap.String("traceID", traceID),
2604
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2605 2606 2607
			zap.Int64("MsgID", ft.ID()),
			zap.Uint64("BeginTs", ft.BeginTs()),
			zap.Uint64("EndTs", ft.EndTs()),
D
dragondriver 已提交
2608 2609 2610
			zap.String("db", request.DbName),
			zap.Any("collections", request.CollectionNames))

X
Xiaofan 已提交
2611
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2612

D
dragondriver 已提交
2613
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
2614 2615
		resp.Status.Reason = err.Error()
		return resp, nil
2616 2617
	}

D
dragondriver 已提交
2618 2619 2620
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2621
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2622 2623 2624 2625 2626 2627
		zap.Int64("MsgID", ft.ID()),
		zap.Uint64("BeginTs", ft.BeginTs()),
		zap.Uint64("EndTs", ft.EndTs()),
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))

X
Xiaofan 已提交
2628 2629
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2630
	return ft.result, nil
2631 2632
}

2633
// Query get the records by primary keys.
C
Cai Yudong 已提交
2634
func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*milvuspb.QueryResults, error) {
2635 2636 2637 2638 2639
	if !node.checkHealthy() {
		return &milvuspb.QueryResults{
			Status: unhealthyStatus(),
		}, nil
	}
2640

D
dragondriver 已提交
2641 2642 2643
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Query")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
2644
	tr := timerecord.NewTimeRecorder("Query")
D
dragondriver 已提交
2645

2646
	qt := &queryTask{
2647 2648 2649 2650 2651
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
		RetrieveRequest: &internalpb.RetrieveRequest{
			Base: &commonpb.MsgBase{
				MsgType:  commonpb.MsgType_Retrieve,
X
Xiaofan 已提交
2652
				SourceID: Params.ProxyCfg.GetNodeID(),
2653
			},
2654
			ReqID: Params.ProxyCfg.GetNodeID(),
2655
		},
2656 2657 2658 2659
		request:            request,
		qc:                 node.queryCoord,
		getQueryNodePolicy: defaultGetQueryNodePolicy,
		queryShardPolicy:   roundRobinPolicy,
2660 2661
	}

D
dragondriver 已提交
2662 2663
	method := "Query"

2664 2665 2666
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
		metrics.TotalLabel).Inc()

D
dragondriver 已提交
2667 2668 2669
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
2670
		zap.String("role", typeutil.ProxyRole),
2671 2672 2673
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
G
godchen 已提交
2674

D
dragondriver 已提交
2675 2676 2677 2678 2679 2680
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
2681 2682 2683
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))
D
dragondriver 已提交
2684

2685 2686 2687
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
			metrics.FailLabel).Inc()

2688 2689 2690 2691 2692 2693
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
2694
	}
2695
	tr.Record("query request enqueue")
2696

D
dragondriver 已提交
2697 2698 2699
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2700
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2701 2702 2703
		zap.Int64("MsgID", qt.ID()),
		zap.Uint64("BeginTs", qt.BeginTs()),
		zap.Uint64("EndTs", qt.EndTs()),
2704 2705 2706
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
D
dragondriver 已提交
2707 2708 2709 2710 2711 2712

	if err := qt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.String("traceID", traceID),
2713
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2714 2715 2716
			zap.Int64("MsgID", qt.ID()),
			zap.Uint64("BeginTs", qt.BeginTs()),
			zap.Uint64("EndTs", qt.EndTs()),
2717 2718 2719
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))
2720

2721 2722
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
			metrics.FailLabel).Inc()
2723

2724 2725 2726 2727 2728 2729 2730
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
2731 2732 2733
	span := tr.Record("wait query result")
	metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
		metrics.QueryLabel).Observe(float64(span.Milliseconds()))
D
dragondriver 已提交
2734 2735 2736 2737 2738 2739 2740
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", qt.ID()),
		zap.Uint64("BeginTs", qt.BeginTs()),
		zap.Uint64("EndTs", qt.EndTs()),
2741 2742 2743
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
D
dragondriver 已提交
2744

2745 2746 2747 2748
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
		metrics.SuccessLabel).Inc()

	metrics.ProxySearchLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10),
2749
		metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
2750 2751 2752 2753 2754
	return &milvuspb.QueryResults{
		Status:     qt.result.Status,
		FieldsData: qt.result.FieldsData,
	}, nil
}
2755

2756
// CreateAlias create alias for collection, then you can search the collection with alias.
Y
Yusup 已提交
2757 2758 2759 2760
func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2761 2762 2763 2764 2765

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateAlias")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

Y
Yusup 已提交
2766 2767 2768 2769 2770 2771 2772
	cat := &CreateAliasTask{
		ctx:                ctx,
		Condition:          NewTaskCondition(ctx),
		CreateAliasRequest: request,
		rootCoord:          node.rootCoord,
	}

D
dragondriver 已提交
2773
	method := "CreateAlias"
2774
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
2775
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

	if err := node.sched.ddQueue.Enqueue(cat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.String("db", request.DbName),
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
2795
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
2796

Y
Yusup 已提交
2797 2798 2799 2800 2801 2802
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2803 2804 2805
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2806
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2807 2808 2809 2810
		zap.Int64("MsgID", cat.ID()),
		zap.Uint64("BeginTs", cat.BeginTs()),
		zap.Uint64("EndTs", cat.EndTs()),
		zap.String("db", request.DbName),
Y
Yusup 已提交
2811 2812
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))
D
dragondriver 已提交
2813 2814 2815 2816

	if err := cat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2817
			zap.Error(err),
D
dragondriver 已提交
2818
			zap.String("traceID", traceID),
2819
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2820 2821 2822 2823
			zap.Int64("MsgID", cat.ID()),
			zap.Uint64("BeginTs", cat.BeginTs()),
			zap.Uint64("EndTs", cat.EndTs()),
			zap.String("db", request.DbName),
Y
Yusup 已提交
2824 2825
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))
X
Xiaofan 已提交
2826
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
Y
Yusup 已提交
2827 2828 2829 2830 2831 2832 2833

		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2834 2835 2836 2837 2838 2839 2840 2841 2842 2843 2844
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", cat.ID()),
		zap.Uint64("BeginTs", cat.BeginTs()),
		zap.Uint64("EndTs", cat.EndTs()),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
2845 2846
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2847 2848 2849
	return cat.result, nil
}

2850
// DropAlias alter the alias of collection.
Y
Yusup 已提交
2851 2852 2853 2854
func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2855 2856 2857 2858 2859

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropAlias")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

Y
Yusup 已提交
2860 2861 2862 2863 2864 2865 2866
	dat := &DropAliasTask{
		ctx:              ctx,
		Condition:        NewTaskCondition(ctx),
		DropAliasRequest: request,
		rootCoord:        node.rootCoord,
	}

D
dragondriver 已提交
2867
	method := "DropAlias"
2868
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
2869
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883 2884 2885

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias))

	if err := node.sched.ddQueue.Enqueue(dat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.String("db", request.DbName),
			zap.String("alias", request.Alias))
X
Xiaofan 已提交
2886
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2887

Y
Yusup 已提交
2888 2889 2890 2891 2892 2893
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2894 2895 2896
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2897
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2898 2899 2900 2901
		zap.Int64("MsgID", dat.ID()),
		zap.Uint64("BeginTs", dat.BeginTs()),
		zap.Uint64("EndTs", dat.EndTs()),
		zap.String("db", request.DbName),
Y
Yusup 已提交
2902
		zap.String("alias", request.Alias))
D
dragondriver 已提交
2903 2904 2905 2906

	if err := dat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2907
			zap.Error(err),
D
dragondriver 已提交
2908
			zap.String("traceID", traceID),
2909
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2910 2911 2912 2913
			zap.Int64("MsgID", dat.ID()),
			zap.Uint64("BeginTs", dat.BeginTs()),
			zap.Uint64("EndTs", dat.EndTs()),
			zap.String("db", request.DbName),
Y
Yusup 已提交
2914 2915
			zap.String("alias", request.Alias))

X
Xiaofan 已提交
2916
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
2917

Y
Yusup 已提交
2918 2919 2920 2921 2922 2923
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2924 2925 2926 2927 2928 2929 2930 2931 2932 2933
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", dat.ID()),
		zap.Uint64("BeginTs", dat.BeginTs()),
		zap.Uint64("EndTs", dat.EndTs()),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias))

X
Xiaofan 已提交
2934 2935
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2936 2937 2938
	return dat.result, nil
}

2939
// AlterAlias alter alias of collection.
Y
Yusup 已提交
2940 2941 2942 2943
func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2944 2945 2946 2947 2948

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-AlterAlias")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

Y
Yusup 已提交
2949 2950 2951 2952 2953 2954 2955
	aat := &AlterAliasTask{
		ctx:               ctx,
		Condition:         NewTaskCondition(ctx),
		AlterAliasRequest: request,
		rootCoord:         node.rootCoord,
	}

D
dragondriver 已提交
2956
	method := "AlterAlias"
2957
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
2958
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970 2971 2972 2973 2974 2975 2976

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

	if err := node.sched.ddQueue.Enqueue(aat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.String("db", request.DbName),
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))
X
Xiaofan 已提交
2977
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2978

Y
Yusup 已提交
2979 2980 2981 2982 2983 2984
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2985 2986 2987
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2988
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2989 2990 2991 2992
		zap.Int64("MsgID", aat.ID()),
		zap.Uint64("BeginTs", aat.BeginTs()),
		zap.Uint64("EndTs", aat.EndTs()),
		zap.String("db", request.DbName),
Y
Yusup 已提交
2993 2994
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))
D
dragondriver 已提交
2995 2996 2997 2998

	if err := aat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2999
			zap.Error(err),
D
dragondriver 已提交
3000
			zap.String("traceID", traceID),
3001
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3002 3003 3004 3005
			zap.Int64("MsgID", aat.ID()),
			zap.Uint64("BeginTs", aat.BeginTs()),
			zap.Uint64("EndTs", aat.EndTs()),
			zap.String("db", request.DbName),
Y
Yusup 已提交
3006 3007 3008
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
3009
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc()
3010

Y
Yusup 已提交
3011 3012 3013 3014 3015 3016
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", aat.ID()),
		zap.Uint64("BeginTs", aat.BeginTs()),
		zap.Uint64("EndTs", aat.EndTs()),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

X
Xiaofan 已提交
3028 3029
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
3030 3031 3032
	return aat.result, nil
}

3033
// CalcDistance calculates the distances between vectors.
3034
func (node *Proxy) CalcDistance(ctx context.Context, request *milvuspb.CalcDistanceRequest) (*milvuspb.CalcDistanceResults, error) {
3035 3036 3037 3038 3039
	if !node.checkHealthy() {
		return &milvuspb.CalcDistanceResults{
			Status: unhealthyStatus(),
		}, nil
	}
3040
	param, _ := funcutil.GetAttrByKeyFromRepeatedKV("metric", request.GetParams())
3041 3042 3043 3044 3045 3046 3047 3048
	metric, err := distance.ValidateMetricType(param)
	if err != nil {
		return &milvuspb.CalcDistanceResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
3049 3050
	}

3051 3052 3053 3054
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CalcDistance")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

3055 3056
	query := func(ids *milvuspb.VectorIDs) (*milvuspb.QueryResults, error) {
		outputFields := []string{ids.FieldName}
3057

3058 3059 3060 3061 3062
		queryRequest := &milvuspb.QueryRequest{
			DbName:         "",
			CollectionName: ids.CollectionName,
			PartitionNames: ids.PartitionNames,
			OutputFields:   outputFields,
3063 3064
		}

3065
		qt := &queryTask{
3066 3067 3068 3069 3070
			ctx:       ctx,
			Condition: NewTaskCondition(ctx),
			RetrieveRequest: &internalpb.RetrieveRequest{
				Base: &commonpb.MsgBase{
					MsgType:  commonpb.MsgType_Retrieve,
X
Xiaofan 已提交
3071
					SourceID: Params.ProxyCfg.GetNodeID(),
3072
				},
3073
				ReqID: Params.ProxyCfg.GetNodeID(),
3074
			},
3075 3076 3077 3078 3079 3080
			request: queryRequest,
			qc:      node.queryCoord,
			ids:     ids.IdArray,

			getQueryNodePolicy: defaultGetQueryNodePolicy,
			queryShardPolicy:   roundRobinPolicy,
3081 3082
		}

3083
		err := node.sched.dqQueue.Enqueue(qt)
3084
		if err != nil {
3085 3086 3087
			log.Debug("CalcDistance queryTask failed to enqueue",
				zap.Error(err),
				zap.String("traceID", traceID),
3088
				zap.String("role", typeutil.ProxyRole),
3089 3090 3091 3092
				zap.String("db", queryRequest.DbName),
				zap.String("collection", queryRequest.CollectionName),
				zap.Any("partitions", queryRequest.PartitionNames))

3093 3094 3095 3096 3097
			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
3098
			}, err
3099
		}
3100 3101 3102

		log.Debug("CalcDistance queryTask enqueued",
			zap.String("traceID", traceID),
3103
			zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
3104 3105 3106 3107 3108 3109
			zap.Int64("msgID", qt.Base.MsgID),
			zap.Uint64("timestamp", qt.Base.Timestamp),
			zap.String("db", queryRequest.DbName),
			zap.String("collection", queryRequest.CollectionName),
			zap.Any("partitions", queryRequest.PartitionNames),
			zap.Any("OutputFields", queryRequest.OutputFields))
3110 3111 3112 3113

		err = qt.WaitToFinish()
		if err != nil {
			log.Debug("CalcDistance queryTask failed to WaitToFinish",
G
godchen 已提交
3114
				zap.Error(err),
3115
				zap.String("traceID", traceID),
3116
				zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
3117 3118 3119 3120 3121 3122
				zap.Int64("msgID", qt.Base.MsgID),
				zap.Uint64("timestamp", qt.Base.Timestamp),
				zap.String("db", queryRequest.DbName),
				zap.String("collection", queryRequest.CollectionName),
				zap.Any("partitions", queryRequest.PartitionNames),
				zap.Any("OutputFields", queryRequest.OutputFields))
3123 3124 3125 3126 3127 3128

			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
3129
			}, err
3130
		}
3131 3132 3133

		log.Debug("CalcDistance queryTask Done",
			zap.String("traceID", traceID),
3134
			zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
3135 3136 3137 3138 3139 3140
			zap.Int64("msgID", qt.Base.MsgID),
			zap.Uint64("timestamp", qt.Base.Timestamp),
			zap.String("db", queryRequest.DbName),
			zap.String("collection", queryRequest.CollectionName),
			zap.Any("partitions", queryRequest.PartitionNames),
			zap.Any("OutputFields", queryRequest.OutputFields))
3141 3142

		return &milvuspb.QueryResults{
3143 3144
			Status:     qt.result.Status,
			FieldsData: qt.result.FieldsData,
3145 3146 3147
		}, nil
	}

3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161
	// the vectors retrieved are random order, we need re-arrange the vectors by the order of input ids
	arrangeFunc := func(ids *milvuspb.VectorIDs, retrievedFields []*schemapb.FieldData) (*schemapb.VectorField, error) {
		var retrievedIds *schemapb.ScalarField
		var retrievedVectors *schemapb.VectorField
		for _, fieldData := range retrievedFields {
			if fieldData.FieldName == ids.FieldName {
				retrievedVectors = fieldData.GetVectors()
			}
			if fieldData.Type == schemapb.DataType_Int64 {
				retrievedIds = fieldData.GetScalars()
			}
		}

		if retrievedIds == nil || retrievedVectors == nil {
3162
			return nil, errors.New("failed to fetch vectors")
3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178
		}

		dict := make(map[int64]int)
		for index, id := range retrievedIds.GetLongData().Data {
			dict[id] = index
		}

		inputIds := ids.IdArray.GetIntId().Data
		if retrievedVectors.GetFloatVector() != nil {
			floatArr := retrievedVectors.GetFloatVector().Data
			element := retrievedVectors.GetDim()
			result := make([]float32, 0, int64(len(inputIds))*element)
			for _, id := range inputIds {
				index, ok := dict[id]
				if !ok {
					log.Error("id not found in CalcDistance", zap.Int64("id", id))
3179
					return nil, errors.New("failed to fetch vectors by id: " + fmt.Sprintln(id))
3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194 3195 3196 3197 3198 3199 3200 3201 3202 3203 3204 3205
				}
				result = append(result, floatArr[int64(index)*element:int64(index+1)*element]...)
			}

			return &schemapb.VectorField{
				Dim: element,
				Data: &schemapb.VectorField_FloatVector{
					FloatVector: &schemapb.FloatArray{
						Data: result,
					},
				},
			}, nil
		}

		if retrievedVectors.GetBinaryVector() != nil {
			binaryArr := retrievedVectors.GetBinaryVector()
			element := retrievedVectors.GetDim()
			if element%8 != 0 {
				element = element + 8 - element%8
			}

			result := make([]byte, 0, int64(len(inputIds))*element)
			for _, id := range inputIds {
				index, ok := dict[id]
				if !ok {
					log.Error("id not found in CalcDistance", zap.Int64("id", id))
3206
					return nil, errors.New("failed to fetch vectors by id: " + fmt.Sprintln(id))
3207 3208 3209 3210 3211 3212 3213 3214 3215 3216 3217 3218
				}
				result = append(result, binaryArr[int64(index)*element:int64(index+1)*element]...)
			}

			return &schemapb.VectorField{
				Dim: element * 8,
				Data: &schemapb.VectorField_BinaryVector{
					BinaryVector: result,
				},
			}, nil
		}

3219
		return nil, errors.New("failed to fetch vectors")
3220 3221
	}

3222 3223
	log.Debug("CalcDistance received",
		zap.String("traceID", traceID),
3224
		zap.String("role", typeutil.ProxyRole),
3225
		zap.String("metric", metric))
G
godchen 已提交
3226

3227 3228 3229
	vectorsLeft := request.GetOpLeft().GetDataArray()
	opLeft := request.GetOpLeft().GetIdArray()
	if opLeft != nil {
3230 3231
		log.Debug("OpLeft IdArray not empty, Get vectors by id",
			zap.String("traceID", traceID),
3232
			zap.String("role", typeutil.ProxyRole))
3233

3234
		result, err := query(opLeft)
3235
		if err != nil {
3236 3237 3238
			log.Debug("Failed to get left vectors by id",
				zap.Error(err),
				zap.String("traceID", traceID),
3239
				zap.String("role", typeutil.ProxyRole))
3240

3241 3242 3243 3244 3245 3246 3247 3248
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

3249 3250
		log.Debug("OpLeft IdArray not empty, Get vectors by id done",
			zap.String("traceID", traceID),
3251
			zap.String("role", typeutil.ProxyRole))
3252

3253 3254
		vectorsLeft, err = arrangeFunc(opLeft, result.FieldsData)
		if err != nil {
3255 3256 3257
			log.Debug("Failed to re-arrange left vectors",
				zap.Error(err),
				zap.String("traceID", traceID),
3258
				zap.String("role", typeutil.ProxyRole))
3259

3260 3261 3262 3263 3264 3265
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
3266
		}
3267 3268 3269

		log.Debug("Re-arrange left vectors done",
			zap.String("traceID", traceID),
3270
			zap.String("role", typeutil.ProxyRole))
3271 3272
	}

G
groot 已提交
3273
	if vectorsLeft == nil {
3274 3275 3276
		msg := "Left vectors array is empty"
		log.Debug(msg,
			zap.String("traceID", traceID),
3277
			zap.String("role", typeutil.ProxyRole))
3278

G
groot 已提交
3279 3280 3281
		return &milvuspb.CalcDistanceResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
3282
				Reason:    msg,
G
groot 已提交
3283 3284 3285 3286
			},
		}, nil
	}

3287 3288 3289
	vectorsRight := request.GetOpRight().GetDataArray()
	opRight := request.GetOpRight().GetIdArray()
	if opRight != nil {
3290 3291
		log.Debug("OpRight IdArray not empty, Get vectors by id",
			zap.String("traceID", traceID),
3292
			zap.String("role", typeutil.ProxyRole))
3293

3294
		result, err := query(opRight)
3295
		if err != nil {
3296 3297 3298
			log.Debug("Failed to get right vectors by id",
				zap.Error(err),
				zap.String("traceID", traceID),
3299
				zap.String("role", typeutil.ProxyRole))
3300

3301 3302 3303 3304 3305 3306 3307 3308
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

3309 3310
		log.Debug("OpRight IdArray not empty, Get vectors by id done",
			zap.String("traceID", traceID),
3311
			zap.String("role", typeutil.ProxyRole))
3312

3313 3314
		vectorsRight, err = arrangeFunc(opRight, result.FieldsData)
		if err != nil {
3315 3316 3317
			log.Debug("Failed to re-arrange right vectors",
				zap.Error(err),
				zap.String("traceID", traceID),
3318
				zap.String("role", typeutil.ProxyRole))
3319

3320 3321 3322 3323 3324 3325
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
3326
		}
3327 3328 3329

		log.Debug("Re-arrange right vectors done",
			zap.String("traceID", traceID),
3330
			zap.String("role", typeutil.ProxyRole))
3331 3332
	}

G
groot 已提交
3333
	if vectorsRight == nil {
3334 3335 3336
		msg := "Right vectors array is empty"
		log.Debug(msg,
			zap.String("traceID", traceID),
3337
			zap.String("role", typeutil.ProxyRole))
3338

G
groot 已提交
3339 3340 3341
		return &milvuspb.CalcDistanceResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
3342
				Reason:    msg,
G
groot 已提交
3343 3344 3345 3346
			},
		}, nil
	}

3347
	if vectorsLeft.Dim != vectorsRight.Dim {
3348 3349 3350
		msg := "Vectors dimension is not equal"
		log.Debug(msg,
			zap.String("traceID", traceID),
3351
			zap.String("role", typeutil.ProxyRole))
3352

3353 3354 3355
		return &milvuspb.CalcDistanceResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
3356
				Reason:    msg,
3357 3358 3359 3360 3361
			},
		}, nil
	}

	if vectorsLeft.GetFloatVector() != nil && vectorsRight.GetFloatVector() != nil {
3362 3363
		distances, err := distance.CalcFloatDistance(vectorsLeft.Dim, vectorsLeft.GetFloatVector().Data, vectorsRight.GetFloatVector().Data, metric)
		if err != nil {
3364 3365 3366
			log.Debug("Failed to CalcFloatDistance",
				zap.Error(err),
				zap.String("traceID", traceID),
3367
				zap.String("role", typeutil.ProxyRole))
3368

3369 3370 3371 3372 3373 3374 3375 3376
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

3377 3378 3379
		log.Debug("CalcFloatDistance done",
			zap.Error(err),
			zap.String("traceID", traceID),
3380
			zap.String("role", typeutil.ProxyRole))
3381

3382 3383 3384 3385 3386 3387 3388 3389 3390 3391
		return &milvuspb.CalcDistanceResults{
			Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success, Reason: ""},
			Array: &milvuspb.CalcDistanceResults_FloatDist{
				FloatDist: &schemapb.FloatArray{
					Data: distances,
				},
			},
		}, nil
	}

3392
	if vectorsLeft.GetBinaryVector() != nil && vectorsRight.GetBinaryVector() != nil {
G
groot 已提交
3393
		hamming, err := distance.CalcHammingDistance(vectorsLeft.Dim, vectorsLeft.GetBinaryVector(), vectorsRight.GetBinaryVector())
3394
		if err != nil {
3395 3396 3397
			log.Debug("Failed to CalcHammingDistance",
				zap.Error(err),
				zap.String("traceID", traceID),
3398
				zap.String("role", typeutil.ProxyRole))
3399

3400 3401 3402 3403 3404 3405 3406 3407 3408
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

		if metric == distance.HAMMING {
3409 3410
			log.Debug("CalcHammingDistance done",
				zap.String("traceID", traceID),
3411
				zap.String("role", typeutil.ProxyRole))
3412

3413 3414 3415 3416
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success, Reason: ""},
				Array: &milvuspb.CalcDistanceResults_IntDist{
					IntDist: &schemapb.IntArray{
G
groot 已提交
3417
						Data: hamming,
3418 3419 3420 3421 3422 3423
					},
				},
			}, nil
		}

		if metric == distance.TANIMOTO {
G
groot 已提交
3424
			tanimoto, err := distance.CalcTanimotoCoefficient(vectorsLeft.Dim, hamming)
3425
			if err != nil {
3426 3427 3428
				log.Debug("Failed to CalcTanimotoCoefficient",
					zap.Error(err),
					zap.String("traceID", traceID),
3429
					zap.String("role", typeutil.ProxyRole))
3430

3431 3432 3433 3434 3435 3436 3437 3438
				return &milvuspb.CalcDistanceResults{
					Status: &commonpb.Status{
						ErrorCode: commonpb.ErrorCode_UnexpectedError,
						Reason:    err.Error(),
					},
				}, nil
			}

3439 3440
			log.Debug("CalcTanimotoCoefficient done",
				zap.String("traceID", traceID),
3441
				zap.String("role", typeutil.ProxyRole))
3442

3443 3444 3445 3446 3447 3448 3449 3450 3451 3452 3453
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success, Reason: ""},
				Array: &milvuspb.CalcDistanceResults_FloatDist{
					FloatDist: &schemapb.FloatArray{
						Data: tanimoto,
					},
				},
			}, nil
		}
	}

3454
	err = errors.New("unexpected error")
3455
	if (vectorsLeft.GetBinaryVector() != nil && vectorsRight.GetFloatVector() != nil) || (vectorsLeft.GetFloatVector() != nil && vectorsRight.GetBinaryVector() != nil) {
3456
		err = errors.New("cannot calculate distance between binary vectors and float vectors")
3457 3458
	}

3459 3460 3461
	log.Debug("Failed to CalcDistance",
		zap.Error(err),
		zap.String("traceID", traceID),
3462
		zap.String("role", typeutil.ProxyRole))
3463

3464 3465 3466 3467 3468 3469 3470 3471
	return &milvuspb.CalcDistanceResults{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		},
	}, nil
}

3472
// GetDdChannel returns the used channel for dd operations.
C
Cai Yudong 已提交
3473
func (node *Proxy) GetDdChannel(ctx context.Context, request *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) {
3474 3475
	panic("implement me")
}
X
XuanYang-cn 已提交
3476

3477
// GetPersistentSegmentInfo get the information of sealed segment.
C
Cai Yudong 已提交
3478
func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.GetPersistentSegmentInfoRequest) (*milvuspb.GetPersistentSegmentInfoResponse, error) {
D
dragondriver 已提交
3479
	log.Debug("GetPersistentSegmentInfo",
3480
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3481 3482 3483
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
3484
	resp := &milvuspb.GetPersistentSegmentInfoResponse{
X
XuanYang-cn 已提交
3485
		Status: &commonpb.Status{
3486
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
3487 3488
		},
	}
3489 3490 3491 3492
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3493 3494
	method := "GetPersistentSegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
X
Xiaofan 已提交
3495
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
3496
		metrics.TotalLabel).Inc()
G
godchen 已提交
3497
	segments, err := node.getSegmentsOfCollection(ctx, req.DbName, req.CollectionName)
X
XuanYang-cn 已提交
3498
	if err != nil {
3499
		resp.Status.Reason = fmt.Errorf("getSegmentsOfCollection, err:%w", err).Error()
X
XuanYang-cn 已提交
3500 3501
		return resp, nil
	}
3502
	infoResp, err := node.dataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
X
XuanYang-cn 已提交
3503
		Base: &commonpb.MsgBase{
3504
			MsgType:   commonpb.MsgType_SegmentInfo,
X
XuanYang-cn 已提交
3505 3506
			MsgID:     0,
			Timestamp: 0,
X
Xiaofan 已提交
3507
			SourceID:  Params.ProxyCfg.GetNodeID(),
X
XuanYang-cn 已提交
3508 3509 3510 3511
		},
		SegmentIDs: segments,
	})
	if err != nil {
3512
		log.Debug("GetPersistentSegmentInfo fail", zap.Error(err))
3513
		resp.Status.Reason = fmt.Errorf("dataCoord:GetSegmentInfo, err:%w", err).Error()
X
XuanYang-cn 已提交
3514 3515
		return resp, nil
	}
3516
	log.Debug("GetPersistentSegmentInfo ", zap.Int("len(infos)", len(infoResp.Infos)), zap.Any("status", infoResp.Status))
3517
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
X
XuanYang-cn 已提交
3518 3519 3520 3521 3522 3523
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	persistentInfos := make([]*milvuspb.PersistentSegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		persistentInfos[i] = &milvuspb.PersistentSegmentInfo{
S
sunby 已提交
3524
			SegmentID:    info.ID,
X
XuanYang-cn 已提交
3525 3526
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
S
sunby 已提交
3527
			NumRows:      info.NumOfRows,
X
XuanYang-cn 已提交
3528 3529 3530
			State:        info.State,
		}
	}
X
Xiaofan 已提交
3531
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
3532
		metrics.SuccessLabel).Inc()
X
Xiaofan 已提交
3533
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3534
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
X
XuanYang-cn 已提交
3535 3536 3537 3538
	resp.Infos = persistentInfos
	return resp, nil
}

J
jingkl 已提交
3539
// GetQuerySegmentInfo gets segment information from QueryCoord.
C
Cai Yudong 已提交
3540
func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQuerySegmentInfoRequest) (*milvuspb.GetQuerySegmentInfoResponse, error) {
D
dragondriver 已提交
3541
	log.Debug("GetQuerySegmentInfo",
3542
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3543 3544 3545
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
3546
	resp := &milvuspb.GetQuerySegmentInfoResponse{
Z
zhenshan.cao 已提交
3547
		Status: &commonpb.Status{
3548
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
Z
zhenshan.cao 已提交
3549 3550
		},
	}
3551 3552 3553 3554
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
G
godchen 已提交
3555
	segments, err := node.getSegmentsOfCollection(ctx, req.DbName, req.CollectionName)
Z
zhenshan.cao 已提交
3556 3557 3558 3559
	if err != nil {
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3560 3561 3562 3563 3564
	collID, err := globalMetaCache.GetCollectionID(ctx, req.CollectionName)
	if err != nil {
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3565
	infoResp, err := node.queryCoord.GetSegmentInfo(ctx, &querypb.GetSegmentInfoRequest{
Z
zhenshan.cao 已提交
3566
		Base: &commonpb.MsgBase{
3567
			MsgType:   commonpb.MsgType_SegmentInfo,
Z
zhenshan.cao 已提交
3568 3569
			MsgID:     0,
			Timestamp: 0,
X
Xiaofan 已提交
3570
			SourceID:  Params.ProxyCfg.GetNodeID(),
Z
zhenshan.cao 已提交
3571
		},
3572 3573
		CollectionID: collID,
		SegmentIDs:   segments,
Z
zhenshan.cao 已提交
3574 3575
	})
	if err != nil {
3576
		log.Error("Failed to get segment info from QueryCoord",
3577
			zap.Int64s("segmentIDs", segments), zap.Error(err))
Z
zhenshan.cao 已提交
3578 3579 3580
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3581
	log.Debug("GetQuerySegmentInfo ", zap.Any("infos", infoResp.Infos), zap.Any("status", infoResp.Status))
3582
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
3583
		log.Error("Failed to get segment info from QueryCoord", zap.String("errMsg", infoResp.Status.Reason))
Z
zhenshan.cao 已提交
3584 3585 3586 3587 3588 3589 3590 3591 3592 3593 3594 3595 3596
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	queryInfos := make([]*milvuspb.QuerySegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		queryInfos[i] = &milvuspb.QuerySegmentInfo{
			SegmentID:    info.SegmentID,
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
			NumRows:      info.NumRows,
			MemSize:      info.MemSize,
			IndexName:    info.IndexName,
			IndexID:      info.IndexID,
X
xige-16 已提交
3597
			State:        info.SegmentState,
3598
			NodeIds:      info.NodeIds,
Z
zhenshan.cao 已提交
3599 3600
		}
	}
3601
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
Z
zhenshan.cao 已提交
3602 3603 3604 3605
	resp.Infos = queryInfos
	return resp, nil
}

C
Cai Yudong 已提交
3606
func (node *Proxy) getSegmentsOfCollection(ctx context.Context, dbName string, collectionName string) ([]UniqueID, error) {
3607
	describeCollectionResponse, err := node.rootCoord.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
X
XuanYang-cn 已提交
3608
		Base: &commonpb.MsgBase{
3609
			MsgType:   commonpb.MsgType_DescribeCollection,
X
XuanYang-cn 已提交
3610 3611
			MsgID:     0,
			Timestamp: 0,
X
Xiaofan 已提交
3612
			SourceID:  Params.ProxyCfg.GetNodeID(),
X
XuanYang-cn 已提交
3613 3614 3615 3616 3617 3618 3619
		},
		DbName:         dbName,
		CollectionName: collectionName,
	})
	if err != nil {
		return nil, err
	}
3620
	if describeCollectionResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
X
XuanYang-cn 已提交
3621 3622 3623
		return nil, errors.New(describeCollectionResponse.Status.Reason)
	}
	collectionID := describeCollectionResponse.CollectionID
3624
	showPartitionsResp, err := node.rootCoord.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
X
XuanYang-cn 已提交
3625
		Base: &commonpb.MsgBase{
3626
			MsgType:   commonpb.MsgType_ShowPartitions,
X
XuanYang-cn 已提交
3627 3628
			MsgID:     0,
			Timestamp: 0,
X
Xiaofan 已提交
3629
			SourceID:  Params.ProxyCfg.GetNodeID(),
X
XuanYang-cn 已提交
3630 3631 3632 3633 3634 3635 3636 3637
		},
		DbName:         dbName,
		CollectionName: collectionName,
		CollectionID:   collectionID,
	})
	if err != nil {
		return nil, err
	}
3638
	if showPartitionsResp.Status.ErrorCode != commonpb.ErrorCode_Success {
X
XuanYang-cn 已提交
3639 3640 3641 3642 3643
		return nil, errors.New(showPartitionsResp.Status.Reason)
	}

	ret := make([]UniqueID, 0)
	for _, partitionID := range showPartitionsResp.PartitionIDs {
3644
		showSegmentResponse, err := node.rootCoord.ShowSegments(ctx, &milvuspb.ShowSegmentsRequest{
X
XuanYang-cn 已提交
3645
			Base: &commonpb.MsgBase{
3646
				MsgType:   commonpb.MsgType_ShowSegments,
X
XuanYang-cn 已提交
3647 3648
				MsgID:     0,
				Timestamp: 0,
X
Xiaofan 已提交
3649
				SourceID:  Params.ProxyCfg.GetNodeID(),
X
XuanYang-cn 已提交
3650 3651 3652 3653 3654 3655 3656
			},
			CollectionID: collectionID,
			PartitionID:  partitionID,
		})
		if err != nil {
			return nil, err
		}
3657
		if showSegmentResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
X
XuanYang-cn 已提交
3658 3659 3660 3661 3662 3663
			return nil, errors.New(showSegmentResponse.Status.Reason)
		}
		ret = append(ret, showSegmentResponse.SegmentIDs...)
	}
	return ret, nil
}
3664

J
jingkl 已提交
3665
// Dummy handles dummy request
C
Cai Yudong 已提交
3666
func (node *Proxy) Dummy(ctx context.Context, req *milvuspb.DummyRequest) (*milvuspb.DummyResponse, error) {
3667 3668 3669 3670 3671 3672 3673 3674 3675 3676 3677
	failedResponse := &milvuspb.DummyResponse{
		Response: `{"status": "fail"}`,
	}

	// TODO(wxyu): change name RequestType to Request
	drt, err := parseDummyRequestType(req.RequestType)
	if err != nil {
		log.Debug("Failed to parse dummy request type")
		return failedResponse, nil
	}

3678 3679
	if drt.RequestType == "query" {
		drr, err := parseDummyQueryRequest(req.RequestType)
3680
		if err != nil {
3681
			log.Debug("Failed to parse dummy query request")
3682 3683 3684
			return failedResponse, nil
		}

3685
		request := &milvuspb.QueryRequest{
3686 3687 3688
			DbName:         drr.DbName,
			CollectionName: drr.CollectionName,
			PartitionNames: drr.PartitionNames,
3689
			OutputFields:   drr.OutputFields,
X
Xiangyu Wang 已提交
3690 3691
		}

3692
		_, err = node.Query(ctx, request)
3693
		if err != nil {
3694
			log.Debug("Failed to execute dummy query")
3695 3696
			return failedResponse, err
		}
X
Xiangyu Wang 已提交
3697 3698 3699 3700 3701 3702

		return &milvuspb.DummyResponse{
			Response: `{"status": "success"}`,
		}, nil
	}

3703 3704
	log.Debug("cannot find specify dummy request type")
	return failedResponse, nil
X
Xiangyu Wang 已提交
3705 3706
}

J
jingkl 已提交
3707
// RegisterLink registers a link
C
Cai Yudong 已提交
3708
func (node *Proxy) RegisterLink(ctx context.Context, req *milvuspb.RegisterLinkRequest) (*milvuspb.RegisterLinkResponse, error) {
G
godchen 已提交
3709
	code := node.stateCode.Load().(internalpb.StateCode)
D
dragondriver 已提交
3710
	log.Debug("RegisterLink",
3711
		zap.String("role", typeutil.ProxyRole),
C
Cai Yudong 已提交
3712
		zap.Any("state code of proxy", code))
D
dragondriver 已提交
3713

G
godchen 已提交
3714
	if code != internalpb.StateCode_Healthy {
3715 3716 3717
		return &milvuspb.RegisterLinkResponse{
			Address: nil,
			Status: &commonpb.Status{
3718
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3719
				Reason:    "proxy not healthy",
3720 3721 3722
			},
		}, nil
	}
X
Xiaofan 已提交
3723
	//metrics.ProxyLinkedSDKs.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Inc()
3724 3725 3726
	return &milvuspb.RegisterLinkResponse{
		Address: nil,
		Status: &commonpb.Status{
3727
			ErrorCode: commonpb.ErrorCode_Success,
3728
			Reason:    os.Getenv(metricsinfo.DeployModeEnvKey),
3729 3730 3731
		},
	}, nil
}
3732

3733
// GetMetrics gets the metrics of proxy
3734 3735 3736
// TODO(dragondriver): cache the Metrics and set a retention to the cache
func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	log.Debug("Proxy.GetMetrics",
X
Xiaofan 已提交
3737
		zap.Int64("node_id", Params.ProxyCfg.GetNodeID()),
3738 3739 3740 3741
		zap.String("req", req.Request))

	if !node.checkHealthy() {
		log.Warn("Proxy.GetMetrics failed",
X
Xiaofan 已提交
3742
			zap.Int64("node_id", Params.ProxyCfg.GetNodeID()),
3743
			zap.String("req", req.Request),
X
Xiaofan 已提交
3744
			zap.Error(errProxyIsUnhealthy(Params.ProxyCfg.GetNodeID())))
3745 3746 3747 3748

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
Xiaofan 已提交
3749
				Reason:    msgProxyIsUnhealthy(Params.ProxyCfg.GetNodeID()),
3750 3751 3752 3753 3754 3755 3756 3757
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetMetrics failed to parse metric type",
X
Xiaofan 已提交
3758
			zap.Int64("node_id", Params.ProxyCfg.GetNodeID()),
3759 3760 3761 3762 3763 3764 3765 3766 3767 3768 3769 3770 3771 3772 3773
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

	log.Debug("Proxy.GetMetrics",
		zap.String("metric_type", metricType))

D
dragondriver 已提交
3774 3775 3776 3777 3778 3779 3780 3781 3782 3783
	msgID := UniqueID(0)
	msgID, err = node.idAllocator.AllocOne()
	if err != nil {
		log.Warn("Proxy.GetMetrics failed to allocate id",
			zap.Error(err))
	}
	req.Base = &commonpb.MsgBase{
		MsgType:   commonpb.MsgType_SystemInfo,
		MsgID:     msgID,
		Timestamp: 0,
X
Xiaofan 已提交
3784
		SourceID:  Params.ProxyCfg.GetNodeID(),
D
dragondriver 已提交
3785 3786
	}

3787
	if metricType == metricsinfo.SystemInfoMetrics {
3788 3789 3790 3791 3792 3793 3794
		ret, err := node.metricsCacheManager.GetSystemInfoMetrics()
		if err == nil && ret != nil {
			return ret, nil
		}
		log.Debug("failed to get system info metrics from cache, recompute instead",
			zap.Error(err))

3795
		metrics, err := getSystemInfoMetrics(ctx, req, node)
3796 3797

		log.Debug("Proxy.GetMetrics",
X
Xiaofan 已提交
3798
			zap.Int64("node_id", Params.ProxyCfg.GetNodeID()),
3799 3800 3801 3802 3803
			zap.String("req", req.Request),
			zap.String("metric_type", metricType),
			zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
			zap.Error(err))

3804 3805
		node.metricsCacheManager.UpdateSystemInfoMetrics(metrics)

G
godchen 已提交
3806
		return metrics, nil
3807 3808 3809
	}

	log.Debug("Proxy.GetMetrics failed, request metric type is not implemented yet",
X
Xiaofan 已提交
3810
		zap.Int64("node_id", Params.ProxyCfg.GetNodeID()),
3811 3812 3813 3814 3815 3816 3817 3818 3819 3820 3821 3822
		zap.String("req", req.Request),
		zap.String("metric_type", metricType))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}

B
bigsheeper 已提交
3823 3824 3825
// LoadBalance would do a load balancing operation between query nodes
func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceRequest) (*commonpb.Status, error) {
	log.Debug("Proxy.LoadBalance",
X
Xiaofan 已提交
3826
		zap.Int64("proxy_id", Params.ProxyCfg.GetNodeID()),
B
bigsheeper 已提交
3827 3828 3829 3830 3831 3832 3833 3834 3835
		zap.Any("req", req))

	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	}
3836 3837 3838 3839 3840 3841 3842

	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
	if err != nil {
		log.Error("failed to get collection id", zap.String("collection name", req.GetCollectionName()), zap.Error(err))
		status.Reason = err.Error()
		return status, nil
	}
B
bigsheeper 已提交
3843 3844 3845 3846 3847
	infoResp, err := node.queryCoord.LoadBalance(ctx, &querypb.LoadBalanceRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_LoadBalanceSegments,
			MsgID:     0,
			Timestamp: 0,
X
Xiaofan 已提交
3848
			SourceID:  Params.ProxyCfg.GetNodeID(),
B
bigsheeper 已提交
3849 3850 3851
		},
		SourceNodeIDs:    []int64{req.SrcNodeID},
		DstNodeIDs:       req.DstNodeIDs,
X
xige-16 已提交
3852
		BalanceReason:    querypb.TriggerCondition_GrpcRequest,
B
bigsheeper 已提交
3853
		SealedSegmentIDs: req.SealedSegmentIDs,
3854
		CollectionID:     collectionID,
B
bigsheeper 已提交
3855 3856 3857 3858 3859 3860 3861 3862 3863 3864 3865 3866 3867 3868 3869 3870 3871
	})
	if err != nil {
		log.Error("Failed to LoadBalance from Query Coordinator",
			zap.Any("req", req), zap.Error(err))
		status.Reason = err.Error()
		return status, nil
	}
	if infoResp.ErrorCode != commonpb.ErrorCode_Success {
		log.Error("Failed to LoadBalance from Query Coordinator", zap.String("errMsg", infoResp.Reason))
		status.Reason = infoResp.Reason
		return status, nil
	}
	log.Debug("LoadBalance Done", zap.Any("req", req), zap.Any("status", infoResp))
	status.ErrorCode = commonpb.ErrorCode_Success
	return status, nil
}

J
jingkl 已提交
3872
//GetCompactionState gets the compaction state of multiple segments
3873 3874 3875 3876 3877 3878 3879 3880 3881 3882 3883 3884 3885
func (node *Proxy) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
	log.Info("received GetCompactionState request", zap.Int64("compactionID", req.GetCompactionID()))
	resp := &milvuspb.GetCompactionStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionState(ctx, req)
	log.Info("received GetCompactionState response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

3886
// ManualCompaction invokes compaction on specified collection
3887 3888 3889 3890 3891 3892 3893 3894 3895 3896 3897 3898 3899
func (node *Proxy) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
	log.Info("received ManualCompaction request", zap.Int64("collectionID", req.GetCollectionID()))
	resp := &milvuspb.ManualCompactionResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.ManualCompaction(ctx, req)
	log.Info("received ManualCompaction response", zap.Int64("collectionID", req.GetCollectionID()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

3900
// GetCompactionStateWithPlans returns the compactions states with the given plan ID
3901 3902 3903 3904 3905 3906 3907 3908 3909 3910 3911 3912 3913
func (node *Proxy) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
	log.Info("received GetCompactionStateWithPlans request", zap.Int64("compactionID", req.GetCompactionID()))
	resp := &milvuspb.GetCompactionPlansResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionStateWithPlans(ctx, req)
	log.Info("received GetCompactionStateWithPlans response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

B
Bingyi Sun 已提交
3914 3915 3916
// GetFlushState gets the flush state of multiple segments
func (node *Proxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
	log.Info("received get flush state request", zap.Any("request", req))
3917
	var err error
B
Bingyi Sun 已提交
3918 3919 3920 3921 3922 3923 3924
	resp := &milvuspb.GetFlushStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		log.Info("unable to get flush state because of closed server")
		return resp, nil
	}

3925
	resp, err = node.dataCoord.GetFlushState(ctx, req)
X
Xiaofan 已提交
3926 3927 3928 3929
	if err != nil {
		log.Info("failed to get flush state response", zap.Error(err))
		return nil, err
	}
B
Bingyi Sun 已提交
3930 3931 3932 3933
	log.Info("received get flush state response", zap.Any("response", resp))
	return resp, err
}

C
Cai Yudong 已提交
3934 3935
// checkHealthy checks proxy state is Healthy
func (node *Proxy) checkHealthy() bool {
3936 3937 3938 3939
	code := node.stateCode.Load().(internalpb.StateCode)
	return code == internalpb.StateCode_Healthy
}

J
jingkl 已提交
3940
//unhealthyStatus returns the proxy not healthy status
3941 3942 3943
func unhealthyStatus() *commonpb.Status {
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3944
		Reason:    "proxy not healthy",
3945 3946
	}
}
G
groot 已提交
3947 3948 3949

// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
3950 3951 3952
	log.Info("received import request",
		zap.String("collection name", req.GetCollectionName()),
		zap.Bool("row-based", req.GetRowBased()))
3953 3954 3955 3956 3957 3958
	resp := &milvuspb.ImportResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
	}
G
groot 已提交
3959 3960 3961 3962
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3963 3964 3965 3966 3967 3968 3969 3970 3971 3972 3973 3974
	// Get collection ID and then channel names.
	collID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName())
	if err != nil {
		log.Error("collection ID not found",
			zap.String("collection name", req.GetCollectionName()),
			zap.Error(err))
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		resp.Status.Reason = err.Error()
		return resp, err
	}
	chNames, err := node.chMgr.getVChannels(collID)
	if err != nil {
3975 3976 3977 3978 3979 3980 3981 3982
		err = node.chMgr.createDMLMsgStream(collID)
		if err != nil {
			return nil, err
		}
		chNames, err = node.chMgr.getVChannels(collID)
		if err != nil {
			return nil, err
		}
3983 3984
	}
	req.ChannelNames = chNames
3985 3986 3987
	if req.GetPartitionName() == "" {
		req.PartitionName = Params.CommonCfg.DefaultPartitionName
	}
3988 3989
	// Call rootCoord to finish import.
	resp, err = node.rootCoord.Import(ctx, req)
G
groot 已提交
3990
	return resp, err
G
groot 已提交
3991 3992
}

G
groot 已提交
3993 3994 3995 3996 3997 3998 3999 4000 4001 4002 4003 4004 4005 4006 4007 4008 4009 4010 4011 4012 4013 4014 4015 4016 4017 4018 4019 4020
// GetImportState checks import task state from datanode
func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
	log.Info("received get import state request", zap.Int64("taskID", req.GetTask()))
	resp := &milvuspb.GetImportStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.rootCoord.GetImportState(ctx, req)
	log.Info("received get import state response", zap.Int64("taskID", req.GetTask()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

// ListImportTasks get id array of all import tasks from rootcoord
func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
	log.Info("received list import tasks request")
	resp := &milvuspb.ListImportTasksResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.rootCoord.ListImportTasks(ctx, req)
	log.Info("received list import tasks response")
	return resp, err
}

X
XuanYang-cn 已提交
4021 4022 4023 4024 4025 4026 4027 4028 4029
// GetReplicas gets replica info
func (node *Proxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
	log.Info("received get replicas request")
	resp := &milvuspb.GetReplicasResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

4030 4031
	req.Base = &commonpb.MsgBase{
		MsgType:  commonpb.MsgType_GetReplicas,
X
Xiaofan 已提交
4032
		SourceID: Params.ProxyCfg.GetNodeID(),
4033 4034
	}

X
XuanYang-cn 已提交
4035 4036 4037 4038 4039
	resp, err := node.queryCoord.GetReplicas(ctx, req)
	log.Info("received get replicas response", zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

4040 4041 4042 4043 4044 4045 4046 4047 4048 4049 4050 4051 4052 4053 4054 4055 4056 4057 4058 4059 4060 4061 4062 4063 4064 4065 4066 4067 4068 4069 4070 4071 4072 4073 4074 4075 4076 4077 4078 4079 4080 4081 4082 4083 4084 4085 4086 4087 4088 4089 4090 4091 4092 4093 4094 4095 4096 4097 4098 4099 4100 4101 4102 4103 4104 4105 4106 4107 4108 4109 4110 4111 4112 4113 4114 4115 4116 4117 4118 4119 4120 4121 4122 4123 4124 4125 4126 4127 4128 4129 4130 4131 4132 4133 4134 4135 4136 4137 4138 4139 4140 4141 4142 4143 4144 4145 4146 4147 4148 4149
// InvalidateCredentialCache invalidate the credential cache of specified username.
func (node *Proxy) InvalidateCredentialCache(ctx context.Context, request *proxypb.InvalidateCredCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Debug("received request to invalidate credential cache",
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))

	username := request.Username
	if globalMetaCache != nil {
		globalMetaCache.RemoveCredential(username) // no need to return error, though credential may be not cached
	}
	logutil.Logger(ctx).Debug("complete to invalidate credential cache",
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

// UpdateCredentialCache update the credential cache of specified username.
func (node *Proxy) UpdateCredentialCache(ctx context.Context, request *proxypb.UpdateCredCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Debug("received request to update credential cache",
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))

	credInfo := &internalpb.CredentialInfo{
		Username:          request.Username,
		EncryptedPassword: request.Password,
	}
	if globalMetaCache != nil {
		globalMetaCache.UpdateCredential(credInfo) // no need to return error, though credential may be not cached
	}
	logutil.Logger(ctx).Debug("complete to update credential cache",
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

func (node *Proxy) ClearCredUsersCache(ctx context.Context, request *internalpb.ClearCredUsersCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Debug("received request to clear credential usernames cache",
		zap.String("role", typeutil.ProxyRole))

	if globalMetaCache != nil {
		globalMetaCache.ClearCredUsers() // no need to return error, though credential may be not cached
	}
	logutil.Logger(ctx).Debug("complete to clear credential usernames cache",
		zap.String("role", typeutil.ProxyRole))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

func (node *Proxy) CreateCredential(ctx context.Context, req *milvuspb.CreateCredentialRequest) (*commonpb.Status, error) {
	log.Debug("CreateCredential", zap.String("role", typeutil.RootCoordRole), zap.String("username", req.Username))
	// validate params
	username := req.Username
	if err := ValidateUsername(username); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	rawPassword, err := crypto.Base64Decode(req.Password)
	if err != nil {
		log.Error("decode password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "decode password fail key:" + req.Username,
		}, nil
	}
	if err = ValidatePassword(rawPassword); err != nil {
		log.Error("illegal password", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	encryptedPassword, err := crypto.PasswordEncrypt(rawPassword)
	if err != nil {
		log.Error("encrypt password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "encrypt password fail key:" + req.Username,
		}, nil
	}
	credInfo := &internalpb.CredentialInfo{
		Username:          req.Username,
		EncryptedPassword: encryptedPassword,
	}
	result, err := node.rootCoord.CreateCredential(ctx, credInfo)
	if err != nil { // for error like conntext timeout etc.
		log.Error("create credential fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

C
codeman 已提交
4150
func (node *Proxy) UpdateCredential(ctx context.Context, req *milvuspb.UpdateCredentialRequest) (*commonpb.Status, error) {
4151
	log.Debug("UpdateCredential", zap.String("role", typeutil.RootCoordRole), zap.String("username", req.Username))
C
codeman 已提交
4152 4153 4154 4155 4156 4157 4158 4159 4160
	rawOldPassword, err := crypto.Base64Decode(req.OldPassword)
	if err != nil {
		log.Error("decode old password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode old password fail when updating:" + req.Username,
		}, nil
	}
	rawNewPassword, err := crypto.Base64Decode(req.NewPassword)
4161 4162 4163 4164 4165 4166 4167
	if err != nil {
		log.Error("decode password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
4168 4169
	// valid new password
	if err = ValidatePassword(rawNewPassword); err != nil {
4170 4171 4172 4173 4174 4175
		log.Error("illegal password", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
C
codeman 已提交
4176 4177 4178 4179 4180 4181 4182 4183 4184 4185 4186 4187 4188 4189 4190 4191 4192
	// check old password is correct
	oldCredInfo, err := globalMetaCache.GetCredentialInfo(ctx, req.Username)
	if err != nil {
		log.Error("found no credential", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "found no credential:" + req.Username,
		}, nil
	}
	if !crypto.PasswordVerify(rawOldPassword, oldCredInfo.EncryptedPassword) {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "old password is not correct:" + req.Username,
		}, nil
	}
	// update meta data
	encryptedPassword, err := crypto.PasswordEncrypt(rawNewPassword)
4193 4194 4195 4196 4197 4198 4199
	if err != nil {
		log.Error("encrypt password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "encrypt password fail when updating:" + req.Username,
		}, nil
	}
C
codeman 已提交
4200
	updateCredReq := &internalpb.CredentialInfo{
4201 4202 4203
		Username:          req.Username,
		EncryptedPassword: encryptedPassword,
	}
C
codeman 已提交
4204
	result, err := node.rootCoord.UpdateCredential(ctx, updateCredReq)
4205 4206 4207 4208 4209 4210 4211 4212 4213 4214 4215 4216
	if err != nil { // for error like conntext timeout etc.
		log.Error("update credential fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
	log.Debug("DeleteCredential", zap.String("role", typeutil.RootCoordRole), zap.String("username", req.Username))
4217 4218 4219 4220 4221 4222
	if req.Username == util.UserRoot {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_DeleteCredentialFailure,
			Reason:    "user root cannot be deleted",
		}, nil
	}
4223 4224 4225 4226 4227 4228 4229 4230 4231 4232 4233 4234 4235 4236 4237 4238 4239 4240 4241 4242 4243 4244 4245 4246 4247 4248 4249 4250 4251 4252
	result, err := node.rootCoord.DeleteCredential(ctx, req)
	if err != nil { // for error like conntext timeout etc.
		log.Error("delete credential fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
	log.Debug("ListCredUsers", zap.String("role", typeutil.RootCoordRole))
	// get from cache
	usernames, err := globalMetaCache.GetCredUsernames(ctx)
	if err != nil {
		return &milvuspb.ListCredUsersResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
	return &milvuspb.ListCredUsersResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		Usernames: usernames,
	}, nil
}
4253 4254 4255 4256 4257 4258 4259 4260 4261 4262 4263 4264 4265 4266 4267 4268

// SendSearchResult needs to be removed TODO
func (node *Proxy) SendSearchResult(ctx context.Context, req *internalpb.SearchResults) (*commonpb.Status, error) {
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
		Reason:    "Not implemented",
	}, nil
}

// SendRetrieveResult needs to be removed TODO
func (node *Proxy) SendRetrieveResult(ctx context.Context, req *internalpb.RetrieveResults) (*commonpb.Status, error) {
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
		Reason:    "Not implemented",
	}, nil
}