impl.go 140.6 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16

C
Cai Yudong 已提交
17
package proxy
18 19 20

import (
	"context"
21
	"errors"
22
	"fmt"
C
cai.zhang 已提交
23
	"os"
24 25
	"strconv"

26
	"go.uber.org/zap"
S
sunby 已提交
27

28
	"github.com/milvus-io/milvus/internal/common"
X
Xiangyu Wang 已提交
29
	"github.com/milvus-io/milvus/internal/log"
30
	"github.com/milvus-io/milvus/internal/metrics"
J
jaime 已提交
31
	"github.com/milvus-io/milvus/internal/mq/msgstream"
X
Xiangyu Wang 已提交
32 33 34 35 36 37
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
38
	"github.com/milvus-io/milvus/internal/proto/schemapb"
39
	"github.com/milvus-io/milvus/internal/util/crypto"
40
	"github.com/milvus-io/milvus/internal/util/distance"
41 42 43
	"github.com/milvus-io/milvus/internal/util/funcutil"
	"github.com/milvus-io/milvus/internal/util/logutil"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
44
	"github.com/milvus-io/milvus/internal/util/timerecord"
45
	"github.com/milvus-io/milvus/internal/util/trace"
X
Xiangyu Wang 已提交
46
	"github.com/milvus-io/milvus/internal/util/typeutil"
47 48
)

49 50
const moduleName = "Proxy"

51
// UpdateStateCode updates the state code of Proxy.
C
Cai Yudong 已提交
52
func (node *Proxy) UpdateStateCode(code internalpb.StateCode) {
53
	node.stateCode.Store(code)
Z
zhenshan.cao 已提交
54 55
}

56
// GetComponentStates get state of Proxy.
C
Cai Yudong 已提交
57
func (node *Proxy) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
G
godchen 已提交
58 59 60 61 62 63 64 65 66 67 68 69
	stats := &internalpb.ComponentStates{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
	code, ok := node.stateCode.Load().(internalpb.StateCode)
	if !ok {
		errMsg := "unexpected error in type assertion"
		stats.Status = &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    errMsg,
		}
G
godchen 已提交
70
		return stats, nil
G
godchen 已提交
71
	}
72 73 74 75
	nodeID := common.NotRegisteredID
	if node.session != nil && node.session.Registered() {
		nodeID = node.session.ServerID
	}
G
godchen 已提交
76
	info := &internalpb.ComponentInfo{
77 78
		// NodeID:    Params.ProxyID, // will race with Proxy.Register()
		NodeID:    nodeID,
C
Cai Yudong 已提交
79
		Role:      typeutil.ProxyRole,
G
godchen 已提交
80 81 82 83 84 85
		StateCode: code,
	}
	stats.State = info
	return stats, nil
}

C
cxytz01 已提交
86
// GetStatisticsChannel gets statistics channel of Proxy.
C
Cai Yudong 已提交
87
func (node *Proxy) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
88 89 90 91 92 93 94 95 96
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: "",
	}, nil
}

97
// InvalidateCollectionMetaCache invalidate the meta cache of specific collection.
C
Cai Yudong 已提交
98
func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
99 100
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Debug("received request to invalidate collection meta cache",
101
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
102 103 104
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

105
	collectionName := request.CollectionName
N
neza2017 已提交
106 107 108
	if globalMetaCache != nil {
		globalMetaCache.RemoveCollection(ctx, collectionName) // no need to return error, though collection may be not cached
	}
109
	logutil.Logger(ctx).Debug("complete to invalidate collection meta cache",
110
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
111 112 113
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

114
	return &commonpb.Status{
115
		ErrorCode: commonpb.ErrorCode_Success,
116 117
		Reason:    "",
	}, nil
118 119
}

120
// ReleaseDQLMessageStream release the query message stream of specific collection.
C
Cai Yudong 已提交
121
func (node *Proxy) ReleaseDQLMessageStream(ctx context.Context, request *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
122 123
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Debug("received request to release DQL message strem",
124
		zap.Any("role", typeutil.ProxyRole),
125 126 127
		zap.Any("db", request.DbID),
		zap.Any("collection", request.CollectionID))

128 129 130 131
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

132 133
	_ = node.chMgr.removeDQLStream(request.CollectionID)

134
	logutil.Logger(ctx).Debug("complete to release DQL message stream",
135
		zap.Any("role", typeutil.ProxyRole),
136 137 138 139 140 141 142 143 144
		zap.Any("db", request.DbID),
		zap.Any("collection", request.CollectionID))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

145
// TODO(dragondriver): add more detailed ut for ConsistencyLevel, should we support multiple consistency level in Proxy?
146
// CreateCollection create a collection by the schema.
C
Cai Yudong 已提交
147
func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
148 149 150
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
151 152 153 154

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
155 156 157 158
	method := "CreateCollection"
	tr := timerecord.NewTimeRecorder(method)

	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()
159

160
	cct := &createCollectionTask{
S
sunby 已提交
161
		ctx:                     ctx,
162 163
		Condition:               NewTaskCondition(ctx),
		CreateCollectionRequest: request,
164
		rootCoord:               node.rootCoord,
165 166
	}

167 168 169
	// avoid data race
	lenOfSchema := len(request.Schema)

170 171
	log.Debug(
		rpcReceived(method),
172
		zap.String("traceID", traceID),
173
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
174 175
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
176
		zap.Int("len(schema)", lenOfSchema),
177 178
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
179

180 181 182
	if err := node.sched.ddQueue.Enqueue(cct); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
183 184
			zap.Error(err),
			zap.String("traceID", traceID),
185
			zap.String("role", typeutil.ProxyRole),
186 187 188
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Int("len(schema)", lenOfSchema),
189 190
			zap.Int32("shards_num", request.ShardsNum),
			zap.String("consistency_level", request.ConsistencyLevel.String()))
191

192
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.AbandonLabel).Inc()
193
		return &commonpb.Status{
194
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
195 196 197 198
			Reason:    err.Error(),
		}, nil
	}

199 200
	log.Debug(
		rpcEnqueued(method),
201
		zap.String("traceID", traceID),
202
		zap.String("role", typeutil.ProxyRole),
203 204 205
		zap.Int64("MsgID", cct.ID()),
		zap.Uint64("BeginTs", cct.BeginTs()),
		zap.Uint64("EndTs", cct.EndTs()),
D
dragondriver 已提交
206 207
		zap.Uint64("timestamp", request.Base.Timestamp),
		zap.String("db", request.DbName),
208 209
		zap.String("collection", request.CollectionName),
		zap.Int("len(schema)", lenOfSchema),
210 211
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
212

213 214 215
	if err := cct.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
216
			zap.Error(err),
217
			zap.String("traceID", traceID),
218
			zap.String("role", typeutil.ProxyRole),
219 220 221
			zap.Int64("MsgID", cct.ID()),
			zap.Uint64("BeginTs", cct.BeginTs()),
			zap.Uint64("EndTs", cct.EndTs()),
D
dragondriver 已提交
222 223
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
224
			zap.Int("len(schema)", lenOfSchema),
225 226
			zap.Int32("shards_num", request.ShardsNum),
			zap.String("consistency_level", request.ConsistencyLevel.String()))
D
dragondriver 已提交
227

228
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()
229
		return &commonpb.Status{
230
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
231 232 233 234
			Reason:    err.Error(),
		}, nil
	}

235 236
	log.Debug(
		rpcDone(method),
237
		zap.String("traceID", traceID),
238
		zap.String("role", typeutil.ProxyRole),
239 240 241 242 243 244
		zap.Int64("MsgID", cct.ID()),
		zap.Uint64("BeginTs", cct.BeginTs()),
		zap.Uint64("EndTs", cct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Int("len(schema)", lenOfSchema),
245 246
		zap.Int32("shards_num", request.ShardsNum),
		zap.String("consistency_level", request.ConsistencyLevel.String()))
247

248 249
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
250 251 252
	return cct.result, nil
}

253
// DropCollection drop a collection.
C
Cai Yudong 已提交
254
func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
255 256 257
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
258 259 260 261

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
262 263 264
	method := "DropCollection"
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()
265

266
	dct := &dropCollectionTask{
S
sunby 已提交
267
		ctx:                   ctx,
268 269
		Condition:             NewTaskCondition(ctx),
		DropCollectionRequest: request,
270
		rootCoord:             node.rootCoord,
271
		chMgr:                 node.chMgr,
S
sunby 已提交
272
		chTicker:              node.chTicker,
273 274
	}

275 276
	log.Debug("DropCollection received",
		zap.String("traceID", traceID),
277
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
278 279
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
280 281 282 283 284

	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DropCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
285
			zap.String("role", typeutil.ProxyRole),
286 287 288
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

289
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.AbandonLabel).Inc()
290
		return &commonpb.Status{
291
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
292 293 294 295
			Reason:    err.Error(),
		}, nil
	}

296 297
	log.Debug("DropCollection enqueued",
		zap.String("traceID", traceID),
298
		zap.String("role", typeutil.ProxyRole),
299 300 301
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTs", dct.BeginTs()),
		zap.Uint64("EndTs", dct.EndTs()),
D
dragondriver 已提交
302 303
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
304 305 306

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DropCollection failed to WaitToFinish",
D
dragondriver 已提交
307
			zap.Error(err),
308
			zap.String("traceID", traceID),
309
			zap.String("role", typeutil.ProxyRole),
310 311 312
			zap.Int64("MsgID", dct.ID()),
			zap.Uint64("BeginTs", dct.BeginTs()),
			zap.Uint64("EndTs", dct.EndTs()),
D
dragondriver 已提交
313 314 315
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

316
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()
317
		return &commonpb.Status{
318
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
319 320 321 322
			Reason:    err.Error(),
		}, nil
	}

323 324
	log.Debug("DropCollection done",
		zap.String("traceID", traceID),
325
		zap.String("role", typeutil.ProxyRole),
326 327 328 329 330 331
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTs", dct.BeginTs()),
		zap.Uint64("EndTs", dct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

332 333
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
334 335 336
	return dct.result, nil
}

337
// HasCollection check if the specific collection exists in Milvus.
C
Cai Yudong 已提交
338
func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
339 340 341 342 343
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
344 345 346 347

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
348 349 350
	method := "HasCollection"
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
351
		metrics.TotalLabel).Inc()
352 353 354

	log.Debug("HasCollection received",
		zap.String("traceID", traceID),
355
		zap.String("role", typeutil.ProxyRole),
356 357 358
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

359
	hct := &hasCollectionTask{
S
sunby 已提交
360
		ctx:                  ctx,
361 362
		Condition:            NewTaskCondition(ctx),
		HasCollectionRequest: request,
363
		rootCoord:            node.rootCoord,
364 365
	}

366 367 368 369
	if err := node.sched.ddQueue.Enqueue(hct); err != nil {
		log.Warn("HasCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
370
			zap.String("role", typeutil.ProxyRole),
371 372 373
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

374
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
375
			metrics.AbandonLabel).Inc()
376 377
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
378
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
379 380 381 382 383
				Reason:    err.Error(),
			},
		}, nil
	}

384 385
	log.Debug("HasCollection enqueued",
		zap.String("traceID", traceID),
386
		zap.String("role", typeutil.ProxyRole),
387 388 389
		zap.Int64("MsgID", hct.ID()),
		zap.Uint64("BeginTS", hct.BeginTs()),
		zap.Uint64("EndTS", hct.EndTs()),
D
dragondriver 已提交
390 391
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
392 393 394

	if err := hct.WaitToFinish(); err != nil {
		log.Warn("HasCollection failed to WaitToFinish",
D
dragondriver 已提交
395
			zap.Error(err),
396
			zap.String("traceID", traceID),
397
			zap.String("role", typeutil.ProxyRole),
398 399 400
			zap.Int64("MsgID", hct.ID()),
			zap.Uint64("BeginTS", hct.BeginTs()),
			zap.Uint64("EndTS", hct.EndTs()),
D
dragondriver 已提交
401 402 403
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

404
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
405
			metrics.FailLabel).Inc()
406 407
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
408
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
409 410 411 412 413
				Reason:    err.Error(),
			},
		}, nil
	}

414 415
	log.Debug("HasCollection done",
		zap.String("traceID", traceID),
416
		zap.String("role", typeutil.ProxyRole),
417 418 419 420 421 422
		zap.Int64("MsgID", hct.ID()),
		zap.Uint64("BeginTS", hct.BeginTs()),
		zap.Uint64("EndTS", hct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

423
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
424 425
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
426 427 428
	return hct.result, nil
}

429
// LoadCollection load a collection into query nodes.
C
Cai Yudong 已提交
430
func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
431 432 433
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
434 435 436 437

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadCollection")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
438 439
	method := "LoadCollection"
	tr := timerecord.NewTimeRecorder(method)
440

441
	lct := &loadCollectionTask{
S
sunby 已提交
442
		ctx:                   ctx,
443 444
		Condition:             NewTaskCondition(ctx),
		LoadCollectionRequest: request,
445
		queryCoord:            node.queryCoord,
446 447
	}

448 449
	log.Debug("LoadCollection received",
		zap.String("traceID", traceID),
450
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
451 452
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
453 454 455 456 457

	if err := node.sched.ddQueue.Enqueue(lct); err != nil {
		log.Warn("LoadCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
458
			zap.String("role", typeutil.ProxyRole),
459 460 461
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

462
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
463
			metrics.AbandonLabel).Inc()
464
		return &commonpb.Status{
465
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
466 467 468
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
469

470 471
	log.Debug("LoadCollection enqueued",
		zap.String("traceID", traceID),
472
		zap.String("role", typeutil.ProxyRole),
473 474 475
		zap.Int64("MsgID", lct.ID()),
		zap.Uint64("BeginTS", lct.BeginTs()),
		zap.Uint64("EndTS", lct.EndTs()),
D
dragondriver 已提交
476 477
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
478 479 480

	if err := lct.WaitToFinish(); err != nil {
		log.Warn("LoadCollection failed to WaitToFinish",
D
dragondriver 已提交
481
			zap.Error(err),
482
			zap.String("traceID", traceID),
483
			zap.String("role", typeutil.ProxyRole),
484 485 486
			zap.Int64("MsgID", lct.ID()),
			zap.Uint64("BeginTS", lct.BeginTs()),
			zap.Uint64("EndTS", lct.EndTs()),
D
dragondriver 已提交
487 488 489
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

490
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
491
			metrics.TotalLabel).Inc()
492
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
493
			metrics.FailLabel).Inc()
494
		return &commonpb.Status{
495
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
496 497 498 499
			Reason:    err.Error(),
		}, nil
	}

500 501
	log.Debug("LoadCollection done",
		zap.String("traceID", traceID),
502
		zap.String("role", typeutil.ProxyRole),
503 504 505 506 507 508
		zap.Int64("MsgID", lct.ID()),
		zap.Uint64("BeginTS", lct.BeginTs()),
		zap.Uint64("EndTS", lct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

509
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
510
		metrics.TotalLabel).Inc()
511
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
512 513
		metrics.SuccessLabel).Inc()
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
514
	return lct.result, nil
515 516
}

517
// ReleaseCollection remove the loaded collection from query nodes.
C
Cai Yudong 已提交
518
func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
519 520 521
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
522

523
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleaseCollection")
524 525
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
526 527
	method := "ReleaseCollection"
	tr := timerecord.NewTimeRecorder(method)
528

529
	rct := &releaseCollectionTask{
S
sunby 已提交
530
		ctx:                      ctx,
531 532
		Condition:                NewTaskCondition(ctx),
		ReleaseCollectionRequest: request,
533
		queryCoord:               node.queryCoord,
534
		chMgr:                    node.chMgr,
535 536
	}

537 538
	log.Debug(
		rpcReceived(method),
539
		zap.String("traceID", traceID),
540
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
541 542
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
543 544

	if err := node.sched.ddQueue.Enqueue(rct); err != nil {
545 546
		log.Warn(
			rpcFailedToEnqueue(method),
547 548
			zap.Error(err),
			zap.String("traceID", traceID),
549
			zap.String("role", typeutil.ProxyRole),
550 551 552
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

553
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
554
			metrics.AbandonLabel).Inc()
555
		return &commonpb.Status{
556
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
557 558 559 560
			Reason:    err.Error(),
		}, nil
	}

561 562
	log.Debug(
		rpcEnqueued(method),
563
		zap.String("traceID", traceID),
564
		zap.String("role", typeutil.ProxyRole),
565 566 567
		zap.Int64("MsgID", rct.ID()),
		zap.Uint64("BeginTS", rct.BeginTs()),
		zap.Uint64("EndTS", rct.EndTs()),
D
dragondriver 已提交
568 569
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
570 571

	if err := rct.WaitToFinish(); err != nil {
572 573
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
574
			zap.Error(err),
575
			zap.String("traceID", traceID),
576
			zap.String("role", typeutil.ProxyRole),
577 578 579
			zap.Int64("MsgID", rct.ID()),
			zap.Uint64("BeginTS", rct.BeginTs()),
			zap.Uint64("EndTS", rct.EndTs()),
D
dragondriver 已提交
580 581 582
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

583
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
584
			metrics.TotalLabel).Inc()
585
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
586
			metrics.FailLabel).Inc()
587
		return &commonpb.Status{
588
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
589 590 591 592
			Reason:    err.Error(),
		}, nil
	}

593 594
	log.Debug(
		rpcDone(method),
595
		zap.String("traceID", traceID),
596
		zap.String("role", typeutil.ProxyRole),
597 598 599 600 601 602
		zap.Int64("MsgID", rct.ID()),
		zap.Uint64("BeginTS", rct.BeginTs()),
		zap.Uint64("EndTS", rct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

603
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
604
		metrics.TotalLabel).Inc()
605
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
606 607
		metrics.SuccessLabel).Inc()
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
608
	return rct.result, nil
609 610
}

611
// DescribeCollection get the meta information of specific collection, such as schema, created timestamp and etc.
C
Cai Yudong 已提交
612
func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
613 614 615 616 617
	if !node.checkHealthy() {
		return &milvuspb.DescribeCollectionResponse{
			Status: unhealthyStatus(),
		}, nil
	}
618

619
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeCollection")
620 621
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
622 623
	method := "DescribeCollection"
	tr := timerecord.NewTimeRecorder(method)
624

625
	dct := &describeCollectionTask{
S
sunby 已提交
626
		ctx:                       ctx,
627 628
		Condition:                 NewTaskCondition(ctx),
		DescribeCollectionRequest: request,
629
		rootCoord:                 node.rootCoord,
630 631
	}

632 633
	log.Debug("DescribeCollection received",
		zap.String("traceID", traceID),
634
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
635 636
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
637 638 639 640 641

	if err := node.sched.ddQueue.Enqueue(dct); err != nil {
		log.Warn("DescribeCollection failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
642
			zap.String("role", typeutil.ProxyRole),
643 644 645
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

646
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
647
			metrics.AbandonLabel).Inc()
648 649
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
650
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
651 652 653 654 655
				Reason:    err.Error(),
			},
		}, nil
	}

656 657
	log.Debug("DescribeCollection enqueued",
		zap.String("traceID", traceID),
658
		zap.String("role", typeutil.ProxyRole),
659 660 661
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTS", dct.BeginTs()),
		zap.Uint64("EndTS", dct.EndTs()),
D
dragondriver 已提交
662 663
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
664 665 666

	if err := dct.WaitToFinish(); err != nil {
		log.Warn("DescribeCollection failed to WaitToFinish",
D
dragondriver 已提交
667
			zap.Error(err),
668
			zap.String("traceID", traceID),
669
			zap.String("role", typeutil.ProxyRole),
670 671 672
			zap.Int64("MsgID", dct.ID()),
			zap.Uint64("BeginTS", dct.BeginTs()),
			zap.Uint64("EndTS", dct.EndTs()),
D
dragondriver 已提交
673 674 675
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

676
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
677
			metrics.TotalLabel).Inc()
678
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
679
			metrics.FailLabel).Inc()
680

681 682
		return &milvuspb.DescribeCollectionResponse{
			Status: &commonpb.Status{
683
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
684 685 686 687 688
				Reason:    err.Error(),
			},
		}, nil
	}

689 690
	log.Debug("DescribeCollection done",
		zap.String("traceID", traceID),
691
		zap.String("role", typeutil.ProxyRole),
692 693 694 695 696 697
		zap.Int64("MsgID", dct.ID()),
		zap.Uint64("BeginTS", dct.BeginTs()),
		zap.Uint64("EndTS", dct.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

698
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
699
		metrics.TotalLabel).Inc()
700
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
701 702
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
703 704 705
	return dct.result, nil
}

706
// GetCollectionStatistics get the collection statistics, such as `num_rows`.
C
Cai Yudong 已提交
707
func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvuspb.GetCollectionStatisticsRequest) (*milvuspb.GetCollectionStatisticsResponse, error) {
708 709 710 711 712
	if !node.checkHealthy() {
		return &milvuspb.GetCollectionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
713 714 715 716

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetCollectionStatistics")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
717 718
	method := "GetCollectionStatistics"
	tr := timerecord.NewTimeRecorder(method)
719

720
	g := &getCollectionStatisticsTask{
G
godchen 已提交
721 722 723
		ctx:                            ctx,
		Condition:                      NewTaskCondition(ctx),
		GetCollectionStatisticsRequest: request,
724
		dataCoord:                      node.dataCoord,
725 726
	}

727 728
	log.Debug("GetCollectionStatistics received",
		zap.String("traceID", traceID),
729
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
730 731
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
732 733 734 735 736

	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn("GetCollectionStatistics failed to enqueue",
			zap.Error(err),
			zap.String("traceID", traceID),
737
			zap.String("role", typeutil.ProxyRole),
738 739 740
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

741
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
742
			metrics.AbandonLabel).Inc()
743

G
godchen 已提交
744
		return &milvuspb.GetCollectionStatisticsResponse{
745
			Status: &commonpb.Status{
746
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
747 748 749 750 751
				Reason:    err.Error(),
			},
		}, nil
	}

752 753
	log.Debug("GetCollectionStatistics enqueued",
		zap.String("traceID", traceID),
754
		zap.String("role", typeutil.ProxyRole),
755 756 757
		zap.Int64("MsgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
D
dragondriver 已提交
758 759
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))
760 761 762

	if err := g.WaitToFinish(); err != nil {
		log.Warn("GetCollectionStatistics failed to WaitToFinish",
D
dragondriver 已提交
763
			zap.Error(err),
764
			zap.String("traceID", traceID),
765
			zap.String("role", typeutil.ProxyRole),
766 767 768
			zap.Int64("MsgID", g.ID()),
			zap.Uint64("BeginTS", g.BeginTs()),
			zap.Uint64("EndTS", g.EndTs()),
D
dragondriver 已提交
769 770 771
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName))

772
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
773
			metrics.TotalLabel).Inc()
774
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
775
			metrics.FailLabel).Inc()
776

G
godchen 已提交
777
		return &milvuspb.GetCollectionStatisticsResponse{
778
			Status: &commonpb.Status{
779
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
780 781 782 783 784
				Reason:    err.Error(),
			},
		}, nil
	}

785 786
	log.Debug("GetCollectionStatistics done",
		zap.String("traceID", traceID),
787
		zap.String("role", typeutil.ProxyRole),
788 789 790 791 792 793
		zap.Int64("MsgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName))

794
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
795
		metrics.TotalLabel).Inc()
796
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
797 798
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
799
	return g.result, nil
800 801
}

802
// ShowCollections list all collections in Milvus.
C
Cai Yudong 已提交
803
func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
804 805 806 807 808
	if !node.checkHealthy() {
		return &milvuspb.ShowCollectionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
809 810 811 812
	method := "ShowCollections"
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()

813
	sct := &showCollectionsTask{
G
godchen 已提交
814 815 816
		ctx:                    ctx,
		Condition:              NewTaskCondition(ctx),
		ShowCollectionsRequest: request,
817
		queryCoord:             node.queryCoord,
818
		rootCoord:              node.rootCoord,
819 820
	}

821
	log.Debug("ShowCollections received",
822
		zap.String("role", typeutil.ProxyRole),
823 824 825 826 827 828
		zap.String("DbName", request.DbName),
		zap.Uint64("TimeStamp", request.TimeStamp),
		zap.String("ShowType", request.Type.String()),
		zap.Any("CollectionNames", request.CollectionNames),
	)

829
	err := node.sched.ddQueue.Enqueue(sct)
830
	if err != nil {
831 832
		log.Warn("ShowCollections failed to enqueue",
			zap.Error(err),
833
			zap.String("role", typeutil.ProxyRole),
834 835 836 837 838 839
			zap.String("DbName", request.DbName),
			zap.Uint64("TimeStamp", request.TimeStamp),
			zap.String("ShowType", request.Type.String()),
			zap.Any("CollectionNames", request.CollectionNames),
		)

840
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.AbandonLabel).Inc()
G
godchen 已提交
841
		return &milvuspb.ShowCollectionsResponse{
842
			Status: &commonpb.Status{
843
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
844 845 846 847 848
				Reason:    err.Error(),
			},
		}, nil
	}

849
	log.Debug("ShowCollections enqueued",
850
		zap.String("role", typeutil.ProxyRole),
851
		zap.Int64("MsgID", sct.ID()),
852
		zap.String("DbName", sct.ShowCollectionsRequest.DbName),
853
		zap.Uint64("TimeStamp", request.TimeStamp),
854 855 856
		zap.String("ShowType", sct.ShowCollectionsRequest.Type.String()),
		zap.Any("CollectionNames", sct.ShowCollectionsRequest.CollectionNames),
	)
D
dragondriver 已提交
857

858 859
	err = sct.WaitToFinish()
	if err != nil {
860 861
		log.Warn("ShowCollections failed to WaitToFinish",
			zap.Error(err),
862
			zap.String("role", typeutil.ProxyRole),
863 864 865 866 867 868 869
			zap.Int64("MsgID", sct.ID()),
			zap.String("DbName", request.DbName),
			zap.Uint64("TimeStamp", request.TimeStamp),
			zap.String("ShowType", request.Type.String()),
			zap.Any("CollectionNames", request.CollectionNames),
		)

870 871
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()

G
godchen 已提交
872
		return &milvuspb.ShowCollectionsResponse{
873
			Status: &commonpb.Status{
874
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
875 876 877 878 879
				Reason:    err.Error(),
			},
		}, nil
	}

880
	log.Debug("ShowCollections Done",
881
		zap.String("role", typeutil.ProxyRole),
882 883 884 885
		zap.Int64("MsgID", sct.ID()),
		zap.String("DbName", request.DbName),
		zap.Uint64("TimeStamp", request.TimeStamp),
		zap.String("ShowType", request.Type.String()),
886 887
		zap.Int("len(CollectionNames)", len(request.CollectionNames)),
		zap.Int("num_collections", len(sct.result.CollectionNames)))
888

889 890
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
891 892 893
	return sct.result, nil
}

894
// CreatePartition create a partition in specific collection.
C
Cai Yudong 已提交
895
func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
896 897 898
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
899

900
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreatePartition")
901 902
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
903 904 905
	method := "CreatePartition"
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()
906

907
	cpt := &createPartitionTask{
S
sunby 已提交
908
		ctx:                    ctx,
909 910
		Condition:              NewTaskCondition(ctx),
		CreatePartitionRequest: request,
911
		rootCoord:              node.rootCoord,
912 913 914
		result:                 nil,
	}

915 916 917
	log.Debug(
		rpcReceived("CreatePartition"),
		zap.String("traceID", traceID),
918
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
919 920 921
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
922 923 924 925 926 927

	if err := node.sched.ddQueue.Enqueue(cpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue("CreatePartition"),
			zap.Error(err),
			zap.String("traceID", traceID),
928
			zap.String("role", typeutil.ProxyRole),
929 930 931 932
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

933 934
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.AbandonLabel).Inc()

935
		return &commonpb.Status{
936
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
937 938 939
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
940

941 942 943
	log.Debug(
		rpcEnqueued("CreatePartition"),
		zap.String("traceID", traceID),
944
		zap.String("role", typeutil.ProxyRole),
945 946 947
		zap.Int64("MsgID", cpt.ID()),
		zap.Uint64("BeginTS", cpt.BeginTs()),
		zap.Uint64("EndTS", cpt.EndTs()),
D
dragondriver 已提交
948 949 950
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
951 952 953 954

	if err := cpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish("CreatePartition"),
D
dragondriver 已提交
955
			zap.Error(err),
956
			zap.String("traceID", traceID),
957
			zap.String("role", typeutil.ProxyRole),
958 959 960
			zap.Int64("MsgID", cpt.ID()),
			zap.Uint64("BeginTS", cpt.BeginTs()),
			zap.Uint64("EndTS", cpt.EndTs()),
D
dragondriver 已提交
961 962 963 964
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

965 966
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()

967
		return &commonpb.Status{
968
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
969 970 971
			Reason:    err.Error(),
		}, nil
	}
972 973 974 975

	log.Debug(
		rpcDone("CreatePartition"),
		zap.String("traceID", traceID),
976
		zap.String("role", typeutil.ProxyRole),
977 978 979 980 981 982 983
		zap.Int64("MsgID", cpt.ID()),
		zap.Uint64("BeginTS", cpt.BeginTs()),
		zap.Uint64("EndTS", cpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

984 985
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
986 987 988
	return cpt.result, nil
}

989
// DropPartition drop a partition in specific collection.
C
Cai Yudong 已提交
990
func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
991 992 993
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
994

995
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropPartition")
996 997
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
998 999 1000
	method := "DropPartition"
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()
1001

1002
	dpt := &dropPartitionTask{
S
sunby 已提交
1003
		ctx:                  ctx,
1004 1005
		Condition:            NewTaskCondition(ctx),
		DropPartitionRequest: request,
1006
		rootCoord:            node.rootCoord,
1007 1008 1009
		result:               nil,
	}

1010 1011 1012
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1013
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1014 1015 1016
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1017 1018 1019 1020 1021 1022

	if err := node.sched.ddQueue.Enqueue(dpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1023
			zap.String("role", typeutil.ProxyRole),
1024 1025 1026 1027
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

1028 1029
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.AbandonLabel).Inc()

1030
		return &commonpb.Status{
1031
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1032 1033 1034
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1035

1036 1037 1038
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1039
		zap.String("role", typeutil.ProxyRole),
1040 1041 1042
		zap.Int64("MsgID", dpt.ID()),
		zap.Uint64("BeginTS", dpt.BeginTs()),
		zap.Uint64("EndTS", dpt.EndTs()),
D
dragondriver 已提交
1043 1044 1045
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1046 1047 1048 1049

	if err := dpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1050
			zap.Error(err),
1051
			zap.String("traceID", traceID),
1052
			zap.String("role", typeutil.ProxyRole),
1053 1054 1055
			zap.Int64("MsgID", dpt.ID()),
			zap.Uint64("BeginTS", dpt.BeginTs()),
			zap.Uint64("EndTS", dpt.EndTs()),
D
dragondriver 已提交
1056 1057 1058 1059
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

1060 1061
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()

1062
		return &commonpb.Status{
1063
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1064 1065 1066
			Reason:    err.Error(),
		}, nil
	}
1067 1068 1069 1070

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1071
		zap.String("role", typeutil.ProxyRole),
1072 1073 1074 1075 1076 1077 1078
		zap.Int64("MsgID", dpt.ID()),
		zap.Uint64("BeginTS", dpt.BeginTs()),
		zap.Uint64("EndTS", dpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

1079 1080
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1081 1082 1083
	return dpt.result, nil
}

1084
// HasPartition check if partition exist.
C
Cai Yudong 已提交
1085
func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
1086 1087 1088 1089 1090
	if !node.checkHealthy() {
		return &milvuspb.BoolResponse{
			Status: unhealthyStatus(),
		}, nil
	}
D
dragondriver 已提交
1091

D
dragondriver 已提交
1092
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-HasPartition")
D
dragondriver 已提交
1093 1094
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
1095 1096 1097 1098
	method := "HasPartition"
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1099
		metrics.TotalLabel).Inc()
D
dragondriver 已提交
1100

1101
	hpt := &hasPartitionTask{
S
sunby 已提交
1102
		ctx:                 ctx,
1103 1104
		Condition:           NewTaskCondition(ctx),
		HasPartitionRequest: request,
1105
		rootCoord:           node.rootCoord,
1106 1107 1108
		result:              nil,
	}

D
dragondriver 已提交
1109 1110 1111
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1112
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1113 1114 1115
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
D
dragondriver 已提交
1116 1117 1118 1119 1120 1121

	if err := node.sched.ddQueue.Enqueue(hpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1122
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1123 1124 1125 1126
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

1127
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1128
			metrics.AbandonLabel).Inc()
1129

1130 1131
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1132
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1133 1134 1135 1136 1137
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1138

D
dragondriver 已提交
1139 1140 1141
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1142
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1143 1144 1145
		zap.Int64("MsgID", hpt.ID()),
		zap.Uint64("BeginTS", hpt.BeginTs()),
		zap.Uint64("EndTS", hpt.EndTs()),
D
dragondriver 已提交
1146 1147 1148
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
D
dragondriver 已提交
1149 1150 1151 1152

	if err := hpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1153
			zap.Error(err),
D
dragondriver 已提交
1154
			zap.String("traceID", traceID),
1155
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1156 1157 1158
			zap.Int64("MsgID", hpt.ID()),
			zap.Uint64("BeginTS", hpt.BeginTs()),
			zap.Uint64("EndTS", hpt.EndTs()),
D
dragondriver 已提交
1159 1160 1161 1162
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

1163
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1164
			metrics.FailLabel).Inc()
1165

1166 1167
		return &milvuspb.BoolResponse{
			Status: &commonpb.Status{
1168
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1169 1170 1171 1172 1173
				Reason:    err.Error(),
			},
			Value: false,
		}, nil
	}
D
dragondriver 已提交
1174 1175 1176 1177

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1178
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1179 1180 1181 1182 1183 1184 1185
		zap.Int64("MsgID", hpt.ID()),
		zap.Uint64("BeginTS", hpt.BeginTs()),
		zap.Uint64("EndTS", hpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

1186
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1187 1188
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1189 1190 1191
	return hpt.result, nil
}

1192
// LoadPartitions load specific partitions into query nodes.
C
Cai Yudong 已提交
1193
func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitionsRequest) (*commonpb.Status, error) {
1194 1195 1196
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1197

D
dragondriver 已提交
1198
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-LoadPartitions")
1199 1200
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
1201 1202
	method := "LoadPartitions"
	tr := timerecord.NewTimeRecorder(method)
1203

1204
	lpt := &loadPartitionsTask{
G
godchen 已提交
1205 1206 1207
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		LoadPartitionsRequest: request,
1208
		queryCoord:            node.queryCoord,
1209 1210
	}

1211 1212 1213
	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1214
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1215 1216 1217
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1218 1219 1220 1221 1222 1223

	if err := node.sched.ddQueue.Enqueue(lpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1224
			zap.String("role", typeutil.ProxyRole),
1225 1226 1227 1228
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

1229
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1230
			metrics.AbandonLabel).Inc()
1231

1232
		return &commonpb.Status{
1233
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1234 1235 1236 1237
			Reason:    err.Error(),
		}, nil
	}

1238 1239 1240
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1241
		zap.String("role", typeutil.ProxyRole),
1242 1243 1244
		zap.Int64("MsgID", lpt.ID()),
		zap.Uint64("BeginTS", lpt.BeginTs()),
		zap.Uint64("EndTS", lpt.EndTs()),
D
dragondriver 已提交
1245 1246 1247
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1248 1249 1250 1251

	if err := lpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1252
			zap.Error(err),
1253
			zap.String("traceID", traceID),
1254
			zap.String("role", typeutil.ProxyRole),
1255 1256 1257
			zap.Int64("MsgID", lpt.ID()),
			zap.Uint64("BeginTS", lpt.BeginTs()),
			zap.Uint64("EndTS", lpt.EndTs()),
D
dragondriver 已提交
1258 1259 1260 1261
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

1262
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1263
			metrics.TotalLabel).Inc()
1264
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1265
			metrics.FailLabel).Inc()
1266

1267
		return &commonpb.Status{
1268
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1269 1270 1271 1272
			Reason:    err.Error(),
		}, nil
	}

1273 1274 1275
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1276
		zap.String("role", typeutil.ProxyRole),
1277 1278 1279 1280 1281 1282 1283
		zap.Int64("MsgID", lpt.ID()),
		zap.Uint64("BeginTS", lpt.BeginTs()),
		zap.Uint64("EndTS", lpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))

1284
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1285
		metrics.TotalLabel).Inc()
1286
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1287 1288
		metrics.SuccessLabel).Inc()
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1289
	return lpt.result, nil
1290 1291
}

1292
// ReleasePartitions release specific partitions from query nodes.
C
Cai Yudong 已提交
1293
func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionsRequest) (*commonpb.Status, error) {
1294 1295 1296
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
1297 1298 1299 1300 1301

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ReleasePartitions")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1302
	rpt := &releasePartitionsTask{
G
godchen 已提交
1303 1304 1305
		ctx:                      ctx,
		Condition:                NewTaskCondition(ctx),
		ReleasePartitionsRequest: request,
1306
		queryCoord:               node.queryCoord,
1307 1308
	}

1309
	method := "ReleasePartitions"
1310
	tr := timerecord.NewTimeRecorder(method)
1311 1312 1313 1314

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1315
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1316 1317 1318
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1319 1320 1321 1322 1323 1324

	if err := node.sched.ddQueue.Enqueue(rpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1325
			zap.String("role", typeutil.ProxyRole),
1326 1327 1328 1329
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

1330
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1331
			metrics.AbandonLabel).Inc()
1332

1333
		return &commonpb.Status{
1334
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1335 1336 1337 1338
			Reason:    err.Error(),
		}, nil
	}

1339 1340 1341
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1342
		zap.String("role", typeutil.ProxyRole),
1343 1344 1345
		zap.Int64("msgID", rpt.Base.MsgID),
		zap.Uint64("BeginTS", rpt.BeginTs()),
		zap.Uint64("EndTS", rpt.EndTs()),
D
dragondriver 已提交
1346 1347 1348
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
1349 1350 1351 1352

	if err := rpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1353
			zap.Error(err),
1354
			zap.String("traceID", traceID),
1355
			zap.String("role", typeutil.ProxyRole),
1356 1357 1358
			zap.Int64("msgID", rpt.Base.MsgID),
			zap.Uint64("BeginTS", rpt.BeginTs()),
			zap.Uint64("EndTS", rpt.EndTs()),
D
dragondriver 已提交
1359 1360 1361 1362
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))

1363
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1364
			metrics.TotalLabel).Inc()
1365
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1366
			metrics.FailLabel).Inc()
1367

1368
		return &commonpb.Status{
1369
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1370 1371 1372 1373
			Reason:    err.Error(),
		}, nil
	}

1374 1375 1376
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1377
		zap.String("role", typeutil.ProxyRole),
1378 1379 1380 1381 1382 1383 1384
		zap.Int64("msgID", rpt.Base.MsgID),
		zap.Uint64("BeginTS", rpt.BeginTs()),
		zap.Uint64("EndTS", rpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))

1385
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1386
		metrics.TotalLabel).Inc()
1387
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1388 1389
		metrics.SuccessLabel).Inc()
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1390
	return rpt.result, nil
1391 1392
}

1393
// GetPartitionStatistics get the statistics of partition, such as num_rows.
C
Cai Yudong 已提交
1394
func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb.GetPartitionStatisticsRequest) (*milvuspb.GetPartitionStatisticsResponse, error) {
1395 1396 1397 1398 1399
	if !node.checkHealthy() {
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1400 1401 1402 1403 1404

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetPartitionStatistics")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1405
	g := &getPartitionStatisticsTask{
1406 1407 1408
		ctx:                           ctx,
		Condition:                     NewTaskCondition(ctx),
		GetPartitionStatisticsRequest: request,
1409
		dataCoord:                     node.dataCoord,
1410 1411
	}

1412
	method := "GetPartitionStatistics"
1413
	tr := timerecord.NewTimeRecorder(method)
1414 1415 1416 1417

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1418
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1419 1420 1421
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1422 1423 1424 1425 1426 1427

	if err := node.sched.ddQueue.Enqueue(g); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1428
			zap.String("role", typeutil.ProxyRole),
1429 1430 1431 1432
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

1433
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1434
			metrics.AbandonLabel).Inc()
1435

1436 1437 1438 1439 1440 1441 1442 1443
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1444 1445 1446
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1447
		zap.String("role", typeutil.ProxyRole),
1448 1449 1450
		zap.Int64("msgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
1451 1452 1453
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))
1454 1455 1456 1457

	if err := g.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
1458
			zap.Error(err),
1459
			zap.String("traceID", traceID),
1460
			zap.String("role", typeutil.ProxyRole),
1461 1462 1463
			zap.Int64("msgID", g.ID()),
			zap.Uint64("BeginTS", g.BeginTs()),
			zap.Uint64("EndTS", g.EndTs()),
1464 1465 1466 1467
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("partition", request.PartitionName))

1468
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1469
			metrics.TotalLabel).Inc()
1470
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1471
			metrics.FailLabel).Inc()
1472

1473 1474 1475 1476 1477 1478 1479 1480
		return &milvuspb.GetPartitionStatisticsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1481 1482 1483
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1484
		zap.String("role", typeutil.ProxyRole),
1485 1486 1487 1488 1489 1490 1491
		zap.Int64("msgID", g.ID()),
		zap.Uint64("BeginTS", g.BeginTs()),
		zap.Uint64("EndTS", g.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName))

1492
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1493
		metrics.TotalLabel).Inc()
1494
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1495 1496
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1497
	return g.result, nil
1498 1499
}

1500
// ShowPartitions list all partitions in the specific collection.
C
Cai Yudong 已提交
1501
func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
1502 1503 1504 1505 1506
	if !node.checkHealthy() {
		return &milvuspb.ShowPartitionsResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1507 1508 1509 1510 1511

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowPartitions")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1512
	spt := &showPartitionsTask{
G
godchen 已提交
1513 1514 1515
		ctx:                   ctx,
		Condition:             NewTaskCondition(ctx),
		ShowPartitionsRequest: request,
1516
		rootCoord:             node.rootCoord,
1517
		queryCoord:            node.queryCoord,
G
godchen 已提交
1518
		result:                nil,
1519 1520
	}

1521
	method := "ShowPartitions"
1522 1523 1524
	tr := timerecord.NewTimeRecorder(method)
	//TODO: use collectionID instead of collectionName
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1525
		metrics.TotalLabel).Inc()
1526 1527 1528 1529

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1530
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1531
		zap.Any("request", request))
1532 1533 1534 1535 1536 1537

	if err := node.sched.ddQueue.Enqueue(spt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1538
			zap.String("role", typeutil.ProxyRole),
1539 1540
			zap.Any("request", request))

1541
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1542
			metrics.AbandonLabel).Inc()
1543

G
godchen 已提交
1544
		return &milvuspb.ShowPartitionsResponse{
1545
			Status: &commonpb.Status{
1546
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1547 1548 1549 1550 1551
				Reason:    err.Error(),
			},
		}, nil
	}

1552 1553 1554
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1555
		zap.String("role", typeutil.ProxyRole),
1556 1557 1558
		zap.Int64("msgID", spt.ID()),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
1559 1560
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
1561 1562 1563 1564 1565
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

	if err := spt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1566
			zap.Error(err),
1567
			zap.String("traceID", traceID),
1568
			zap.String("role", typeutil.ProxyRole),
1569 1570 1571 1572 1573 1574
			zap.Int64("msgID", spt.ID()),
			zap.Uint64("BeginTS", spt.BeginTs()),
			zap.Uint64("EndTS", spt.EndTs()),
			zap.String("db", spt.ShowPartitionsRequest.DbName),
			zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
			zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))
D
dragondriver 已提交
1575

1576
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1577
			metrics.FailLabel).Inc()
1578

G
godchen 已提交
1579
		return &milvuspb.ShowPartitionsResponse{
1580
			Status: &commonpb.Status{
1581
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1582 1583 1584 1585
				Reason:    err.Error(),
			},
		}, nil
	}
1586 1587 1588 1589

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1590
		zap.String("role", typeutil.ProxyRole),
1591 1592 1593 1594 1595 1596 1597
		zap.Int64("msgID", spt.ID()),
		zap.Uint64("BeginTS", spt.BeginTs()),
		zap.Uint64("EndTS", spt.EndTs()),
		zap.String("db", spt.ShowPartitionsRequest.DbName),
		zap.String("collection", spt.ShowPartitionsRequest.CollectionName),
		zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames))

1598
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1599 1600
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1601 1602 1603
	return spt.result, nil
}

1604
// CreateIndex create index for collection.
C
Cai Yudong 已提交
1605
func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
1606 1607 1608
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1609 1610 1611 1612 1613

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowPartitions")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1614
	cit := &createIndexTask{
S
sunby 已提交
1615
		ctx:                ctx,
1616 1617
		Condition:          NewTaskCondition(ctx),
		CreateIndexRequest: request,
1618
		rootCoord:          node.rootCoord,
1619 1620
	}

D
dragondriver 已提交
1621
	method := "CreateIndex"
1622
	tr := timerecord.NewTimeRecorder(method)
D
dragondriver 已提交
1623 1624 1625 1626

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1627
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1628 1629 1630 1631
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))
D
dragondriver 已提交
1632 1633 1634 1635 1636 1637

	if err := node.sched.ddQueue.Enqueue(cit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1638
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1639 1640 1641 1642 1643
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.Any("extra_params", request.ExtraParams))

1644
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1645
			metrics.AbandonLabel).Inc()
1646

1647
		return &commonpb.Status{
1648
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1649 1650 1651 1652
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1653 1654 1655
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1656
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1657 1658 1659
		zap.Int64("MsgID", cit.ID()),
		zap.Uint64("BeginTs", cit.BeginTs()),
		zap.Uint64("EndTs", cit.EndTs()),
D
dragondriver 已提交
1660 1661 1662 1663
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))
D
dragondriver 已提交
1664 1665 1666 1667

	if err := cit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1668
			zap.Error(err),
D
dragondriver 已提交
1669
			zap.String("traceID", traceID),
1670
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1671 1672 1673
			zap.Int64("MsgID", cit.ID()),
			zap.Uint64("BeginTs", cit.BeginTs()),
			zap.Uint64("EndTs", cit.EndTs()),
D
dragondriver 已提交
1674 1675 1676 1677 1678
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.Any("extra_params", request.ExtraParams))

1679
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1680
			metrics.TotalLabel).Inc()
1681
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1682
			metrics.FailLabel).Inc()
1683

1684
		return &commonpb.Status{
1685
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
1686 1687 1688 1689
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
1690 1691 1692
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1693
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1694 1695 1696 1697 1698 1699 1700 1701
		zap.Int64("MsgID", cit.ID()),
		zap.Uint64("BeginTs", cit.BeginTs()),
		zap.Uint64("EndTs", cit.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.Any("extra_params", request.ExtraParams))

1702
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1703
		metrics.TotalLabel).Inc()
1704
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1705 1706
		metrics.SuccessLabel).Inc()
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1707 1708 1709
	return cit.result, nil
}

1710
// DescribeIndex get the meta information of index, such as index state, index id and etc.
C
Cai Yudong 已提交
1711
func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
1712 1713 1714 1715 1716
	if !node.checkHealthy() {
		return &milvuspb.DescribeIndexResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1717 1718 1719 1720 1721

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DescribeIndex")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1722
	dit := &describeIndexTask{
S
sunby 已提交
1723
		ctx:                  ctx,
1724 1725
		Condition:            NewTaskCondition(ctx),
		DescribeIndexRequest: request,
1726
		rootCoord:            node.rootCoord,
1727 1728
	}

1729 1730 1731
	method := "DescribeIndex"
	// avoid data race
	indexName := request.IndexName
1732
	tr := timerecord.NewTimeRecorder(method)
1733 1734 1735 1736

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1737
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1738 1739 1740
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
1741 1742 1743 1744 1745 1746 1747
		zap.String("index name", indexName))

	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1748
			zap.String("role", typeutil.ProxyRole),
1749 1750 1751 1752 1753
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", indexName))

1754
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1755
			metrics.AbandonLabel).Inc()
1756

1757 1758
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
1759
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
1760 1761 1762 1763 1764
				Reason:    err.Error(),
			},
		}, nil
	}

1765 1766 1767
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1768
		zap.String("role", typeutil.ProxyRole),
1769 1770 1771
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
1772 1773 1774
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
1775 1776 1777 1778 1779
		zap.String("index name", indexName))

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1780
			zap.Error(err),
1781
			zap.String("traceID", traceID),
1782
			zap.String("role", typeutil.ProxyRole),
1783 1784 1785
			zap.Int64("MsgID", dit.ID()),
			zap.Uint64("BeginTs", dit.BeginTs()),
			zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
1786 1787 1788
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
1789
			zap.String("index name", indexName))
D
dragondriver 已提交
1790

Z
zhenshan.cao 已提交
1791 1792 1793 1794
		errCode := commonpb.ErrorCode_UnexpectedError
		if dit.result != nil {
			errCode = dit.result.Status.GetErrorCode()
		}
1795
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1796
			metrics.TotalLabel).Inc()
1797
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1798
			metrics.FailLabel).Inc()
1799

1800 1801
		return &milvuspb.DescribeIndexResponse{
			Status: &commonpb.Status{
Z
zhenshan.cao 已提交
1802
				ErrorCode: errCode,
1803 1804 1805 1806 1807
				Reason:    err.Error(),
			},
		}, nil
	}

1808 1809 1810
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1811
		zap.String("role", typeutil.ProxyRole),
1812 1813 1814 1815 1816 1817 1818 1819
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", indexName))

1820
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1821
		metrics.TotalLabel).Inc()
1822
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1823 1824
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
1825 1826 1827
	return dit.result, nil
}

1828
// DropIndex drop the index of collection.
C
Cai Yudong 已提交
1829
func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
1830 1831 1832
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
1833 1834 1835 1836 1837

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropIndex")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1838
	dit := &dropIndexTask{
S
sunby 已提交
1839
		ctx:              ctx,
B
BossZou 已提交
1840 1841
		Condition:        NewTaskCondition(ctx),
		DropIndexRequest: request,
1842
		rootCoord:        node.rootCoord,
B
BossZou 已提交
1843
	}
G
godchen 已提交
1844

D
dragondriver 已提交
1845
	method := "DropIndex"
1846
	tr := timerecord.NewTimeRecorder(method)
D
dragondriver 已提交
1847 1848 1849 1850

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1851
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1852 1853 1854 1855 1856
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

D
dragondriver 已提交
1857 1858 1859 1860 1861
	if err := node.sched.ddQueue.Enqueue(dit); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1862
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1863 1864 1865 1866
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))
1867
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1868
			metrics.AbandonLabel).Inc()
D
dragondriver 已提交
1869

B
BossZou 已提交
1870
		return &commonpb.Status{
1871
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1872 1873 1874
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1875

D
dragondriver 已提交
1876 1877 1878
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1879
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1880 1881 1882
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
1883 1884 1885 1886
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
D
dragondriver 已提交
1887 1888 1889 1890

	if err := dit.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
1891
			zap.Error(err),
D
dragondriver 已提交
1892
			zap.String("traceID", traceID),
1893
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1894 1895 1896
			zap.Int64("MsgID", dit.ID()),
			zap.Uint64("BeginTs", dit.BeginTs()),
			zap.Uint64("EndTs", dit.EndTs()),
D
dragondriver 已提交
1897 1898 1899 1900 1901
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))

1902
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1903
			metrics.TotalLabel).Inc()
1904
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1905
			metrics.FailLabel).Inc()
1906

B
BossZou 已提交
1907
		return &commonpb.Status{
1908
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
BossZou 已提交
1909 1910 1911
			Reason:    err.Error(),
		}, nil
	}
D
dragondriver 已提交
1912 1913 1914 1915

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
1916
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
1917 1918 1919 1920 1921 1922 1923 1924
		zap.Int64("MsgID", dit.ID()),
		zap.Uint64("BeginTs", dit.BeginTs()),
		zap.Uint64("EndTs", dit.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

1925
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1926
		metrics.TotalLabel).Inc()
1927
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1928 1929
		metrics.SuccessLabel).Inc()
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
B
BossZou 已提交
1930 1931 1932
	return dit.result, nil
}

1933 1934
// GetIndexBuildProgress gets index build progress with filed_name and index_name.
// IndexRows is the num of indexed rows. And TotalRows is the total number of segment rows.
C
Cai Yudong 已提交
1935
func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb.GetIndexBuildProgressRequest) (*milvuspb.GetIndexBuildProgressResponse, error) {
1936 1937 1938 1939 1940
	if !node.checkHealthy() {
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: unhealthyStatus(),
		}, nil
	}
1941 1942 1943 1944 1945

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-GetIndexBuildProgress")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

1946
	gibpt := &getIndexBuildProgressTask{
1947 1948 1949
		ctx:                          ctx,
		Condition:                    NewTaskCondition(ctx),
		GetIndexBuildProgressRequest: request,
1950 1951
		indexCoord:                   node.indexCoord,
		rootCoord:                    node.rootCoord,
1952
		dataCoord:                    node.dataCoord,
1953 1954
	}

1955
	method := "GetIndexBuildProgress"
1956
	tr := timerecord.NewTimeRecorder(method)
1957 1958 1959 1960

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
1961
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
1962 1963 1964 1965
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1966 1967 1968 1969 1970 1971

	if err := node.sched.ddQueue.Enqueue(gibpt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
1972
			zap.String("role", typeutil.ProxyRole),
1973 1974 1975 1976
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))
1977
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
1978
			metrics.AbandonLabel).Inc()
1979

1980 1981 1982 1983 1984 1985 1986 1987
		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

1988 1989 1990
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
1991
		zap.String("role", typeutil.ProxyRole),
1992 1993 1994
		zap.Int64("MsgID", gibpt.ID()),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
		zap.Uint64("EndTs", gibpt.EndTs()),
1995 1996 1997 1998
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
1999 2000 2001 2002

	if err := gibpt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
2003
			zap.Error(err),
2004
			zap.String("traceID", traceID),
2005
			zap.String("role", typeutil.ProxyRole),
2006 2007 2008
			zap.Int64("MsgID", gibpt.ID()),
			zap.Uint64("BeginTs", gibpt.BeginTs()),
			zap.Uint64("EndTs", gibpt.EndTs()),
2009 2010 2011 2012
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))
2013
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2014
			metrics.TotalLabel).Inc()
2015
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2016
			metrics.FailLabel).Inc()
2017 2018 2019 2020 2021 2022 2023 2024

		return &milvuspb.GetIndexBuildProgressResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
2025 2026 2027 2028

	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2029
		zap.String("role", typeutil.ProxyRole),
2030 2031 2032 2033 2034 2035 2036 2037
		zap.Int64("MsgID", gibpt.ID()),
		zap.Uint64("BeginTs", gibpt.BeginTs()),
		zap.Uint64("EndTs", gibpt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName),
		zap.Any("result", gibpt.result))
2038

2039
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2040
		metrics.TotalLabel).Inc()
2041
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2042 2043
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2044
	return gibpt.result, nil
2045 2046
}

2047
// GetIndexState get the build-state of index.
C
Cai Yudong 已提交
2048
func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndexStateRequest) (*milvuspb.GetIndexStateResponse, error) {
2049 2050 2051 2052 2053
	if !node.checkHealthy() {
		return &milvuspb.GetIndexStateResponse{
			Status: unhealthyStatus(),
		}, nil
	}
2054 2055 2056 2057 2058

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

2059
	dipt := &getIndexStateTask{
G
godchen 已提交
2060 2061 2062
		ctx:                  ctx,
		Condition:            NewTaskCondition(ctx),
		GetIndexStateRequest: request,
2063 2064
		indexCoord:           node.indexCoord,
		rootCoord:            node.rootCoord,
2065 2066
	}

2067
	method := "GetIndexState"
2068
	tr := timerecord.NewTimeRecorder(method)
2069 2070 2071 2072

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
2073
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2074 2075 2076 2077
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
2078 2079 2080 2081 2082 2083

	if err := node.sched.ddQueue.Enqueue(dipt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
2084
			zap.String("role", typeutil.ProxyRole),
2085 2086 2087 2088 2089
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))

2090
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2091
			metrics.AbandonLabel).Inc()
2092

G
godchen 已提交
2093
		return &milvuspb.GetIndexStateResponse{
2094
			Status: &commonpb.Status{
2095
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2096 2097 2098 2099 2100
				Reason:    err.Error(),
			},
		}, nil
	}

2101 2102 2103
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2104
		zap.String("role", typeutil.ProxyRole),
2105 2106 2107
		zap.Int64("MsgID", dipt.ID()),
		zap.Uint64("BeginTs", dipt.BeginTs()),
		zap.Uint64("EndTs", dipt.EndTs()),
D
dragondriver 已提交
2108 2109 2110 2111
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))
2112 2113 2114 2115

	if err := dipt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2116
			zap.Error(err),
2117
			zap.String("traceID", traceID),
2118
			zap.String("role", typeutil.ProxyRole),
2119 2120 2121
			zap.Int64("MsgID", dipt.ID()),
			zap.Uint64("BeginTs", dipt.BeginTs()),
			zap.Uint64("EndTs", dipt.EndTs()),
D
dragondriver 已提交
2122 2123 2124 2125 2126
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.String("field", request.FieldName),
			zap.String("index name", request.IndexName))

2127
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2128
			metrics.TotalLabel).Inc()
2129
		metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2130
			metrics.FailLabel).Inc()
2131

G
godchen 已提交
2132
		return &milvuspb.GetIndexStateResponse{
2133
			Status: &commonpb.Status{
2134
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2135 2136 2137 2138 2139
				Reason:    err.Error(),
			},
		}, nil
	}

2140 2141 2142
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2143
		zap.String("role", typeutil.ProxyRole),
2144 2145 2146 2147 2148 2149 2150 2151
		zap.Int64("MsgID", dipt.ID()),
		zap.Uint64("BeginTs", dipt.BeginTs()),
		zap.Uint64("EndTs", dipt.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("field", request.FieldName),
		zap.String("index name", request.IndexName))

2152
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2153
		metrics.TotalLabel).Inc()
2154
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2155 2156
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2157 2158 2159
	return dipt.result, nil
}

2160
// Insert insert records into collection.
C
Cai Yudong 已提交
2161
func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.MutationResult, error) {
X
Xiangyu Wang 已提交
2162 2163 2164 2165 2166 2167
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
	log.Info("Start processing insert request in Proxy", zap.String("traceID", traceID))
	defer log.Info("Finish processing insert request in Proxy", zap.String("traceID", traceID))

2168 2169 2170 2171 2172
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}
2173 2174
	method := "Insert"
	tr := timerecord.NewTimeRecorder(method)
D
dragondriver 已提交
2175

2176
	it := &insertTask{
2177 2178
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
X
xige-16 已提交
2179
		// req:       request,
2180 2181 2182 2183
		BaseInsertTask: BaseInsertTask{
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
2184
			InsertRequest: internalpb.InsertRequest{
2185
				Base: &commonpb.MsgBase{
X
xige-16 已提交
2186 2187 2188
					MsgType:  commonpb.MsgType_Insert,
					MsgID:    0,
					SourceID: Params.ProxyCfg.ProxyID,
2189 2190 2191
				},
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
X
xige-16 已提交
2192 2193 2194
				FieldsData:     request.FieldsData,
				NumRows:        uint64(request.NumRows),
				Version:        internalpb.InsertDataVersion_ColumnBased,
2195
				// RowData: transfer column based request to this
2196 2197
			},
		},
2198
		rowIDAllocator: node.idAllocator,
2199
		segIDAssigner:  node.segAssigner,
2200
		chMgr:          node.chMgr,
2201
		chTicker:       node.chTicker,
2202
	}
2203 2204

	if len(it.PartitionName) <= 0 {
2205
		it.PartitionName = Params.CommonCfg.DefaultPartitionName
2206 2207
	}

X
Xiangyu Wang 已提交
2208
	constructFailedResponse := func(err error) *milvuspb.MutationResult {
X
xige-16 已提交
2209
		numRows := request.NumRows
2210 2211 2212 2213
		errIndex := make([]uint32, numRows)
		for i := uint32(0); i < numRows; i++ {
			errIndex[i] = i
		}
2214

X
Xiangyu Wang 已提交
2215 2216 2217 2218 2219 2220 2221
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			ErrIndex: errIndex,
		}
2222 2223
	}

X
Xiangyu Wang 已提交
2224
	log.Debug("Enqueue insert request in Proxy",
2225
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2226 2227 2228 2229 2230
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.Int("len(FieldsData)", len(request.FieldsData)),
		zap.Int("len(HashKeys)", len(request.HashKeys)),
2231 2232
		zap.Uint32("NumRows", request.NumRows),
		zap.String("traceID", traceID))
D
dragondriver 已提交
2233

X
Xiangyu Wang 已提交
2234 2235
	if err := node.sched.dmQueue.Enqueue(it); err != nil {
		log.Debug("Failed to enqueue insert task: " + err.Error())
2236
		metrics.ProxyInsertCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), metrics.AbandonLabel).Inc()
X
Xiangyu Wang 已提交
2237
		return constructFailedResponse(err), nil
2238
	}
D
dragondriver 已提交
2239

X
Xiangyu Wang 已提交
2240
	log.Debug("Detail of insert request in Proxy",
2241
		zap.String("role", typeutil.ProxyRole),
X
Xiangyu Wang 已提交
2242
		zap.Int64("msgID", it.Base.MsgID),
D
dragondriver 已提交
2243 2244 2245 2246 2247
		zap.Uint64("BeginTS", it.BeginTs()),
		zap.Uint64("EndTS", it.EndTs()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
X
Xiangyu Wang 已提交
2248 2249 2250 2251 2252
		zap.Uint32("NumRows", request.NumRows),
		zap.String("traceID", traceID))

	if err := it.WaitToFinish(); err != nil {
		log.Debug("Failed to execute insert task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
2253
		metrics.ProxyInsertCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2254
			metrics.TotalLabel).Inc()
2255
		metrics.ProxyInsertCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2256
			metrics.FailLabel).Inc()
X
Xiangyu Wang 已提交
2257 2258 2259 2260 2261
		return constructFailedResponse(err), nil
	}

	if it.result.Status.ErrorCode != commonpb.ErrorCode_Success {
		setErrorIndex := func() {
X
xige-16 已提交
2262
			numRows := request.NumRows
X
Xiangyu Wang 已提交
2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273
			errIndex := make([]uint32, numRows)
			for i := uint32(0); i < numRows; i++ {
				errIndex[i] = i
			}
			it.result.ErrIndex = errIndex
		}

		setErrorIndex()
	}

	// InsertCnt always equals to the number of entities in the request
X
xige-16 已提交
2274
	it.result.InsertCnt = int64(request.NumRows)
D
dragondriver 已提交
2275

2276
	metrics.ProxyInsertCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2277 2278 2279
		metrics.SuccessLabel).Inc()
	metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10)).Add(float64(it.result.InsertCnt))
	metrics.ProxyInsertLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
2280 2281 2282
	return it.result, nil
}

2283
// Delete delete records from collection, then these records cannot be searched.
G
groot 已提交
2284
func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {
2285 2286 2287
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
2288 2289
	log.Info("Start processing delete request in Proxy", zap.String("traceID", traceID))
	defer log.Info("Finish processing delete request in Proxy", zap.String("traceID", traceID))
2290

G
groot 已提交
2291 2292 2293 2294 2295 2296
	if !node.checkHealthy() {
		return &milvuspb.MutationResult{
			Status: unhealthyStatus(),
		}, nil
	}

2297 2298 2299
	method := "Delete"
	tr := timerecord.NewTimeRecorder(method)

2300
	dt := &deleteTask{
X
xige-16 已提交
2301 2302 2303
		ctx:        ctx,
		Condition:  NewTaskCondition(ctx),
		deleteExpr: request.Expr,
G
godchen 已提交
2304
		BaseDeleteTask: BaseDeleteTask{
G
godchen 已提交
2305 2306 2307
			BaseMsg: msgstream.BaseMsg{
				HashValues: request.HashKeys,
			},
G
godchen 已提交
2308 2309 2310 2311 2312
			DeleteRequest: internalpb.DeleteRequest{
				Base: &commonpb.MsgBase{
					MsgType: commonpb.MsgType_Delete,
					MsgID:   0,
				},
X
xige-16 已提交
2313
				DbName:         request.DbName,
G
godchen 已提交
2314 2315 2316
				CollectionName: request.CollectionName,
				PartitionName:  request.PartitionName,
				// RowData: transfer column based request to this
C
Cai Yudong 已提交
2317 2318 2319 2320
			},
		},
		chMgr:    node.chMgr,
		chTicker: node.chTicker,
G
groot 已提交
2321 2322
	}

2323
	log.Debug("Enqueue delete request in Proxy",
2324
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2325 2326 2327 2328
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
		zap.String("expr", request.Expr))
2329 2330 2331 2332

	// MsgID will be set by Enqueue()
	if err := node.sched.dmQueue.Enqueue(dt); err != nil {
		log.Error("Failed to enqueue delete task: "+err.Error(), zap.String("traceID", traceID))
2333
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2334
			metrics.FailLabel).Inc()
2335

G
groot 已提交
2336 2337 2338 2339 2340 2341 2342 2343
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

2344
	log.Debug("Detail of delete request in Proxy",
2345
		zap.String("role", typeutil.ProxyRole),
G
groot 已提交
2346 2347 2348 2349 2350
		zap.Int64("msgID", dt.Base.MsgID),
		zap.Uint64("timestamp", dt.Base.Timestamp),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.String("partition", request.PartitionName),
2351 2352
		zap.String("expr", request.Expr),
		zap.String("traceID", traceID))
G
groot 已提交
2353

2354 2355
	if err := dt.WaitToFinish(); err != nil {
		log.Error("Failed to execute delete task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
2356
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2357
			metrics.TotalLabel).Inc()
2358
		metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2359
			metrics.FailLabel).Inc()
G
groot 已提交
2360 2361 2362 2363 2364 2365 2366 2367
		return &milvuspb.MutationResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

2368
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2369
		metrics.TotalLabel).Inc()
2370
	metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
2371 2372
		metrics.SuccessLabel).Inc()
	metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
G
groot 已提交
2373 2374 2375
	return dt.result, nil
}

2376
// Search search the most similar records of requests.
C
Cai Yudong 已提交
2377
func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
2378 2379 2380 2381 2382
	if !node.checkHealthy() {
		return &milvuspb.SearchResults{
			Status: unhealthyStatus(),
		}, nil
	}
2383 2384
	method := "Search"
	tr := timerecord.NewTimeRecorder(method)
D
dragondriver 已提交
2385

C
cai.zhang 已提交
2386 2387
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Search")
	defer sp.Finish()
D
dragondriver 已提交
2388 2389
	traceID, _, _ := trace.InfoFromSpan(sp)

2390
	qt := &searchTask{
S
sunby 已提交
2391
		ctx:       ctx,
2392
		Condition: NewTaskCondition(ctx),
G
godchen 已提交
2393
		SearchRequest: &internalpb.SearchRequest{
2394
			Base: &commonpb.MsgBase{
2395
				MsgType:  commonpb.MsgType_Search,
2396
				SourceID: Params.ProxyCfg.ProxyID,
2397
			},
2398
			ResultChannelID: strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2399
		},
2400
		resultBuf: make(chan []*internalpb.SearchResults, 1),
2401 2402
		query:     request,
		chMgr:     node.chMgr,
2403
		qc:        node.queryCoord,
2404
		tr:        timerecord.NewTimeRecorder("search"),
2405 2406
	}

2407 2408 2409 2410 2411
	travelTs := request.TravelTimestamp
	guaranteeTs := request.GuaranteeTimestamp

	log.Debug(
		rpcReceived(method),
D
dragondriver 已提交
2412
		zap.String("traceID", traceID),
2413
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2414 2415 2416 2417 2418
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2419 2420 2421 2422
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2423

2424 2425 2426
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
D
dragondriver 已提交
2427 2428
			zap.Error(err),
			zap.String("traceID", traceID),
2429
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2430 2431 2432 2433 2434 2435
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames),
			zap.Any("dsl", request.Dsl),
			zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
			zap.Any("OutputFields", request.OutputFields),
2436 2437 2438
			zap.Any("search_params", request.SearchParams),
			zap.Uint64("travel_timestamp", travelTs),
			zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2439

2440
		metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2441
			metrics.SearchLabel, metrics.AbandonLabel).Inc()
2442

2443 2444
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2445
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2446 2447 2448 2449 2450
				Reason:    err.Error(),
			},
		}, nil
	}

2451 2452
	log.Debug(
		rpcEnqueued(method),
D
dragondriver 已提交
2453
		zap.String("traceID", traceID),
2454
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2455
		zap.Int64("msgID", qt.ID()),
D
dragondriver 已提交
2456 2457 2458 2459 2460
		zap.Uint64("timestamp", qt.Base.Timestamp),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
2461
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2462 2463 2464 2465
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2466

2467 2468 2469
	if err := qt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2470
			zap.Error(err),
D
dragondriver 已提交
2471
			zap.String("traceID", traceID),
2472
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2473
			zap.Int64("msgID", qt.ID()),
D
dragondriver 已提交
2474 2475 2476 2477
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames),
			zap.Any("dsl", request.Dsl),
2478
			zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2479 2480 2481 2482
			zap.Any("OutputFields", request.OutputFields),
			zap.Any("search_params", request.SearchParams),
			zap.Uint64("travel_timestamp", travelTs),
			zap.Uint64("guarantee_timestamp", guaranteeTs))
2483
		metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2484
			metrics.SearchLabel, metrics.TotalLabel).Inc()
2485

2486 2487
		metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
			metrics.SearchLabel, metrics.FailLabel).Inc()
2488

2489 2490
		return &milvuspb.SearchResults{
			Status: &commonpb.Status{
2491
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
2492 2493 2494 2495 2496
				Reason:    err.Error(),
			},
		}, nil
	}

2497 2498
	log.Debug(
		rpcDone(method),
D
dragondriver 已提交
2499
		zap.String("traceID", traceID),
2500
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2501 2502 2503 2504 2505 2506
		zap.Int64("msgID", qt.ID()),
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames),
		zap.Any("dsl", request.Dsl),
		zap.Any("len(PlaceholderGroup)", len(request.PlaceholderGroup)),
2507 2508 2509 2510
		zap.Any("OutputFields", request.OutputFields),
		zap.Any("search_params", request.SearchParams),
		zap.Uint64("travel_timestamp", travelTs),
		zap.Uint64("guarantee_timestamp", guaranteeTs))
D
dragondriver 已提交
2511

2512
	metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2513
		metrics.SearchLabel, metrics.TotalLabel).Inc()
2514
	metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2515
		metrics.SearchLabel, metrics.SuccessLabel).Inc()
2516
	metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2517
		metrics.SearchLabel).Set(float64(qt.result.Results.NumQueries))
C
cai.zhang 已提交
2518
	searchDur := tr.ElapseSpan().Milliseconds()
2519
	metrics.ProxySearchLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2520 2521
		metrics.SearchLabel).Observe(float64(searchDur))
	metrics.ProxySearchLatencyPerNQ.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10)).Observe(float64(searchDur) / float64(qt.result.Results.NumQueries))
2522 2523 2524
	return qt.result, nil
}

2525
// Flush notify data nodes to persist the data of collection.
2526 2527 2528 2529 2530 2531 2532
func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*milvuspb.FlushResponse, error) {
	resp := &milvuspb.FlushResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    "",
		},
	}
2533
	if !node.checkHealthy() {
2534 2535
		resp.Status.Reason = "proxy is not healthy"
		return resp, nil
2536
	}
D
dragondriver 已提交
2537 2538 2539 2540 2541

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Flush")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

2542
	ft := &flushTask{
T
ThreadDao 已提交
2543 2544 2545
		ctx:          ctx,
		Condition:    NewTaskCondition(ctx),
		FlushRequest: request,
2546
		dataCoord:    node.dataCoord,
2547 2548
	}

D
dragondriver 已提交
2549
	method := "Flush"
2550 2551
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2552 2553 2554 2555

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
2556
		zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
2557 2558
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))
D
dragondriver 已提交
2559 2560 2561 2562 2563 2564

	if err := node.sched.ddQueue.Enqueue(ft); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
2565
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2566 2567 2568
			zap.String("db", request.DbName),
			zap.Any("collections", request.CollectionNames))

2569 2570
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()

2571 2572
		resp.Status.Reason = err.Error()
		return resp, nil
2573 2574
	}

D
dragondriver 已提交
2575 2576 2577
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2578
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2579 2580 2581
		zap.Int64("MsgID", ft.ID()),
		zap.Uint64("BeginTs", ft.BeginTs()),
		zap.Uint64("EndTs", ft.EndTs()),
D
dragondriver 已提交
2582 2583
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))
D
dragondriver 已提交
2584 2585 2586 2587

	if err := ft.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
D
dragondriver 已提交
2588
			zap.Error(err),
D
dragondriver 已提交
2589
			zap.String("traceID", traceID),
2590
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2591 2592 2593
			zap.Int64("MsgID", ft.ID()),
			zap.Uint64("BeginTs", ft.BeginTs()),
			zap.Uint64("EndTs", ft.EndTs()),
D
dragondriver 已提交
2594 2595 2596
			zap.String("db", request.DbName),
			zap.Any("collections", request.CollectionNames))

2597 2598
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()

D
dragondriver 已提交
2599
		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
2600 2601
		resp.Status.Reason = err.Error()
		return resp, nil
2602 2603
	}

D
dragondriver 已提交
2604 2605 2606
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
2607
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2608 2609 2610 2611 2612 2613
		zap.Int64("MsgID", ft.ID()),
		zap.Uint64("BeginTs", ft.BeginTs()),
		zap.Uint64("EndTs", ft.EndTs()),
		zap.String("db", request.DbName),
		zap.Any("collections", request.CollectionNames))

2614 2615
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
2616
	return ft.result, nil
2617 2618
}

2619
// Query get the records by primary keys.
C
Cai Yudong 已提交
2620
func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*milvuspb.QueryResults, error) {
2621 2622 2623 2624 2625
	if !node.checkHealthy() {
		return &milvuspb.QueryResults{
			Status: unhealthyStatus(),
		}, nil
	}
2626

D
dragondriver 已提交
2627 2628 2629
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Query")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)
2630
	tr := timerecord.NewTimeRecorder("Query")
D
dragondriver 已提交
2631

2632
	qt := &queryTask{
2633 2634 2635 2636 2637
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
		RetrieveRequest: &internalpb.RetrieveRequest{
			Base: &commonpb.MsgBase{
				MsgType:  commonpb.MsgType_Retrieve,
2638
				SourceID: Params.ProxyCfg.ProxyID,
2639
			},
2640
			ResultChannelID: strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2641 2642
		},
		resultBuf: make(chan []*internalpb.RetrieveResults),
2643
		query:     request,
2644 2645
		chMgr:     node.chMgr,
		qc:        node.queryCoord,
2646 2647
	}

D
dragondriver 已提交
2648 2649 2650 2651 2652
	method := "Query"

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
2653
		zap.String("role", typeutil.ProxyRole),
2654 2655 2656
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
G
godchen 已提交
2657

D
dragondriver 已提交
2658 2659 2660 2661 2662 2663
	if err := node.sched.dqQueue.Enqueue(qt); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
2664 2665 2666
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))
D
dragondriver 已提交
2667

2668
		metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2669
			metrics.QueryLabel, metrics.FailLabel).Inc()
2670 2671 2672 2673 2674 2675
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
2676 2677
	}

D
dragondriver 已提交
2678 2679 2680
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2681
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2682 2683 2684
		zap.Int64("MsgID", qt.ID()),
		zap.Uint64("BeginTs", qt.BeginTs()),
		zap.Uint64("EndTs", qt.EndTs()),
2685 2686 2687
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
D
dragondriver 已提交
2688 2689 2690 2691 2692 2693

	if err := qt.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
			zap.Error(err),
			zap.String("traceID", traceID),
2694
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2695 2696 2697
			zap.Int64("MsgID", qt.ID()),
			zap.Uint64("BeginTs", qt.BeginTs()),
			zap.Uint64("EndTs", qt.EndTs()),
2698 2699 2700
			zap.String("db", request.DbName),
			zap.String("collection", request.CollectionName),
			zap.Any("partitions", request.PartitionNames))
2701

2702
		metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2703
			metrics.QueryLabel, metrics.TotalLabel).Inc()
2704
		metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2705
			metrics.QueryLabel, metrics.FailLabel).Inc()
2706

2707 2708 2709 2710 2711 2712 2713
		return &milvuspb.QueryResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
2714

D
dragondriver 已提交
2715 2716 2717 2718 2719 2720 2721
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", qt.ID()),
		zap.Uint64("BeginTs", qt.BeginTs()),
		zap.Uint64("EndTs", qt.EndTs()),
2722 2723 2724
		zap.String("db", request.DbName),
		zap.String("collection", request.CollectionName),
		zap.Any("partitions", request.PartitionNames))
D
dragondriver 已提交
2725

2726
	metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2727
		metrics.QueryLabel, metrics.TotalLabel).Inc()
2728
	metrics.ProxySearchCount.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2729
		metrics.QueryLabel, metrics.SuccessLabel).Inc()
2730
	metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2731
		metrics.QueryLabel).Set(float64(len(qt.result.FieldsData)))
2732
	metrics.ProxySendMessageLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
2733
		metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
2734 2735 2736 2737 2738
	return &milvuspb.QueryResults{
		Status:     qt.result.Status,
		FieldsData: qt.result.FieldsData,
	}, nil
}
2739

2740
// CreateAlias create alias for collection, then you can search the collection with alias.
Y
Yusup 已提交
2741 2742 2743 2744
func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2745 2746 2747 2748 2749

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CreateAlias")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

Y
Yusup 已提交
2750 2751 2752 2753 2754 2755 2756
	cat := &CreateAliasTask{
		ctx:                ctx,
		Condition:          NewTaskCondition(ctx),
		CreateAliasRequest: request,
		rootCoord:          node.rootCoord,
	}

D
dragondriver 已提交
2757
	method := "CreateAlias"
2758 2759
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

	if err := node.sched.ddQueue.Enqueue(cat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.String("db", request.DbName),
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))

2779 2780
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.AbandonLabel).Inc()

Y
Yusup 已提交
2781 2782 2783 2784 2785 2786
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2787 2788 2789
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2790
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2791 2792 2793 2794
		zap.Int64("MsgID", cat.ID()),
		zap.Uint64("BeginTs", cat.BeginTs()),
		zap.Uint64("EndTs", cat.EndTs()),
		zap.String("db", request.DbName),
Y
Yusup 已提交
2795 2796
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))
D
dragondriver 已提交
2797 2798 2799 2800

	if err := cat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2801
			zap.Error(err),
D
dragondriver 已提交
2802
			zap.String("traceID", traceID),
2803
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2804 2805 2806 2807
			zap.Int64("MsgID", cat.ID()),
			zap.Uint64("BeginTs", cat.BeginTs()),
			zap.Uint64("EndTs", cat.EndTs()),
			zap.String("db", request.DbName),
Y
Yusup 已提交
2808 2809
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))
2810
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()
Y
Yusup 已提交
2811 2812 2813 2814 2815 2816 2817

		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", cat.ID()),
		zap.Uint64("BeginTs", cat.BeginTs()),
		zap.Uint64("EndTs", cat.EndTs()),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

2829 2830
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2831 2832 2833
	return cat.result, nil
}

2834
// DropAlias alter the alias of collection.
Y
Yusup 已提交
2835 2836 2837 2838
func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2839 2840 2841 2842 2843

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-DropAlias")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

Y
Yusup 已提交
2844 2845 2846 2847 2848 2849 2850
	dat := &DropAliasTask{
		ctx:              ctx,
		Condition:        NewTaskCondition(ctx),
		DropAliasRequest: request,
		rootCoord:        node.rootCoord,
	}

D
dragondriver 已提交
2851
	method := "DropAlias"
2852 2853
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868 2869

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias))

	if err := node.sched.ddQueue.Enqueue(dat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.String("db", request.DbName),
			zap.String("alias", request.Alias))
2870
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2871

Y
Yusup 已提交
2872 2873 2874 2875 2876 2877
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2878 2879 2880
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2881
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2882 2883 2884 2885
		zap.Int64("MsgID", dat.ID()),
		zap.Uint64("BeginTs", dat.BeginTs()),
		zap.Uint64("EndTs", dat.EndTs()),
		zap.String("db", request.DbName),
Y
Yusup 已提交
2886
		zap.String("alias", request.Alias))
D
dragondriver 已提交
2887 2888 2889 2890

	if err := dat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2891
			zap.Error(err),
D
dragondriver 已提交
2892
			zap.String("traceID", traceID),
2893
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2894 2895 2896 2897
			zap.Int64("MsgID", dat.ID()),
			zap.Uint64("BeginTs", dat.BeginTs()),
			zap.Uint64("EndTs", dat.EndTs()),
			zap.String("db", request.DbName),
Y
Yusup 已提交
2898 2899
			zap.String("alias", request.Alias))

2900 2901
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()

Y
Yusup 已提交
2902 2903 2904 2905 2906 2907
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2908 2909 2910 2911 2912 2913 2914 2915 2916 2917
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", dat.ID()),
		zap.Uint64("BeginTs", dat.BeginTs()),
		zap.Uint64("EndTs", dat.EndTs()),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias))

2918 2919
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
2920 2921 2922
	return dat.result, nil
}

2923
// AlterAlias alter alias of collection.
Y
Yusup 已提交
2924 2925 2926 2927
func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}
D
dragondriver 已提交
2928 2929 2930 2931 2932

	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-AlterAlias")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

Y
Yusup 已提交
2933 2934 2935 2936 2937 2938 2939
	aat := &AlterAliasTask{
		ctx:               ctx,
		Condition:         NewTaskCondition(ctx),
		AlterAliasRequest: request,
		rootCoord:         node.rootCoord,
	}

D
dragondriver 已提交
2940
	method := "AlterAlias"
2941 2942
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.TotalLabel).Inc()
D
dragondriver 已提交
2943 2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960

	log.Debug(
		rpcReceived(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

	if err := node.sched.ddQueue.Enqueue(aat); err != nil {
		log.Warn(
			rpcFailedToEnqueue(method),
			zap.Error(err),
			zap.String("traceID", traceID),
			zap.String("role", typeutil.ProxyRole),
			zap.String("db", request.DbName),
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))
2961
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.AbandonLabel).Inc()
D
dragondriver 已提交
2962

Y
Yusup 已提交
2963 2964 2965 2966 2967 2968
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
2969 2970 2971
	log.Debug(
		rpcEnqueued(method),
		zap.String("traceID", traceID),
2972
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2973 2974 2975 2976
		zap.Int64("MsgID", aat.ID()),
		zap.Uint64("BeginTs", aat.BeginTs()),
		zap.Uint64("EndTs", aat.EndTs()),
		zap.String("db", request.DbName),
Y
Yusup 已提交
2977 2978
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))
D
dragondriver 已提交
2979 2980 2981 2982

	if err := aat.WaitToFinish(); err != nil {
		log.Warn(
			rpcFailedToWaitToFinish(method),
Y
Yusup 已提交
2983
			zap.Error(err),
D
dragondriver 已提交
2984
			zap.String("traceID", traceID),
2985
			zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
2986 2987 2988 2989
			zap.Int64("MsgID", aat.ID()),
			zap.Uint64("BeginTs", aat.BeginTs()),
			zap.Uint64("EndTs", aat.EndTs()),
			zap.String("db", request.DbName),
Y
Yusup 已提交
2990 2991 2992
			zap.String("alias", request.Alias),
			zap.String("collection", request.CollectionName))

2993 2994
		metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.FailLabel).Inc()

Y
Yusup 已提交
2995 2996 2997 2998 2999 3000
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

D
dragondriver 已提交
3001 3002 3003 3004 3005 3006 3007 3008 3009 3010 3011
	log.Debug(
		rpcDone(method),
		zap.String("traceID", traceID),
		zap.String("role", typeutil.ProxyRole),
		zap.Int64("MsgID", aat.ID()),
		zap.Uint64("BeginTs", aat.BeginTs()),
		zap.Uint64("EndTs", aat.EndTs()),
		zap.String("db", request.DbName),
		zap.String("alias", request.Alias),
		zap.String("collection", request.CollectionName))

3012 3013
	metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method, metrics.SuccessLabel).Inc()
	metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
Y
Yusup 已提交
3014 3015 3016
	return aat.result, nil
}

3017
// CalcDistance calculates the distances between vectors.
3018
func (node *Proxy) CalcDistance(ctx context.Context, request *milvuspb.CalcDistanceRequest) (*milvuspb.CalcDistanceResults, error) {
3019 3020 3021 3022 3023
	if !node.checkHealthy() {
		return &milvuspb.CalcDistanceResults{
			Status: unhealthyStatus(),
		}, nil
	}
3024
	param, _ := funcutil.GetAttrByKeyFromRepeatedKV("metric", request.GetParams())
3025 3026 3027 3028 3029 3030 3031 3032
	metric, err := distance.ValidateMetricType(param)
	if err != nil {
		return &milvuspb.CalcDistanceResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
3033 3034
	}

3035 3036 3037 3038
	sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-CalcDistance")
	defer sp.Finish()
	traceID, _, _ := trace.InfoFromSpan(sp)

3039 3040
	query := func(ids *milvuspb.VectorIDs) (*milvuspb.QueryResults, error) {
		outputFields := []string{ids.FieldName}
3041

3042 3043 3044 3045 3046
		queryRequest := &milvuspb.QueryRequest{
			DbName:         "",
			CollectionName: ids.CollectionName,
			PartitionNames: ids.PartitionNames,
			OutputFields:   outputFields,
3047 3048
		}

3049
		qt := &queryTask{
3050 3051 3052 3053 3054
			ctx:       ctx,
			Condition: NewTaskCondition(ctx),
			RetrieveRequest: &internalpb.RetrieveRequest{
				Base: &commonpb.MsgBase{
					MsgType:  commonpb.MsgType_Retrieve,
3055
					SourceID: Params.ProxyCfg.ProxyID,
3056
				},
3057
				ResultChannelID: strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
3058
			},
3059
			resultBuf: make(chan []*internalpb.RetrieveResults),
3060
			query:     queryRequest,
3061
			chMgr:     node.chMgr,
3062
			qc:        node.queryCoord,
Y
yukun 已提交
3063
			ids:       ids.IdArray,
3064 3065
		}

3066
		err := node.sched.dqQueue.Enqueue(qt)
3067
		if err != nil {
3068 3069 3070
			log.Debug("CalcDistance queryTask failed to enqueue",
				zap.Error(err),
				zap.String("traceID", traceID),
3071
				zap.String("role", typeutil.ProxyRole),
3072 3073 3074 3075
				zap.String("db", queryRequest.DbName),
				zap.String("collection", queryRequest.CollectionName),
				zap.Any("partitions", queryRequest.PartitionNames))

3076 3077 3078 3079 3080
			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
3081
			}, err
3082
		}
3083 3084 3085

		log.Debug("CalcDistance queryTask enqueued",
			zap.String("traceID", traceID),
3086
			zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
3087 3088 3089 3090 3091 3092
			zap.Int64("msgID", qt.Base.MsgID),
			zap.Uint64("timestamp", qt.Base.Timestamp),
			zap.String("db", queryRequest.DbName),
			zap.String("collection", queryRequest.CollectionName),
			zap.Any("partitions", queryRequest.PartitionNames),
			zap.Any("OutputFields", queryRequest.OutputFields))
3093 3094 3095 3096

		err = qt.WaitToFinish()
		if err != nil {
			log.Debug("CalcDistance queryTask failed to WaitToFinish",
G
godchen 已提交
3097
				zap.Error(err),
3098
				zap.String("traceID", traceID),
3099
				zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
3100 3101 3102 3103 3104 3105
				zap.Int64("msgID", qt.Base.MsgID),
				zap.Uint64("timestamp", qt.Base.Timestamp),
				zap.String("db", queryRequest.DbName),
				zap.String("collection", queryRequest.CollectionName),
				zap.Any("partitions", queryRequest.PartitionNames),
				zap.Any("OutputFields", queryRequest.OutputFields))
3106 3107 3108 3109 3110 3111

			return &milvuspb.QueryResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
3112
			}, err
3113
		}
3114 3115 3116

		log.Debug("CalcDistance queryTask Done",
			zap.String("traceID", traceID),
3117
			zap.String("role", typeutil.ProxyRole),
G
godchen 已提交
3118 3119 3120 3121 3122 3123
			zap.Int64("msgID", qt.Base.MsgID),
			zap.Uint64("timestamp", qt.Base.Timestamp),
			zap.String("db", queryRequest.DbName),
			zap.String("collection", queryRequest.CollectionName),
			zap.Any("partitions", queryRequest.PartitionNames),
			zap.Any("OutputFields", queryRequest.OutputFields))
3124 3125

		return &milvuspb.QueryResults{
3126 3127
			Status:     qt.result.Status,
			FieldsData: qt.result.FieldsData,
3128 3129 3130
		}, nil
	}

3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144
	// the vectors retrieved are random order, we need re-arrange the vectors by the order of input ids
	arrangeFunc := func(ids *milvuspb.VectorIDs, retrievedFields []*schemapb.FieldData) (*schemapb.VectorField, error) {
		var retrievedIds *schemapb.ScalarField
		var retrievedVectors *schemapb.VectorField
		for _, fieldData := range retrievedFields {
			if fieldData.FieldName == ids.FieldName {
				retrievedVectors = fieldData.GetVectors()
			}
			if fieldData.Type == schemapb.DataType_Int64 {
				retrievedIds = fieldData.GetScalars()
			}
		}

		if retrievedIds == nil || retrievedVectors == nil {
3145
			return nil, errors.New("failed to fetch vectors")
3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161
		}

		dict := make(map[int64]int)
		for index, id := range retrievedIds.GetLongData().Data {
			dict[id] = index
		}

		inputIds := ids.IdArray.GetIntId().Data
		if retrievedVectors.GetFloatVector() != nil {
			floatArr := retrievedVectors.GetFloatVector().Data
			element := retrievedVectors.GetDim()
			result := make([]float32, 0, int64(len(inputIds))*element)
			for _, id := range inputIds {
				index, ok := dict[id]
				if !ok {
					log.Error("id not found in CalcDistance", zap.Int64("id", id))
3162
					return nil, errors.New("failed to fetch vectors by id: " + fmt.Sprintln(id))
3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188
				}
				result = append(result, floatArr[int64(index)*element:int64(index+1)*element]...)
			}

			return &schemapb.VectorField{
				Dim: element,
				Data: &schemapb.VectorField_FloatVector{
					FloatVector: &schemapb.FloatArray{
						Data: result,
					},
				},
			}, nil
		}

		if retrievedVectors.GetBinaryVector() != nil {
			binaryArr := retrievedVectors.GetBinaryVector()
			element := retrievedVectors.GetDim()
			if element%8 != 0 {
				element = element + 8 - element%8
			}

			result := make([]byte, 0, int64(len(inputIds))*element)
			for _, id := range inputIds {
				index, ok := dict[id]
				if !ok {
					log.Error("id not found in CalcDistance", zap.Int64("id", id))
3189
					return nil, errors.New("failed to fetch vectors by id: " + fmt.Sprintln(id))
3190 3191 3192 3193 3194 3195 3196 3197 3198 3199 3200 3201
				}
				result = append(result, binaryArr[int64(index)*element:int64(index+1)*element]...)
			}

			return &schemapb.VectorField{
				Dim: element * 8,
				Data: &schemapb.VectorField_BinaryVector{
					BinaryVector: result,
				},
			}, nil
		}

3202
		return nil, errors.New("failed to fetch vectors")
3203 3204
	}

3205 3206
	log.Debug("CalcDistance received",
		zap.String("traceID", traceID),
3207
		zap.String("role", typeutil.ProxyRole),
3208
		zap.String("metric", metric))
G
godchen 已提交
3209

3210 3211 3212
	vectorsLeft := request.GetOpLeft().GetDataArray()
	opLeft := request.GetOpLeft().GetIdArray()
	if opLeft != nil {
3213 3214
		log.Debug("OpLeft IdArray not empty, Get vectors by id",
			zap.String("traceID", traceID),
3215
			zap.String("role", typeutil.ProxyRole))
3216

3217
		result, err := query(opLeft)
3218
		if err != nil {
3219 3220 3221
			log.Debug("Failed to get left vectors by id",
				zap.Error(err),
				zap.String("traceID", traceID),
3222
				zap.String("role", typeutil.ProxyRole))
3223

3224 3225 3226 3227 3228 3229 3230 3231
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

3232 3233
		log.Debug("OpLeft IdArray not empty, Get vectors by id done",
			zap.String("traceID", traceID),
3234
			zap.String("role", typeutil.ProxyRole))
3235

3236 3237
		vectorsLeft, err = arrangeFunc(opLeft, result.FieldsData)
		if err != nil {
3238 3239 3240
			log.Debug("Failed to re-arrange left vectors",
				zap.Error(err),
				zap.String("traceID", traceID),
3241
				zap.String("role", typeutil.ProxyRole))
3242

3243 3244 3245 3246 3247 3248
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
3249
		}
3250 3251 3252

		log.Debug("Re-arrange left vectors done",
			zap.String("traceID", traceID),
3253
			zap.String("role", typeutil.ProxyRole))
3254 3255
	}

G
groot 已提交
3256
	if vectorsLeft == nil {
3257 3258 3259
		msg := "Left vectors array is empty"
		log.Debug(msg,
			zap.String("traceID", traceID),
3260
			zap.String("role", typeutil.ProxyRole))
3261

G
groot 已提交
3262 3263 3264
		return &milvuspb.CalcDistanceResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
3265
				Reason:    msg,
G
groot 已提交
3266 3267 3268 3269
			},
		}, nil
	}

3270 3271 3272
	vectorsRight := request.GetOpRight().GetDataArray()
	opRight := request.GetOpRight().GetIdArray()
	if opRight != nil {
3273 3274
		log.Debug("OpRight IdArray not empty, Get vectors by id",
			zap.String("traceID", traceID),
3275
			zap.String("role", typeutil.ProxyRole))
3276

3277
		result, err := query(opRight)
3278
		if err != nil {
3279 3280 3281
			log.Debug("Failed to get right vectors by id",
				zap.Error(err),
				zap.String("traceID", traceID),
3282
				zap.String("role", typeutil.ProxyRole))
3283

3284 3285 3286 3287 3288 3289 3290 3291
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

3292 3293
		log.Debug("OpRight IdArray not empty, Get vectors by id done",
			zap.String("traceID", traceID),
3294
			zap.String("role", typeutil.ProxyRole))
3295

3296 3297
		vectorsRight, err = arrangeFunc(opRight, result.FieldsData)
		if err != nil {
3298 3299 3300
			log.Debug("Failed to re-arrange right vectors",
				zap.Error(err),
				zap.String("traceID", traceID),
3301
				zap.String("role", typeutil.ProxyRole))
3302

3303 3304 3305 3306 3307 3308
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
3309
		}
3310 3311 3312

		log.Debug("Re-arrange right vectors done",
			zap.String("traceID", traceID),
3313
			zap.String("role", typeutil.ProxyRole))
3314 3315
	}

G
groot 已提交
3316
	if vectorsRight == nil {
3317 3318 3319
		msg := "Right vectors array is empty"
		log.Debug(msg,
			zap.String("traceID", traceID),
3320
			zap.String("role", typeutil.ProxyRole))
3321

G
groot 已提交
3322 3323 3324
		return &milvuspb.CalcDistanceResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
3325
				Reason:    msg,
G
groot 已提交
3326 3327 3328 3329
			},
		}, nil
	}

3330
	if vectorsLeft.Dim != vectorsRight.Dim {
3331 3332 3333
		msg := "Vectors dimension is not equal"
		log.Debug(msg,
			zap.String("traceID", traceID),
3334
			zap.String("role", typeutil.ProxyRole))
3335

3336 3337 3338
		return &milvuspb.CalcDistanceResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
3339
				Reason:    msg,
3340 3341 3342 3343 3344
			},
		}, nil
	}

	if vectorsLeft.GetFloatVector() != nil && vectorsRight.GetFloatVector() != nil {
3345 3346
		distances, err := distance.CalcFloatDistance(vectorsLeft.Dim, vectorsLeft.GetFloatVector().Data, vectorsRight.GetFloatVector().Data, metric)
		if err != nil {
3347 3348 3349
			log.Debug("Failed to CalcFloatDistance",
				zap.Error(err),
				zap.String("traceID", traceID),
3350
				zap.String("role", typeutil.ProxyRole))
3351

3352 3353 3354 3355 3356 3357 3358 3359
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

3360 3361 3362
		log.Debug("CalcFloatDistance done",
			zap.Error(err),
			zap.String("traceID", traceID),
3363
			zap.String("role", typeutil.ProxyRole))
3364

3365 3366 3367 3368 3369 3370 3371 3372 3373 3374
		return &milvuspb.CalcDistanceResults{
			Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success, Reason: ""},
			Array: &milvuspb.CalcDistanceResults_FloatDist{
				FloatDist: &schemapb.FloatArray{
					Data: distances,
				},
			},
		}, nil
	}

3375
	if vectorsLeft.GetBinaryVector() != nil && vectorsRight.GetBinaryVector() != nil {
G
groot 已提交
3376
		hamming, err := distance.CalcHammingDistance(vectorsLeft.Dim, vectorsLeft.GetBinaryVector(), vectorsRight.GetBinaryVector())
3377
		if err != nil {
3378 3379 3380
			log.Debug("Failed to CalcHammingDistance",
				zap.Error(err),
				zap.String("traceID", traceID),
3381
				zap.String("role", typeutil.ProxyRole))
3382

3383 3384 3385 3386 3387 3388 3389 3390 3391
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, nil
		}

		if metric == distance.HAMMING {
3392 3393
			log.Debug("CalcHammingDistance done",
				zap.String("traceID", traceID),
3394
				zap.String("role", typeutil.ProxyRole))
3395

3396 3397 3398 3399
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success, Reason: ""},
				Array: &milvuspb.CalcDistanceResults_IntDist{
					IntDist: &schemapb.IntArray{
G
groot 已提交
3400
						Data: hamming,
3401 3402 3403 3404 3405 3406
					},
				},
			}, nil
		}

		if metric == distance.TANIMOTO {
G
groot 已提交
3407
			tanimoto, err := distance.CalcTanimotoCoefficient(vectorsLeft.Dim, hamming)
3408
			if err != nil {
3409 3410 3411
				log.Debug("Failed to CalcTanimotoCoefficient",
					zap.Error(err),
					zap.String("traceID", traceID),
3412
					zap.String("role", typeutil.ProxyRole))
3413

3414 3415 3416 3417 3418 3419 3420 3421
				return &milvuspb.CalcDistanceResults{
					Status: &commonpb.Status{
						ErrorCode: commonpb.ErrorCode_UnexpectedError,
						Reason:    err.Error(),
					},
				}, nil
			}

3422 3423
			log.Debug("CalcTanimotoCoefficient done",
				zap.String("traceID", traceID),
3424
				zap.String("role", typeutil.ProxyRole))
3425

3426 3427 3428 3429 3430 3431 3432 3433 3434 3435 3436
			return &milvuspb.CalcDistanceResults{
				Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success, Reason: ""},
				Array: &milvuspb.CalcDistanceResults_FloatDist{
					FloatDist: &schemapb.FloatArray{
						Data: tanimoto,
					},
				},
			}, nil
		}
	}

3437
	err = errors.New("unexpected error")
3438
	if (vectorsLeft.GetBinaryVector() != nil && vectorsRight.GetFloatVector() != nil) || (vectorsLeft.GetFloatVector() != nil && vectorsRight.GetBinaryVector() != nil) {
3439
		err = errors.New("cannot calculate distance between binary vectors and float vectors")
3440 3441
	}

3442 3443 3444
	log.Debug("Failed to CalcDistance",
		zap.Error(err),
		zap.String("traceID", traceID),
3445
		zap.String("role", typeutil.ProxyRole))
3446

3447 3448 3449 3450 3451 3452 3453 3454
	return &milvuspb.CalcDistanceResults{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		},
	}, nil
}

3455
// GetDdChannel returns the used channel for dd operations.
C
Cai Yudong 已提交
3456
func (node *Proxy) GetDdChannel(ctx context.Context, request *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) {
3457 3458
	panic("implement me")
}
X
XuanYang-cn 已提交
3459

3460
// GetPersistentSegmentInfo get the information of sealed segment.
C
Cai Yudong 已提交
3461
func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.GetPersistentSegmentInfoRequest) (*milvuspb.GetPersistentSegmentInfoResponse, error) {
D
dragondriver 已提交
3462
	log.Debug("GetPersistentSegmentInfo",
3463
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3464 3465 3466
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
3467
	resp := &milvuspb.GetPersistentSegmentInfoResponse{
X
XuanYang-cn 已提交
3468
		Status: &commonpb.Status{
3469
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
3470 3471
		},
	}
3472 3473 3474 3475
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
3476 3477 3478
	method := "GetPersistentSegmentInfo"
	tr := timerecord.NewTimeRecorder(method)
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
3479
		metrics.TotalLabel).Inc()
G
godchen 已提交
3480
	segments, err := node.getSegmentsOfCollection(ctx, req.DbName, req.CollectionName)
X
XuanYang-cn 已提交
3481
	if err != nil {
3482
		resp.Status.Reason = fmt.Errorf("getSegmentsOfCollection, err:%w", err).Error()
X
XuanYang-cn 已提交
3483 3484
		return resp, nil
	}
3485
	infoResp, err := node.dataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
X
XuanYang-cn 已提交
3486
		Base: &commonpb.MsgBase{
3487
			MsgType:   commonpb.MsgType_SegmentInfo,
X
XuanYang-cn 已提交
3488 3489
			MsgID:     0,
			Timestamp: 0,
3490
			SourceID:  Params.ProxyCfg.ProxyID,
X
XuanYang-cn 已提交
3491 3492 3493 3494
		},
		SegmentIDs: segments,
	})
	if err != nil {
3495
		log.Debug("GetPersistentSegmentInfo fail", zap.Error(err))
3496
		resp.Status.Reason = fmt.Errorf("dataCoord:GetSegmentInfo, err:%w", err).Error()
X
XuanYang-cn 已提交
3497 3498
		return resp, nil
	}
3499
	log.Debug("GetPersistentSegmentInfo ", zap.Int("len(infos)", len(infoResp.Infos)), zap.Any("status", infoResp.Status))
3500
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
X
XuanYang-cn 已提交
3501 3502 3503 3504 3505 3506
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	persistentInfos := make([]*milvuspb.PersistentSegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		persistentInfos[i] = &milvuspb.PersistentSegmentInfo{
S
sunby 已提交
3507
			SegmentID:    info.ID,
X
XuanYang-cn 已提交
3508 3509
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
S
sunby 已提交
3510
			NumRows:      info.NumOfRows,
X
XuanYang-cn 已提交
3511 3512 3513
			State:        info.State,
		}
	}
3514
	metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method,
3515 3516
		metrics.SuccessLabel).Inc()
	metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
3517
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
X
XuanYang-cn 已提交
3518 3519 3520 3521
	resp.Infos = persistentInfos
	return resp, nil
}

J
jingkl 已提交
3522
// GetQuerySegmentInfo gets segment information from QueryCoord.
C
Cai Yudong 已提交
3523
func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQuerySegmentInfoRequest) (*milvuspb.GetQuerySegmentInfoResponse, error) {
D
dragondriver 已提交
3524
	log.Debug("GetQuerySegmentInfo",
3525
		zap.String("role", typeutil.ProxyRole),
D
dragondriver 已提交
3526 3527 3528
		zap.String("db", req.DbName),
		zap.Any("collection", req.CollectionName))

G
godchen 已提交
3529
	resp := &milvuspb.GetQuerySegmentInfoResponse{
Z
zhenshan.cao 已提交
3530
		Status: &commonpb.Status{
3531
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
Z
zhenshan.cao 已提交
3532 3533
		},
	}
3534 3535 3536 3537
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}
G
godchen 已提交
3538
	segments, err := node.getSegmentsOfCollection(ctx, req.DbName, req.CollectionName)
Z
zhenshan.cao 已提交
3539 3540 3541 3542
	if err != nil {
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3543 3544 3545 3546 3547
	collID, err := globalMetaCache.GetCollectionID(ctx, req.CollectionName)
	if err != nil {
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3548
	infoResp, err := node.queryCoord.GetSegmentInfo(ctx, &querypb.GetSegmentInfoRequest{
Z
zhenshan.cao 已提交
3549
		Base: &commonpb.MsgBase{
3550
			MsgType:   commonpb.MsgType_SegmentInfo,
Z
zhenshan.cao 已提交
3551 3552
			MsgID:     0,
			Timestamp: 0,
3553
			SourceID:  Params.ProxyCfg.ProxyID,
Z
zhenshan.cao 已提交
3554
		},
3555 3556
		CollectionID: collID,
		SegmentIDs:   segments,
Z
zhenshan.cao 已提交
3557 3558
	})
	if err != nil {
3559
		log.Error("Failed to get segment info from QueryCoord",
3560
			zap.Int64s("segmentIDs", segments), zap.Error(err))
Z
zhenshan.cao 已提交
3561 3562 3563
		resp.Status.Reason = err.Error()
		return resp, nil
	}
3564
	log.Debug("GetQuerySegmentInfo ", zap.Any("infos", infoResp.Infos), zap.Any("status", infoResp.Status))
3565
	if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success {
3566
		log.Error("Failed to get segment info from QueryCoord", zap.String("errMsg", infoResp.Status.Reason))
Z
zhenshan.cao 已提交
3567 3568 3569 3570 3571 3572 3573 3574 3575 3576 3577 3578 3579
		resp.Status.Reason = infoResp.Status.Reason
		return resp, nil
	}
	queryInfos := make([]*milvuspb.QuerySegmentInfo, len(infoResp.Infos))
	for i, info := range infoResp.Infos {
		queryInfos[i] = &milvuspb.QuerySegmentInfo{
			SegmentID:    info.SegmentID,
			CollectionID: info.CollectionID,
			PartitionID:  info.PartitionID,
			NumRows:      info.NumRows,
			MemSize:      info.MemSize,
			IndexName:    info.IndexName,
			IndexID:      info.IndexID,
3580
			NodeID:       info.NodeID,
X
xige-16 已提交
3581
			State:        info.SegmentState,
Z
zhenshan.cao 已提交
3582 3583
		}
	}
3584
	resp.Status.ErrorCode = commonpb.ErrorCode_Success
Z
zhenshan.cao 已提交
3585 3586 3587 3588
	resp.Infos = queryInfos
	return resp, nil
}

C
Cai Yudong 已提交
3589
func (node *Proxy) getSegmentsOfCollection(ctx context.Context, dbName string, collectionName string) ([]UniqueID, error) {
3590
	describeCollectionResponse, err := node.rootCoord.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
X
XuanYang-cn 已提交
3591
		Base: &commonpb.MsgBase{
3592
			MsgType:   commonpb.MsgType_DescribeCollection,
X
XuanYang-cn 已提交
3593 3594
			MsgID:     0,
			Timestamp: 0,
3595
			SourceID:  Params.ProxyCfg.ProxyID,
X
XuanYang-cn 已提交
3596 3597 3598 3599 3600 3601 3602
		},
		DbName:         dbName,
		CollectionName: collectionName,
	})
	if err != nil {
		return nil, err
	}
3603
	if describeCollectionResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
X
XuanYang-cn 已提交
3604 3605 3606
		return nil, errors.New(describeCollectionResponse.Status.Reason)
	}
	collectionID := describeCollectionResponse.CollectionID
3607
	showPartitionsResp, err := node.rootCoord.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
X
XuanYang-cn 已提交
3608
		Base: &commonpb.MsgBase{
3609
			MsgType:   commonpb.MsgType_ShowPartitions,
X
XuanYang-cn 已提交
3610 3611
			MsgID:     0,
			Timestamp: 0,
3612
			SourceID:  Params.ProxyCfg.ProxyID,
X
XuanYang-cn 已提交
3613 3614 3615 3616 3617 3618 3619 3620
		},
		DbName:         dbName,
		CollectionName: collectionName,
		CollectionID:   collectionID,
	})
	if err != nil {
		return nil, err
	}
3621
	if showPartitionsResp.Status.ErrorCode != commonpb.ErrorCode_Success {
X
XuanYang-cn 已提交
3622 3623 3624 3625 3626
		return nil, errors.New(showPartitionsResp.Status.Reason)
	}

	ret := make([]UniqueID, 0)
	for _, partitionID := range showPartitionsResp.PartitionIDs {
3627
		showSegmentResponse, err := node.rootCoord.ShowSegments(ctx, &milvuspb.ShowSegmentsRequest{
X
XuanYang-cn 已提交
3628
			Base: &commonpb.MsgBase{
3629
				MsgType:   commonpb.MsgType_ShowSegments,
X
XuanYang-cn 已提交
3630 3631
				MsgID:     0,
				Timestamp: 0,
3632
				SourceID:  Params.ProxyCfg.ProxyID,
X
XuanYang-cn 已提交
3633 3634 3635 3636 3637 3638 3639
			},
			CollectionID: collectionID,
			PartitionID:  partitionID,
		})
		if err != nil {
			return nil, err
		}
3640
		if showSegmentResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
X
XuanYang-cn 已提交
3641 3642 3643 3644 3645 3646
			return nil, errors.New(showSegmentResponse.Status.Reason)
		}
		ret = append(ret, showSegmentResponse.SegmentIDs...)
	}
	return ret, nil
}
3647

J
jingkl 已提交
3648
// Dummy handles dummy request
C
Cai Yudong 已提交
3649
func (node *Proxy) Dummy(ctx context.Context, req *milvuspb.DummyRequest) (*milvuspb.DummyResponse, error) {
3650 3651 3652 3653 3654 3655 3656 3657 3658 3659 3660
	failedResponse := &milvuspb.DummyResponse{
		Response: `{"status": "fail"}`,
	}

	// TODO(wxyu): change name RequestType to Request
	drt, err := parseDummyRequestType(req.RequestType)
	if err != nil {
		log.Debug("Failed to parse dummy request type")
		return failedResponse, nil
	}

3661 3662
	if drt.RequestType == "query" {
		drr, err := parseDummyQueryRequest(req.RequestType)
3663
		if err != nil {
3664
			log.Debug("Failed to parse dummy query request")
3665 3666 3667
			return failedResponse, nil
		}

3668
		request := &milvuspb.QueryRequest{
3669 3670 3671
			DbName:         drr.DbName,
			CollectionName: drr.CollectionName,
			PartitionNames: drr.PartitionNames,
3672
			OutputFields:   drr.OutputFields,
X
Xiangyu Wang 已提交
3673 3674
		}

3675
		_, err = node.Query(ctx, request)
3676
		if err != nil {
3677
			log.Debug("Failed to execute dummy query")
3678 3679
			return failedResponse, err
		}
X
Xiangyu Wang 已提交
3680 3681 3682 3683 3684 3685

		return &milvuspb.DummyResponse{
			Response: `{"status": "success"}`,
		}, nil
	}

3686 3687
	log.Debug("cannot find specify dummy request type")
	return failedResponse, nil
X
Xiangyu Wang 已提交
3688 3689
}

J
jingkl 已提交
3690
// RegisterLink registers a link
C
Cai Yudong 已提交
3691
func (node *Proxy) RegisterLink(ctx context.Context, req *milvuspb.RegisterLinkRequest) (*milvuspb.RegisterLinkResponse, error) {
G
godchen 已提交
3692
	code := node.stateCode.Load().(internalpb.StateCode)
D
dragondriver 已提交
3693
	log.Debug("RegisterLink",
3694
		zap.String("role", typeutil.ProxyRole),
C
Cai Yudong 已提交
3695
		zap.Any("state code of proxy", code))
D
dragondriver 已提交
3696

G
godchen 已提交
3697
	if code != internalpb.StateCode_Healthy {
3698 3699 3700
		return &milvuspb.RegisterLinkResponse{
			Address: nil,
			Status: &commonpb.Status{
3701
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3702
				Reason:    "proxy not healthy",
3703 3704 3705
			},
		}, nil
	}
3706
	//metrics.ProxyLinkedSDKs.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10)).Inc()
3707 3708 3709
	return &milvuspb.RegisterLinkResponse{
		Address: nil,
		Status: &commonpb.Status{
3710
			ErrorCode: commonpb.ErrorCode_Success,
3711
			Reason:    os.Getenv(metricsinfo.DeployModeEnvKey),
3712 3713 3714
		},
	}, nil
}
3715

3716 3717 3718
// TODO(dragondriver): cache the Metrics and set a retention to the cache
func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	log.Debug("Proxy.GetMetrics",
3719
		zap.Int64("node_id", Params.ProxyCfg.ProxyID),
3720 3721 3722 3723
		zap.String("req", req.Request))

	if !node.checkHealthy() {
		log.Warn("Proxy.GetMetrics failed",
3724
			zap.Int64("node_id", Params.ProxyCfg.ProxyID),
3725
			zap.String("req", req.Request),
3726
			zap.Error(errProxyIsUnhealthy(Params.ProxyCfg.ProxyID)))
3727 3728 3729 3730

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
3731
				Reason:    msgProxyIsUnhealthy(Params.ProxyCfg.ProxyID),
3732 3733 3734 3735 3736 3737 3738 3739
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("Proxy.GetMetrics failed to parse metric type",
3740
			zap.Int64("node_id", Params.ProxyCfg.ProxyID),
3741 3742 3743 3744 3745 3746 3747 3748 3749 3750 3751 3752 3753 3754 3755
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

	log.Debug("Proxy.GetMetrics",
		zap.String("metric_type", metricType))

D
dragondriver 已提交
3756 3757 3758 3759 3760 3761 3762 3763 3764 3765
	msgID := UniqueID(0)
	msgID, err = node.idAllocator.AllocOne()
	if err != nil {
		log.Warn("Proxy.GetMetrics failed to allocate id",
			zap.Error(err))
	}
	req.Base = &commonpb.MsgBase{
		MsgType:   commonpb.MsgType_SystemInfo,
		MsgID:     msgID,
		Timestamp: 0,
3766
		SourceID:  Params.ProxyCfg.ProxyID,
D
dragondriver 已提交
3767 3768
	}

3769
	if metricType == metricsinfo.SystemInfoMetrics {
3770 3771 3772 3773 3774 3775 3776
		ret, err := node.metricsCacheManager.GetSystemInfoMetrics()
		if err == nil && ret != nil {
			return ret, nil
		}
		log.Debug("failed to get system info metrics from cache, recompute instead",
			zap.Error(err))

3777
		metrics, err := getSystemInfoMetrics(ctx, req, node)
3778 3779

		log.Debug("Proxy.GetMetrics",
3780
			zap.Int64("node_id", Params.ProxyCfg.ProxyID),
3781 3782 3783 3784 3785
			zap.String("req", req.Request),
			zap.String("metric_type", metricType),
			zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
			zap.Error(err))

3786 3787
		node.metricsCacheManager.UpdateSystemInfoMetrics(metrics)

G
godchen 已提交
3788
		return metrics, nil
3789 3790 3791
	}

	log.Debug("Proxy.GetMetrics failed, request metric type is not implemented yet",
3792
		zap.Int64("node_id", Params.ProxyCfg.ProxyID),
3793 3794 3795 3796 3797 3798 3799 3800 3801 3802 3803 3804
		zap.String("req", req.Request),
		zap.String("metric_type", metricType))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}

B
bigsheeper 已提交
3805 3806 3807
// LoadBalance would do a load balancing operation between query nodes
func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceRequest) (*commonpb.Status, error) {
	log.Debug("Proxy.LoadBalance",
3808
		zap.Int64("proxy_id", Params.ProxyCfg.ProxyID),
B
bigsheeper 已提交
3809 3810 3811 3812 3813 3814 3815 3816 3817 3818 3819 3820 3821 3822
		zap.Any("req", req))

	if !node.checkHealthy() {
		return unhealthyStatus(), nil
	}

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	}
	infoResp, err := node.queryCoord.LoadBalance(ctx, &querypb.LoadBalanceRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_LoadBalanceSegments,
			MsgID:     0,
			Timestamp: 0,
3823
			SourceID:  Params.ProxyCfg.ProxyID,
B
bigsheeper 已提交
3824 3825 3826
		},
		SourceNodeIDs:    []int64{req.SrcNodeID},
		DstNodeIDs:       req.DstNodeIDs,
X
xige-16 已提交
3827
		BalanceReason:    querypb.TriggerCondition_GrpcRequest,
B
bigsheeper 已提交
3828 3829 3830 3831 3832 3833 3834 3835 3836 3837 3838 3839 3840 3841 3842 3843 3844 3845
		SealedSegmentIDs: req.SealedSegmentIDs,
	})
	if err != nil {
		log.Error("Failed to LoadBalance from Query Coordinator",
			zap.Any("req", req), zap.Error(err))
		status.Reason = err.Error()
		return status, nil
	}
	if infoResp.ErrorCode != commonpb.ErrorCode_Success {
		log.Error("Failed to LoadBalance from Query Coordinator", zap.String("errMsg", infoResp.Reason))
		status.Reason = infoResp.Reason
		return status, nil
	}
	log.Debug("LoadBalance Done", zap.Any("req", req), zap.Any("status", infoResp))
	status.ErrorCode = commonpb.ErrorCode_Success
	return status, nil
}

J
jingkl 已提交
3846
//GetCompactionState gets the compaction state of multiple segments
3847 3848 3849 3850 3851 3852 3853 3854 3855 3856 3857 3858 3859
func (node *Proxy) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
	log.Info("received GetCompactionState request", zap.Int64("compactionID", req.GetCompactionID()))
	resp := &milvuspb.GetCompactionStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionState(ctx, req)
	log.Info("received GetCompactionState response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

3860
// ManualCompaction invokes compaction on specified collection
3861 3862 3863 3864 3865 3866 3867 3868 3869 3870 3871 3872 3873 3874 3875 3876 3877 3878 3879 3880 3881 3882 3883 3884 3885 3886
func (node *Proxy) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
	log.Info("received ManualCompaction request", zap.Int64("collectionID", req.GetCollectionID()))
	resp := &milvuspb.ManualCompactionResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.ManualCompaction(ctx, req)
	log.Info("received ManualCompaction response", zap.Int64("collectionID", req.GetCollectionID()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

func (node *Proxy) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
	log.Info("received GetCompactionStateWithPlans request", zap.Int64("compactionID", req.GetCompactionID()))
	resp := &milvuspb.GetCompactionPlansResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.dataCoord.GetCompactionStateWithPlans(ctx, req)
	log.Info("received GetCompactionStateWithPlans response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

B
Bingyi Sun 已提交
3887 3888 3889
// GetFlushState gets the flush state of multiple segments
func (node *Proxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
	log.Info("received get flush state request", zap.Any("request", req))
3890
	var err error
B
Bingyi Sun 已提交
3891 3892 3893 3894 3895 3896 3897
	resp := &milvuspb.GetFlushStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		log.Info("unable to get flush state because of closed server")
		return resp, nil
	}

3898
	resp, err = node.dataCoord.GetFlushState(ctx, req)
X
Xiaofan 已提交
3899 3900 3901 3902
	if err != nil {
		log.Info("failed to get flush state response", zap.Error(err))
		return nil, err
	}
B
Bingyi Sun 已提交
3903 3904 3905 3906
	log.Info("received get flush state response", zap.Any("response", resp))
	return resp, err
}

C
Cai Yudong 已提交
3907 3908
// checkHealthy checks proxy state is Healthy
func (node *Proxy) checkHealthy() bool {
3909 3910 3911 3912
	code := node.stateCode.Load().(internalpb.StateCode)
	return code == internalpb.StateCode_Healthy
}

J
jingkl 已提交
3913
//unhealthyStatus returns the proxy not healthy status
3914 3915 3916
func unhealthyStatus() *commonpb.Status {
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UnexpectedError,
C
Cai Yudong 已提交
3917
		Reason:    "proxy not healthy",
3918 3919
	}
}
G
groot 已提交
3920 3921 3922

// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
G
groot 已提交
3923
	log.Info("received import request")
G
groot 已提交
3924 3925 3926 3927 3928 3929
	resp := &milvuspb.ImportResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

G
groot 已提交
3930 3931 3932
	resp, err := node.rootCoord.Import(ctx, req)
	log.Info("received import response", zap.String("collectionName", req.GetCollectionName()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
G
groot 已提交
3933 3934
}

X
XuanYang-cn 已提交
3935 3936 3937 3938 3939 3940 3941 3942 3943 3944 3945 3946 3947 3948
// GetReplicas gets replica info
func (node *Proxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
	log.Info("received get replicas request")
	resp := &milvuspb.GetReplicasResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

	resp, err := node.queryCoord.GetReplicas(ctx, req)
	log.Info("received get replicas response", zap.Any("resp", resp), zap.Error(err))
	return resp, err
}

G
groot 已提交
3949 3950
// Check import task state from datanode
func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
G
groot 已提交
3951
	log.Info("received get import state request", zap.Int64("taskID", req.GetTask()))
G
groot 已提交
3952 3953 3954 3955 3956 3957
	resp := &milvuspb.GetImportStateResponse{}
	if !node.checkHealthy() {
		resp.Status = unhealthyStatus()
		return resp, nil
	}

G
groot 已提交
3958 3959 3960
	resp, err := node.rootCoord.GetImportState(ctx, req)
	log.Info("received get import state response", zap.Int64("taskID", req.GetTask()), zap.Any("resp", resp), zap.Error(err))
	return resp, err
G
groot 已提交
3961
}
3962 3963 3964 3965 3966 3967 3968 3969 3970 3971 3972 3973 3974 3975 3976 3977 3978 3979 3980 3981 3982 3983 3984 3985 3986 3987 3988 3989 3990 3991 3992 3993 3994 3995 3996 3997 3998 3999 4000 4001 4002 4003 4004 4005 4006 4007 4008 4009 4010 4011 4012 4013 4014 4015 4016 4017 4018 4019 4020 4021 4022 4023 4024 4025 4026 4027 4028 4029 4030 4031 4032 4033 4034 4035 4036 4037 4038 4039 4040 4041 4042 4043 4044 4045 4046 4047 4048 4049 4050 4051 4052 4053 4054 4055 4056 4057 4058 4059 4060 4061 4062 4063 4064 4065 4066 4067 4068 4069 4070 4071 4072 4073 4074 4075 4076 4077 4078 4079 4080 4081 4082 4083 4084 4085 4086 4087 4088 4089 4090 4091 4092 4093 4094 4095 4096 4097 4098 4099 4100 4101 4102 4103 4104 4105 4106 4107 4108 4109 4110 4111 4112 4113 4114 4115 4116 4117 4118 4119 4120 4121 4122 4123 4124 4125 4126 4127 4128 4129 4130 4131 4132 4133 4134 4135 4136 4137 4138 4139 4140 4141 4142 4143 4144 4145

// InvalidateCredentialCache invalidate the credential cache of specified username.
func (node *Proxy) InvalidateCredentialCache(ctx context.Context, request *proxypb.InvalidateCredCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Debug("received request to invalidate credential cache",
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))

	username := request.Username
	if globalMetaCache != nil {
		globalMetaCache.RemoveCredential(username) // no need to return error, though credential may be not cached
	}
	logutil.Logger(ctx).Debug("complete to invalidate credential cache",
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

// UpdateCredentialCache update the credential cache of specified username.
func (node *Proxy) UpdateCredentialCache(ctx context.Context, request *proxypb.UpdateCredCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Debug("received request to update credential cache",
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))

	credInfo := &internalpb.CredentialInfo{
		Username:          request.Username,
		EncryptedPassword: request.Password,
	}
	if globalMetaCache != nil {
		globalMetaCache.UpdateCredential(credInfo) // no need to return error, though credential may be not cached
	}
	logutil.Logger(ctx).Debug("complete to update credential cache",
		zap.String("role", typeutil.ProxyRole),
		zap.String("username", request.Username))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

func (node *Proxy) ClearCredUsersCache(ctx context.Context, request *internalpb.ClearCredUsersCacheRequest) (*commonpb.Status, error) {
	ctx = logutil.WithModule(ctx, moduleName)
	logutil.Logger(ctx).Debug("received request to clear credential usernames cache",
		zap.String("role", typeutil.ProxyRole))

	if globalMetaCache != nil {
		globalMetaCache.ClearCredUsers() // no need to return error, though credential may be not cached
	}
	logutil.Logger(ctx).Debug("complete to clear credential usernames cache",
		zap.String("role", typeutil.ProxyRole))

	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
		Reason:    "",
	}, nil
}

func (node *Proxy) CreateCredential(ctx context.Context, req *milvuspb.CreateCredentialRequest) (*commonpb.Status, error) {
	log.Debug("CreateCredential", zap.String("role", typeutil.RootCoordRole), zap.String("username", req.Username))
	// validate params
	username := req.Username
	if err := ValidateUsername(username); err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	rawPassword, err := crypto.Base64Decode(req.Password)
	if err != nil {
		log.Error("decode password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "decode password fail key:" + req.Username,
		}, nil
	}
	if err = ValidatePassword(rawPassword); err != nil {
		log.Error("illegal password", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	encryptedPassword, err := crypto.PasswordEncrypt(rawPassword)
	if err != nil {
		log.Error("encrypt password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_CreateCredentialFailure,
			Reason:    "encrypt password fail key:" + req.Username,
		}, nil
	}
	credInfo := &internalpb.CredentialInfo{
		Username:          req.Username,
		EncryptedPassword: encryptedPassword,
	}
	result, err := node.rootCoord.CreateCredential(ctx, credInfo)
	if err != nil { // for error like conntext timeout etc.
		log.Error("create credential fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) UpdateCredential(ctx context.Context, req *milvuspb.CreateCredentialRequest) (*commonpb.Status, error) {
	log.Debug("UpdateCredential", zap.String("role", typeutil.RootCoordRole), zap.String("username", req.Username))
	// validate params
	rawPassword, err := crypto.Base64Decode(req.Password)
	if err != nil {
		log.Error("decode password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "decode password fail when updating:" + req.Username,
		}, nil
	}
	if err = ValidatePassword(rawPassword); err != nil {
		log.Error("illegal password", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_IllegalArgument,
			Reason:    err.Error(),
		}, nil
	}
	encryptedPassword, err := crypto.PasswordEncrypt(rawPassword)
	if err != nil {
		log.Error("encrypt password fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UpdateCredentialFailure,
			Reason:    "encrypt password fail when updating:" + req.Username,
		}, nil
	}
	credInfo := &internalpb.CredentialInfo{
		Username:          req.Username,
		EncryptedPassword: encryptedPassword,
	}
	result, err := node.rootCoord.UpdateCredential(ctx, credInfo)
	if err != nil { // for error like conntext timeout etc.
		log.Error("update credential fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
	log.Debug("DeleteCredential", zap.String("role", typeutil.RootCoordRole), zap.String("username", req.Username))
	result, err := node.rootCoord.DeleteCredential(ctx, req)
	if err != nil { // for error like conntext timeout etc.
		log.Error("delete credential fail", zap.String("username", req.Username), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}
	return result, err
}

func (node *Proxy) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
	log.Debug("ListCredUsers", zap.String("role", typeutil.RootCoordRole))
	// get from cache
	usernames, err := globalMetaCache.GetCredUsernames(ctx)
	if err != nil {
		return &milvuspb.ListCredUsersResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
	return &milvuspb.ListCredUsersResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		Usernames: usernames,
	}, nil
}